diff options
Diffstat (limited to 'poky/bitbake')
-rw-r--r-- | poky/bitbake/README | 8 | ||||
-rwxr-xr-x | poky/bitbake/bin/bitbake-prserv | 4 | ||||
-rwxr-xr-x | poky/bitbake/bin/bitbake-worker | 2 | ||||
-rw-r--r-- | poky/bitbake/contrib/vim/syntax/bitbake.vim | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/__init__.py | 4 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/client.py | 10 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/serv.py | 134 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/cooker.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/fetch2/wget.py | 87 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/providers.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/server/process.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bblayers/query.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/hashserv/server.py | 4 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/client.py | 48 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/db.py | 65 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/serv.py | 257 |
16 files changed, 384 insertions, 249 deletions
diff --git a/poky/bitbake/README b/poky/bitbake/README index 96e6007e7..2d5cd254e 100644 --- a/poky/bitbake/README +++ b/poky/bitbake/README @@ -33,3 +33,11 @@ Mailing list: Source code: http://git.openembedded.org/bitbake/ + +Testing: + +Bitbake has a testsuite located in lib/bb/tests/ whichs aim to try and prevent regressions. +You can run this with "bitbake-selftest". In particular the fetcher is well covered since +it has so many corner cases. The datastore has many tests too. Testing with the testsuite is +recommended before submitting patches, particularly to the fetcher and datastore. We also +appreciate new test cases and may require them for more obscure issues. diff --git a/poky/bitbake/bin/bitbake-prserv b/poky/bitbake/bin/bitbake-prserv index 1e9b6cbc1..bef5ef689 100755 --- a/poky/bitbake/bin/bitbake-prserv +++ b/poky/bitbake/bin/bitbake-prserv @@ -36,12 +36,14 @@ def main(): dest="host", type="string", default=PRHOST_DEFAULT) parser.add_option("--port", help="port number(default: 8585)", action="store", dest="port", type="int", default=PRPORT_DEFAULT) + parser.add_option("-r", "--read-only", help="open database in read-only mode", + action="store_true") options, args = parser.parse_args(sys.argv) prserv.init_logger(os.path.abspath(options.logfile),options.loglevel) if options.start: - ret=prserv.serv.start_daemon(options.dbfile, options.host, options.port,os.path.abspath(options.logfile)) + ret=prserv.serv.start_daemon(options.dbfile, options.host, options.port,os.path.abspath(options.logfile), options.read_only) elif options.stop: ret=prserv.serv.stop_daemon(options.host, options.port) else: diff --git a/poky/bitbake/bin/bitbake-worker b/poky/bitbake/bin/bitbake-worker index 7765b9368..7d982f90b 100755 --- a/poky/bitbake/bin/bitbake-worker +++ b/poky/bitbake/bin/bitbake-worker @@ -517,5 +517,5 @@ except BaseException as e: worker_thread_exit = True worker_thread.join() -workerlog_write("exitting") +workerlog_write("exiting") sys.exit(0) diff --git a/poky/bitbake/contrib/vim/syntax/bitbake.vim b/poky/bitbake/contrib/vim/syntax/bitbake.vim index d8aa0f1ba..c5ea80fdf 100644 --- a/poky/bitbake/contrib/vim/syntax/bitbake.vim +++ b/poky/bitbake/contrib/vim/syntax/bitbake.vim @@ -77,7 +77,7 @@ syn keyword bbOEFunctions do_fetch do_unpack do_patch do_configure do_comp " Generic Functions syn match bbFunction "\h[0-9A-Za-z_\-\.]*" display contained contains=bbOEFunctions -syn keyword bbOverrideOperator append prepend contained +syn keyword bbOverrideOperator append prepend remove contained " BitBake shell metadata syn include @shell syntax/sh.vim diff --git a/poky/bitbake/lib/bb/__init__.py b/poky/bitbake/lib/bb/__init__.py index c1e30697b..5c248d365 100644 --- a/poky/bitbake/lib/bb/__init__.py +++ b/poky/bitbake/lib/bb/__init__.py @@ -12,8 +12,8 @@ __version__ = "1.51.1" import sys -if sys.version_info < (3, 5, 0): - raise RuntimeError("Sorry, python 3.5.0 or later is required for this version of bitbake") +if sys.version_info < (3, 6, 0): + raise RuntimeError("Sorry, python 3.6.0 or later is required for this version of bitbake") class BBHandledException(Exception): diff --git a/poky/bitbake/lib/bb/asyncrpc/client.py b/poky/bitbake/lib/bb/asyncrpc/client.py index 3eb4fdde8..50e60d5c3 100644 --- a/poky/bitbake/lib/bb/asyncrpc/client.py +++ b/poky/bitbake/lib/bb/asyncrpc/client.py @@ -119,6 +119,16 @@ class Client(object): self.client = self._get_async_client() self.loop = asyncio.new_event_loop() + # Override any pre-existing loop. + # Without this, the PR server export selftest triggers a hang + # when running with Python 3.7. The drawback is that there is + # potential for issues if the PR and hash equiv (or some new) + # clients need to both be instantiated in the same process. + # This should be revisited if/when Python 3.9 becomes the + # minimum required version for BitBake, as it seems not + # required (but harmless) with it. + asyncio.set_event_loop(self.loop) + self._add_methods('connect_tcp', 'close', 'ping') @abc.abstractmethod diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py index 4084f300d..b4cffff21 100644 --- a/poky/bitbake/lib/bb/asyncrpc/serv.py +++ b/poky/bitbake/lib/bb/asyncrpc/serv.py @@ -131,53 +131,55 @@ class AsyncServerConnection(object): class AsyncServer(object): - def __init__(self, logger, loop=None): - if loop is None: - self.loop = asyncio.new_event_loop() - self.close_loop = True - else: - self.loop = loop - self.close_loop = False - + def __init__(self, logger): self._cleanup_socket = None self.logger = logger + self.start = None + self.address = None + self.loop = None def start_tcp_server(self, host, port): - self.server = self.loop.run_until_complete( - asyncio.start_server(self.handle_client, host, port, loop=self.loop) - ) - - for s in self.server.sockets: - self.logger.debug('Listening on %r' % (s.getsockname(),)) - # Newer python does this automatically. Do it manually here for - # maximum compatibility - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) - - name = self.server.sockets[0].getsockname() - if self.server.sockets[0].family == socket.AF_INET6: - self.address = "[%s]:%d" % (name[0], name[1]) - else: - self.address = "%s:%d" % (name[0], name[1]) + def start_tcp(): + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, host, port) + ) + + for s in self.server.sockets: + self.logger.debug('Listening on %r' % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + self.start = start_tcp def start_unix_server(self, path): def cleanup(): os.unlink(path) - cwd = os.getcwd() - try: - # Work around path length limits in AF_UNIX - os.chdir(os.path.dirname(path)) - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) - ) - finally: - os.chdir(cwd) + def start_unix(): + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(path)) + self.server = self.loop.run_until_complete( + asyncio.start_unix_server(self.handle_client, os.path.basename(path)) + ) + finally: + os.chdir(cwd) + + self.logger.debug('Listening on %r' % path) - self.logger.debug('Listening on %r' % path) + self._cleanup_socket = cleanup + self.address = "unix://%s" % os.path.abspath(path) - self._cleanup_socket = cleanup - self.address = "unix://%s" % os.path.abspath(path) + self.start = start_unix @abc.abstractmethod def accept_client(self, reader, writer): @@ -205,8 +207,7 @@ class AsyncServer(object): self.logger.debug("Got exit signal") self.loop.stop() - def serve_forever(self): - asyncio.set_event_loop(self.loop) + def _serve_forever(self): try: self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) @@ -217,28 +218,69 @@ class AsyncServer(object): self.loop.run_until_complete(self.server.wait_closed()) self.logger.debug('Server shutting down') finally: - if self.close_loop: - if sys.version_info >= (3, 6): - self.loop.run_until_complete(self.loop.shutdown_asyncgens()) - self.loop.close() - if self._cleanup_socket is not None: self._cleanup_socket() + def serve_forever(self): + """ + Serve requests in the current process + """ + # Create loop and override any loop that may have existed in + # a parent process. It is possible that the usecases of + # serve_forever might be constrained enough to allow using + # get_event_loop here, but better safe than sorry for now. + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.start() + self._serve_forever() + def serve_as_process(self, *, prefunc=None, args=()): - def run(): + """ + Serve requests in a child process + """ + def run(queue): + # Create loop and override any loop that may have existed + # in a parent process. Without doing this and instead + # using get_event_loop, at the very minimum the hashserv + # unit tests will hang when running the second test. + # This happens since get_event_loop in the spawned server + # process for the second testcase ends up with the loop + # from the hashserv client created in the unit test process + # when running the first testcase. The problem is somewhat + # more general, though, as any potential use of asyncio in + # Cooker could create a loop that needs to replaced in this + # new process. + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + try: + self.start() + finally: + queue.put(self.address) + queue.close() + if prefunc is not None: prefunc(self, *args) - self.serve_forever() + + self._serve_forever() + + if sys.version_info >= (3, 6): + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + + queue = multiprocessing.Queue() # Temporarily block SIGTERM. The server process will inherit this # block which will ensure it doesn't receive the SIGTERM until the # handler is ready for it mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM]) try: - self.process = multiprocessing.Process(target=run) + self.process = multiprocessing.Process(target=run, args=(queue,)) self.process.start() + self.address = queue.get() + queue.close() + queue.join_thread() + return self.process finally: signal.pthread_sigmask(signal.SIG_SETMASK, mask) diff --git a/poky/bitbake/lib/bb/cooker.py b/poky/bitbake/lib/bb/cooker.py index b2d69c28c..db991702e 100644 --- a/poky/bitbake/lib/bb/cooker.py +++ b/poky/bitbake/lib/bb/cooker.py @@ -382,7 +382,7 @@ class BBCooker: try: self.prhost = prserv.serv.auto_start(self.data) except prserv.serv.PRServiceConfigError as e: - bb.fatal("Unable to start PR Server, exitting") + bb.fatal("Unable to start PR Server, exiting, check the bitbake-cookerdaemon.log") if self.data.getVar("BB_HASHSERVE") == "auto": # Create a new hash server bound to a unix domain socket diff --git a/poky/bitbake/lib/bb/fetch2/wget.py b/poky/bitbake/lib/bb/fetch2/wget.py index 29fcfbb3d..9a49e64a0 100644 --- a/poky/bitbake/lib/bb/fetch2/wget.py +++ b/poky/bitbake/lib/bb/fetch2/wget.py @@ -330,50 +330,51 @@ class Wget(FetchMethod): urllib.request.HTTPSHandler(context=context)] opener = urllib.request.build_opener(*handlers) - try: - uri = ud.url.split(";")[0] - r = urllib.request.Request(uri) - r.get_method = lambda: "HEAD" - # Some servers (FusionForge, as used on Alioth) require that the - # optional Accept header is set. - r.add_header("Accept", "*/*") - r.add_header("User-Agent", self.user_agent) - def add_basic_auth(login_str, request): - '''Adds Basic auth to http request, pass in login:password as string''' - import base64 - encodeuser = base64.b64encode(login_str.encode('utf-8')).decode("utf-8") - authheader = "Basic %s" % encodeuser - r.add_header("Authorization", authheader) - - if ud.user and ud.pswd: - add_basic_auth(ud.user + ':' + ud.pswd, r) - try: - import netrc - n = netrc.netrc() - login, unused, password = n.authenticators(urllib.parse.urlparse(uri).hostname) - add_basic_auth("%s:%s" % (login, password), r) - except (TypeError, ImportError, IOError, netrc.NetrcParseError): - pass - - with opener.open(r) as response: - pass - except urllib.error.URLError as e: - if try_again: - logger.debug2("checkstatus: trying again") - return self.checkstatus(fetch, ud, d, False) - else: - # debug for now to avoid spamming the logs in e.g. remote sstate searches - logger.debug2("checkstatus() urlopen failed: %s" % e) - return False - except ConnectionResetError as e: - if try_again: - logger.debug2("checkstatus: trying again") - return self.checkstatus(fetch, ud, d, False) - else: - # debug for now to avoid spamming the logs in e.g. remote sstate searches - logger.debug2("checkstatus() urlopen failed: %s" % e) - return False + uri = ud.url.split(";")[0] + r = urllib.request.Request(uri) + r.get_method = lambda: "HEAD" + # Some servers (FusionForge, as used on Alioth) require that the + # optional Accept header is set. + r.add_header("Accept", "*/*") + r.add_header("User-Agent", self.user_agent) + def add_basic_auth(login_str, request): + '''Adds Basic auth to http request, pass in login:password as string''' + import base64 + encodeuser = base64.b64encode(login_str.encode('utf-8')).decode("utf-8") + authheader = "Basic %s" % encodeuser + r.add_header("Authorization", authheader) + + if ud.user and ud.pswd: + add_basic_auth(ud.user + ':' + ud.pswd, r) + + try: + import netrc + n = netrc.netrc() + login, unused, password = n.authenticators(urllib.parse.urlparse(uri).hostname) + add_basic_auth("%s:%s" % (login, password), r) + except (TypeError, ImportError, IOError, netrc.NetrcParseError): + pass + + with opener.open(r) as response: + pass + except urllib.error.URLError as e: + if try_again: + logger.debug2("checkstatus: trying again") + return self.checkstatus(fetch, ud, d, False) + else: + # debug for now to avoid spamming the logs in e.g. remote sstate searches + logger.debug2("checkstatus() urlopen failed: %s" % e) + return False + except ConnectionResetError as e: + if try_again: + logger.debug2("checkstatus: trying again") + return self.checkstatus(fetch, ud, d, False) + else: + # debug for now to avoid spamming the logs in e.g. remote sstate searches + logger.debug2("checkstatus() urlopen failed: %s" % e) + return False + return True def _parse_path(self, regex, s): diff --git a/poky/bitbake/lib/bb/providers.py b/poky/bitbake/lib/bb/providers.py index 516d45e4a..8c1c31a5c 100644 --- a/poky/bitbake/lib/bb/providers.py +++ b/poky/bitbake/lib/bb/providers.py @@ -94,7 +94,7 @@ def versionVariableMatch(cfgData, keyword, pn): # pn can contain '_', e.g. gcc-cross-x86_64 and an override cannot # hence we do this manually rather than use OVERRIDES - ver = cfgData.getVar("%s_VERSION_pn-%s" % (keyword, pn)) + ver = cfgData.getVar("%s_VERSION:pn-%s" % (keyword, pn)) if not ver: ver = cfgData.getVar("%s_VERSION_%s" % (keyword, pn)) if not ver: diff --git a/poky/bitbake/lib/bb/server/process.py b/poky/bitbake/lib/bb/server/process.py index 6127fd40e..b593830cc 100644 --- a/poky/bitbake/lib/bb/server/process.py +++ b/poky/bitbake/lib/bb/server/process.py @@ -473,7 +473,7 @@ class BitBakeServer(object): try: r = ready.get() except EOFError: - # Trap the child exitting/closing the pipe and error out + # Trap the child exiting/closing the pipe and error out r = None if not r or r[0] != "r": ready.close() diff --git a/poky/bitbake/lib/bblayers/query.py b/poky/bitbake/lib/bblayers/query.py index 947422a72..6e94c8307 100644 --- a/poky/bitbake/lib/bblayers/query.py +++ b/poky/bitbake/lib/bblayers/query.py @@ -154,7 +154,7 @@ skipped recipes will also be listed, with a " (skipped)" suffix. def print_item(f, pn, ver, layer, ispref): if not selected_layer or layer == selected_layer: if not bare and f in skiplist: - skipped = ' (skipped)' + skipped = ' (skipped: %s)' % self.tinfoil.cooker.skiplist[f].skipreason else: skipped = '' if show_filenames: diff --git a/poky/bitbake/lib/hashserv/server.py b/poky/bitbake/lib/hashserv/server.py index 8e8498973..a059e5211 100644 --- a/poky/bitbake/lib/hashserv/server.py +++ b/poky/bitbake/lib/hashserv/server.py @@ -410,11 +410,11 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): class Server(bb.asyncrpc.AsyncServer): - def __init__(self, db, loop=None, upstream=None, read_only=False): + def __init__(self, db, upstream=None, read_only=False): if upstream and read_only: raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") - super().__init__(logger, loop) + super().__init__(logger) self.request_stats = Stats() self.db = db diff --git a/poky/bitbake/lib/prserv/client.py b/poky/bitbake/lib/prserv/client.py new file mode 100644 index 000000000..a3f19ddaf --- /dev/null +++ b/poky/bitbake/lib/prserv/client.py @@ -0,0 +1,48 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import logging +import bb.asyncrpc + +logger = logging.getLogger("BitBake.PRserv") + +class PRAsyncClient(bb.asyncrpc.AsyncClient): + def __init__(self): + super().__init__('PRSERVICE', '1.0', logger) + + async def getPR(self, version, pkgarch, checksum): + response = await self.send_message( + {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} + ) + if response: + return response['value'] + + async def importone(self, version, pkgarch, checksum, value): + response = await self.send_message( + {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} + ) + if response: + return response['value'] + + async def export(self, version, pkgarch, checksum, colinfo): + response = await self.send_message( + {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} + ) + if response: + return (response['metainfo'], response['datainfo']) + + async def is_readonly(self): + response = await self.send_message( + {'is-readonly': {}} + ) + if response: + return response['readonly'] + +class PRClient(bb.asyncrpc.Client): + def __init__(self): + super().__init__() + self._add_methods('getPR', 'importone', 'export', 'is_readonly') + + def _get_async_client(self): + return PRAsyncClient() diff --git a/poky/bitbake/lib/prserv/db.py b/poky/bitbake/lib/prserv/db.py index cb2a2461e..2710d4a22 100644 --- a/poky/bitbake/lib/prserv/db.py +++ b/poky/bitbake/lib/prserv/db.py @@ -30,21 +30,29 @@ if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3): # class PRTable(object): - def __init__(self, conn, table, nohist): + def __init__(self, conn, table, nohist, read_only): self.conn = conn self.nohist = nohist + self.read_only = read_only self.dirty = False if nohist: self.table = "%s_nohist" % table else: self.table = "%s_hist" % table - self._execute("CREATE TABLE IF NOT EXISTS %s \ - (version TEXT NOT NULL, \ - pkgarch TEXT NOT NULL, \ - checksum TEXT NOT NULL, \ - value INTEGER, \ - PRIMARY KEY (version, pkgarch, checksum));" % self.table) + if self.read_only: + table_exists = self._execute( + "SELECT count(*) FROM sqlite_master \ + WHERE type='table' AND name='%s'" % (self.table)) + if not table_exists: + raise prserv.NotFoundError + else: + self._execute("CREATE TABLE IF NOT EXISTS %s \ + (version TEXT NOT NULL, \ + pkgarch TEXT NOT NULL, \ + checksum TEXT NOT NULL, \ + value INTEGER, \ + PRIMARY KEY (version, pkgarch, checksum));" % self.table) def _execute(self, *query): """Execute a query, waiting to acquire a lock if necessary""" @@ -59,8 +67,9 @@ class PRTable(object): raise exc def sync(self): - self.conn.commit() - self._execute("BEGIN EXCLUSIVE TRANSACTION") + if not self.read_only: + self.conn.commit() + self._execute("BEGIN EXCLUSIVE TRANSACTION") def sync_if_dirty(self): if self.dirty: @@ -75,6 +84,15 @@ class PRTable(object): return row[0] else: #no value found, try to insert + if self.read_only: + data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table), + (version, pkgarch)) + row = data.fetchone() + if row is not None: + return row[0] + else: + return 0 + try: self._execute("INSERT INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" % (self.table,self.table), @@ -103,6 +121,15 @@ class PRTable(object): return row[0] else: #no value found, try to insert + if self.read_only: + data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table), + (version, pkgarch)) + row = data.fetchone() + if row is not None: + return row[0] + else: + return 0 + try: self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" % (self.table,self.table), @@ -128,6 +155,9 @@ class PRTable(object): return self._getValueHist(version, pkgarch, checksum) def _importHist(self, version, pkgarch, checksum, value): + if self.read_only: + return None + val = None data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, (version, pkgarch, checksum)) @@ -152,6 +182,9 @@ class PRTable(object): return val def _importNohist(self, version, pkgarch, checksum, value): + if self.read_only: + return None + try: #try to insert self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), @@ -245,19 +278,23 @@ class PRTable(object): class PRData(object): """Object representing the PR database""" - def __init__(self, filename, nohist=True): + def __init__(self, filename, nohist=True, read_only=False): self.filename=os.path.abspath(filename) self.nohist=nohist + self.read_only = read_only #build directory hierarchy try: os.makedirs(os.path.dirname(self.filename)) except OSError as e: if e.errno != errno.EEXIST: raise e - self.connection=sqlite3.connect(self.filename, isolation_level="EXCLUSIVE", check_same_thread = False) + uri = "file:%s%s" % (self.filename, "?mode=ro" if self.read_only else "") + logger.debug("Opening PRServ database '%s'" % (uri)) + self.connection=sqlite3.connect(uri, uri=True, isolation_level="EXCLUSIVE", check_same_thread = False) self.connection.row_factory=sqlite3.Row - self.connection.execute("pragma synchronous = off;") - self.connection.execute("PRAGMA journal_mode = MEMORY;") + if not self.read_only: + self.connection.execute("pragma synchronous = off;") + self.connection.execute("PRAGMA journal_mode = MEMORY;") self._tables={} def disconnect(self): @@ -270,7 +307,7 @@ class PRData(object): if tblname in self._tables: return self._tables[tblname] else: - tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist) + tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist, self.read_only) return tableobj def __delitem__(self, tblname): diff --git a/poky/bitbake/lib/prserv/serv.py b/poky/bitbake/lib/prserv/serv.py index 5e322bf83..0a20b927c 100644 --- a/poky/bitbake/lib/prserv/serv.py +++ b/poky/bitbake/lib/prserv/serv.py @@ -4,157 +4,135 @@ import os,sys,logging import signal, time -from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler import socket import io import sqlite3 -import bb.server.xmlrpcclient import prserv import prserv.db import errno -import multiprocessing +import bb.asyncrpc logger = logging.getLogger("BitBake.PRserv") -class Handler(SimpleXMLRPCRequestHandler): - def _dispatch(self,method,params): - try: - value=self.server.funcs[method](*params) - except: - import traceback - traceback.print_exc() - raise - return value - PIDPREFIX = "/tmp/PRServer_%s_%s.pid" singleton = None - -class PRServer(SimpleXMLRPCServer): - def __init__(self, dbfile, logfile, interface): - ''' constructor ''' +class PRServerClient(bb.asyncrpc.AsyncServerConnection): + def __init__(self, reader, writer, table, read_only): + super().__init__(reader, writer, 'PRSERVICE', logger) + self.handlers.update({ + 'get-pr': self.handle_get_pr, + 'import-one': self.handle_import_one, + 'export': self.handle_export, + 'is-readonly': self.handle_is_readonly, + }) + self.table = table + self.read_only = read_only + + def validate_proto_version(self): + return (self.proto_version == (1, 0)) + + async def dispatch_message(self, msg): try: - SimpleXMLRPCServer.__init__(self, interface, - logRequests=False, allow_none=True) - except socket.error: - ip=socket.gethostbyname(interface[0]) - port=interface[1] - msg="PR Server unable to bind to %s:%s\n" % (ip, port) - sys.stderr.write(msg) - raise PRServiceConfigError - - self.dbfile=dbfile - self.logfile=logfile - self.host, self.port = self.socket.getsockname() - - self.register_function(self.getPR, "getPR") - self.register_function(self.ping, "ping") - self.register_function(self.export, "export") - self.register_function(self.importone, "importone") - self.register_introspection_functions() - - self.iter_count = 0 - # 60 iterations between syncs or sync if dirty every ~30 seconds - self.iterations_between_sync = 60 - - def sigint_handler(self, signum, stack): - if self.table: - self.table.sync() - - def sigterm_handler(self, signum, stack): - if self.table: - self.table.sync() - raise(SystemExit) - - def process_request(self, request, client_address): - if request is None: - return - try: - self.finish_request(request, client_address) - self.shutdown_request(request) - self.iter_count = (self.iter_count + 1) % self.iterations_between_sync - if self.iter_count == 0: - self.table.sync_if_dirty() + await super().dispatch_message(msg) except: - self.handle_error(request, client_address) - self.shutdown_request(request) self.table.sync() - self.table.sync_if_dirty() + raise - def serve_forever(self, poll_interval=0.5): - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) + self.table.sync_if_dirty() - self.db = prserv.db.PRData(self.dbfile) - self.table = self.db["PRMAIN"] - return super().serve_forever(poll_interval) + async def handle_get_pr(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] - def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): + response = None try: - return self.table.export(version, pkgarch, checksum, colinfo) + value = self.table.getValue(version, pkgarch, checksum) + response = {'value': value} + except prserv.NotFoundError: + logger.error("can not find value for (%s, %s)",version, checksum) except sqlite3.Error as exc: logger.error(str(exc)) - return None - def importone(self, version, pkgarch, checksum, value): - return self.table.importone(version, pkgarch, checksum, value) + self.write_message(response) - def ping(self): - return True + async def handle_import_one(self, request): + response = None + if not self.read_only: + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + value = request['value'] - def getinfo(self): - return (self.host, self.port) + value = self.table.importone(version, pkgarch, checksum, value) + if value is not None: + response = {'value': value} + + self.write_message(response) + + async def handle_export(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + colinfo = request['colinfo'] - def getPR(self, version, pkgarch, checksum): try: - return self.table.getValue(version, pkgarch, checksum) - except prserv.NotFoundError: - logger.error("can not find value for (%s, %s)",version, checksum) - return None + (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) except sqlite3.Error as exc: logger.error(str(exc)) - return None + metainfo = datainfo = None -class PRServSingleton(object): - def __init__(self, dbfile, logfile, interface): + response = {'metainfo': metainfo, 'datainfo': datainfo} + self.write_message(response) + + async def handle_is_readonly(self, request): + response = {'readonly': self.read_only} + self.write_message(response) + +class PRServer(bb.asyncrpc.AsyncServer): + def __init__(self, dbfile, read_only=False): + super().__init__(logger) self.dbfile = dbfile - self.logfile = logfile - self.interface = interface - self.host = None - self.port = None + self.table = None + self.read_only = read_only - def start(self): - self.prserv = PRServer(self.dbfile, self.logfile, self.interface) - self.process = multiprocessing.Process(target=self.prserv.serve_forever) - self.process.start() + def accept_client(self, reader, writer): + return PRServerClient(reader, writer, self.table, self.read_only) - self.host, self.port = self.prserv.getinfo() + def _serve_forever(self): + self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) + self.table = self.db["PRMAIN"] - def getinfo(self): - return (self.host, self.port) + logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % + (self.dbfile, self.address, str(os.getpid()))) -class PRServerConnection(object): - def __init__(self, host, port): - if is_local_special(host, port): - host, port = singleton.getinfo() - self.host = host - self.port = port - self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) + super()._serve_forever() - def getPR(self, version, pkgarch, checksum): - return self.connection.getPR(version, pkgarch, checksum) + self.table.sync_if_dirty() + self.db.disconnect() - def ping(self): - return self.connection.ping() + def signal_handler(self): + super().signal_handler() + if self.table: + self.table.sync() - def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): - return self.connection.export(version, pkgarch, checksum, colinfo) +class PRServSingleton(object): + def __init__(self, dbfile, logfile, host, port): + self.dbfile = dbfile + self.logfile = logfile + self.host = host + self.port = port - def importone(self, version, pkgarch, checksum, value): - return self.connection.importone(version, pkgarch, checksum, value) + def start(self): + self.prserv = PRServer(self.dbfile) + self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) + self.process = self.prserv.serve_as_process() - def getinfo(self): - return self.host, self.port + if not self.prserv.address: + raise PRServiceConfigError + if not self.port: + self.port = int(self.prserv.address.rsplit(':', 1)[1]) def run_as_daemon(func, pidfile, logfile): """ @@ -226,7 +204,7 @@ def run_as_daemon(func, pidfile, logfile): os.remove(pidfile) os._exit(0) -def start_daemon(dbfile, host, port, logfile): +def start_daemon(dbfile, host, port, logfile, read_only=False): ip = socket.gethostbyname(host) pidfile = PIDPREFIX % (ip, port) try: @@ -240,15 +218,13 @@ def start_daemon(dbfile, host, port, logfile): % pidfile) return 1 - server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) - run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) + dbfile = os.path.abspath(dbfile) + def daemon_main(): + server = PRServer(dbfile, read_only=read_only) + server.start_tcp_server(ip, port) + server.serve_forever() - # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with - # the one the server actually is listening, so at least warn the user about it - _,rport = server.getinfo() - if port != rport: - sys.stdout.write("Server is listening at port %s instead of %s\n" - % (rport,port)) + run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile)) return 0 def stop_daemon(host, port): @@ -302,7 +278,7 @@ def is_running(pid): return True def is_local_special(host, port): - if host.strip().upper() == 'localhost'.upper() and (not port): + if (host == 'localhost' or host == '127.0.0.1') and not port: return True else: return False @@ -326,7 +302,9 @@ def auto_start(d): 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) raise PRServiceConfigError - if is_local_special(host_params[0], int(host_params[1])): + host = host_params[0].strip().lower() + port = int(host_params[1]) + if is_local_special(host, port): import bb.utils cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) if not cachedir: @@ -340,20 +318,16 @@ def auto_start(d): auto_shutdown() if not singleton: bb.utils.mkdirhier(cachedir) - singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) + singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) singleton.start() if singleton: - host, port = singleton.getinfo() - else: - host = host_params[0] - port = int(host_params[1]) + host = singleton.host + port = singleton.port try: - connection = PRServerConnection(host,port) - connection.ping() - realhost, realport = connection.getinfo() - return str(realhost) + ":" + str(realport) - + ping(host, port) + return str(host) + ":" + str(port) + except Exception: logger.critical("PRservice %s:%d not available" % (host, port)) raise PRServiceConfigError @@ -366,8 +340,21 @@ def auto_shutdown(): singleton = None def ping(host, port): - conn=PRServerConnection(host, port) + from . import client + + conn = client.PRClient() + conn.connect_tcp(host, port) return conn.ping() def connect(host, port): - return PRServerConnection(host, port) + from . import client + + global singleton + + if host.strip().lower() == 'localhost' and not port: + host = 'localhost' + port = singleton.port + + conn = client.PRClient() + conn.connect_tcp(host, port) + return conn |