diff options
Diffstat (limited to 'poky/bitbake/lib')
-rw-r--r-- | poky/bitbake/lib/bb/__init__.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/__init__.py | 31 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/client.py | 145 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/serv.py | 218 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/command.py | 12 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/fetch2/git.py | 8 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/fetch2/s3.py | 41 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/fetch2/svn.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/progress.py | 7 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/runqueue.py | 24 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/server/process.py | 2 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/siggen.py | 8 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/fetch.py | 90 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tinfoil.py | 4 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/ui/knotty.py | 8 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/utils.py | 18 | ||||
-rw-r--r-- | poky/bitbake/lib/bblayers/layerindex.py | 16 | ||||
-rw-r--r-- | poky/bitbake/lib/hashserv/client.py | 147 | ||||
-rw-r--r-- | poky/bitbake/lib/hashserv/server.py | 210 | ||||
-rw-r--r-- | poky/bitbake/lib/hashserv/tests.py | 3 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/serv.py | 367 |
21 files changed, 740 insertions, 623 deletions
diff --git a/poky/bitbake/lib/bb/__init__.py b/poky/bitbake/lib/bb/__init__.py index 9f61dae14..a144bd609 100644 --- a/poky/bitbake/lib/bb/__init__.py +++ b/poky/bitbake/lib/bb/__init__.py @@ -9,7 +9,7 @@ # SPDX-License-Identifier: GPL-2.0-only # -__version__ = "1.50.0" +__version__ = "1.51.0" import sys if sys.version_info < (3, 5, 0): diff --git a/poky/bitbake/lib/bb/asyncrpc/__init__.py b/poky/bitbake/lib/bb/asyncrpc/__init__.py new file mode 100644 index 000000000..b2bec31ab --- /dev/null +++ b/poky/bitbake/lib/bb/asyncrpc/__init__.py @@ -0,0 +1,31 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import itertools +import json + +# The Python async server defaults to a 64K receive buffer, so we hardcode our +# maximum chunk size. It would be better if the client and server reported to +# each other what the maximum chunk sizes were, but that will slow down the +# connection setup with a round trip delay so I'd rather not do that unless it +# is necessary +DEFAULT_MAX_CHUNK = 32 * 1024 + + +def chunkify(msg, max_chunk): + if len(msg) < max_chunk - 1: + yield ''.join((msg, "\n")) + else: + yield ''.join((json.dumps({ + 'chunk-stream': None + }), "\n")) + + args = [iter(msg)] * (max_chunk - 1) + for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): + yield ''.join(itertools.chain(m, "\n")) + yield "\n" + + +from .client import AsyncClient, Client +from .serv import AsyncServer, AsyncServerConnection diff --git a/poky/bitbake/lib/bb/asyncrpc/client.py b/poky/bitbake/lib/bb/asyncrpc/client.py new file mode 100644 index 000000000..4cdad9ac3 --- /dev/null +++ b/poky/bitbake/lib/bb/asyncrpc/client.py @@ -0,0 +1,145 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import abc +import asyncio +import json +import os +import socket +from . import chunkify, DEFAULT_MAX_CHUNK + + +class AsyncClient(object): + def __init__(self, proto_name, proto_version, logger): + self.reader = None + self.writer = None + self.max_chunk = DEFAULT_MAX_CHUNK + self.proto_name = proto_name + self.proto_version = proto_version + self.logger = logger + + async def connect_tcp(self, address, port): + async def connect_sock(): + return await asyncio.open_connection(address, port) + + self._connect_sock = connect_sock + + async def connect_unix(self, path): + async def connect_sock(): + return await asyncio.open_unix_connection(path) + + self._connect_sock = connect_sock + + async def setup_connection(self): + s = '%s %s\n\n' % (self.proto_name, self.proto_version) + self.writer.write(s.encode("utf-8")) + await self.writer.drain() + + async def connect(self): + if self.reader is None or self.writer is None: + (self.reader, self.writer) = await self._connect_sock() + await self.setup_connection() + + async def close(self): + self.reader = None + + if self.writer is not None: + self.writer.close() + self.writer = None + + async def _send_wrapper(self, proc): + count = 0 + while True: + try: + await self.connect() + return await proc() + except ( + OSError, + ConnectionError, + json.JSONDecodeError, + UnicodeDecodeError, + ) as e: + self.logger.warning("Error talking to server: %s" % e) + if count >= 3: + if not isinstance(e, ConnectionError): + raise ConnectionError(str(e)) + raise e + await self.close() + count += 1 + + async def send_message(self, msg): + async def get_line(): + line = await self.reader.readline() + if not line: + raise ConnectionError("Connection closed") + + line = line.decode("utf-8") + + if not line.endswith("\n"): + raise ConnectionError("Bad message %r" % msg) + + return line + + async def proc(): + for c in chunkify(json.dumps(msg), self.max_chunk): + self.writer.write(c.encode("utf-8")) + await self.writer.drain() + + l = await get_line() + + m = json.loads(l) + if m and "chunk-stream" in m: + lines = [] + while True: + l = (await get_line()).rstrip("\n") + if not l: + break + lines.append(l) + + m = json.loads("".join(lines)) + + return m + + return await self._send_wrapper(proc) + + +class Client(object): + def __init__(self): + self.client = self._get_async_client() + self.loop = asyncio.new_event_loop() + + self._add_methods('connect_tcp', 'close') + + @abc.abstractmethod + def _get_async_client(self): + pass + + def _get_downcall_wrapper(self, downcall): + def wrapper(*args, **kwargs): + return self.loop.run_until_complete(downcall(*args, **kwargs)) + + return wrapper + + def _add_methods(self, *methods): + for m in methods: + downcall = getattr(self.client, m) + setattr(self, m, self._get_downcall_wrapper(downcall)) + + def connect_unix(self, path): + # AF_UNIX has path length issues so chdir here to workaround + cwd = os.getcwd() + try: + os.chdir(os.path.dirname(path)) + self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path))) + self.loop.run_until_complete(self.client.connect()) + finally: + os.chdir(cwd) + + @property + def max_chunk(self): + return self.client.max_chunk + + @max_chunk.setter + def max_chunk(self, value): + self.client.max_chunk = value diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py new file mode 100644 index 000000000..cb3384639 --- /dev/null +++ b/poky/bitbake/lib/bb/asyncrpc/serv.py @@ -0,0 +1,218 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import abc +import asyncio +import json +import os +import signal +import socket +import sys +from . import chunkify, DEFAULT_MAX_CHUNK + + +class ClientError(Exception): + pass + + +class ServerError(Exception): + pass + + +class AsyncServerConnection(object): + def __init__(self, reader, writer, proto_name, logger): + self.reader = reader + self.writer = writer + self.proto_name = proto_name + self.max_chunk = DEFAULT_MAX_CHUNK + self.handlers = { + 'chunk-stream': self.handle_chunk, + } + self.logger = logger + + async def process_requests(self): + try: + self.addr = self.writer.get_extra_info('peername') + self.logger.debug('Client %r connected' % (self.addr,)) + + # Read protocol and version + client_protocol = await self.reader.readline() + if client_protocol is None: + return + + (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() + if client_proto_name != self.proto_name: + self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) + return + + self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) + if not self.validate_proto_version(): + self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) + return + + # Read headers. Currently, no headers are implemented, so look for + # an empty line to signal the end of the headers + while True: + line = await self.reader.readline() + if line is None: + return + + line = line.decode('utf-8').rstrip() + if not line: + break + + # Handle messages + while True: + d = await self.read_message() + if d is None: + break + await self.dispatch_message(d) + await self.writer.drain() + except ClientError as e: + self.logger.error(str(e)) + finally: + self.writer.close() + + async def dispatch_message(self, msg): + for k in self.handlers.keys(): + if k in msg: + self.logger.debug('Handling %s' % k) + await self.handlers[k](msg[k]) + return + + raise ClientError("Unrecognized command %r" % msg) + + def write_message(self, msg): + for c in chunkify(json.dumps(msg), self.max_chunk): + self.writer.write(c.encode('utf-8')) + + async def read_message(self): + l = await self.reader.readline() + if not l: + return None + + try: + message = l.decode('utf-8') + + if not message.endswith('\n'): + return None + + return json.loads(message) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self.logger.error('Bad message from client: %r' % message) + raise e + + async def handle_chunk(self, request): + lines = [] + try: + while True: + l = await self.reader.readline() + l = l.rstrip(b"\n").decode("utf-8") + if not l: + break + lines.append(l) + + msg = json.loads(''.join(lines)) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + self.logger.error('Bad message from client: %r' % lines) + raise e + + if 'chunk-stream' in msg: + raise ClientError("Nested chunks are not allowed") + + await self.dispatch_message(msg) + + +class AsyncServer(object): + def __init__(self, logger, loop=None): + if loop is None: + self.loop = asyncio.new_event_loop() + self.close_loop = True + else: + self.loop = loop + self.close_loop = False + + self._cleanup_socket = None + self.logger = logger + + def start_tcp_server(self, host, port): + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, host, port, loop=self.loop) + ) + + for s in self.server.sockets: + self.logger.info('Listening on %r' % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + def start_unix_server(self, path): + def cleanup(): + os.unlink(path) + + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(path)) + self.server = self.loop.run_until_complete( + asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) + ) + finally: + os.chdir(cwd) + + self.logger.info('Listening on %r' % path) + + self._cleanup_socket = cleanup + self.address = "unix://%s" % os.path.abspath(path) + + @abc.abstractmethod + def accept_client(self, reader, writer): + pass + + async def handle_client(self, reader, writer): + # writer.transport.set_write_buffer_limits(0) + try: + client = self.accept_client(reader, writer) + await client.process_requests() + except Exception as e: + import traceback + self.logger.error('Error from client: %s' % str(e), exc_info=True) + traceback.print_exc() + writer.close() + self.logger.info('Client disconnected') + + def run_loop_forever(self): + try: + self.loop.run_forever() + except KeyboardInterrupt: + pass + + def signal_handler(self): + self.loop.stop() + + def serve_forever(self): + asyncio.set_event_loop(self.loop) + try: + self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + + self.run_loop_forever() + self.server.close() + + self.loop.run_until_complete(self.server.wait_closed()) + self.logger.info('Server shutting down') + finally: + if self.close_loop: + if sys.version_info >= (3, 6): + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + + if self._cleanup_socket is not None: + self._cleanup_socket() diff --git a/poky/bitbake/lib/bb/command.py b/poky/bitbake/lib/bb/command.py index dd77cdd6e..f530cf844 100644 --- a/poky/bitbake/lib/bb/command.py +++ b/poky/bitbake/lib/bb/command.py @@ -20,6 +20,7 @@ Commands are queued in a CommandQueue from collections import OrderedDict, defaultdict +import io import bb.event import bb.cooker import bb.remotedata @@ -500,6 +501,17 @@ class CommandsSync: d = command.remotedatastores[dsindex].varhistory return getattr(d, method)(*args, **kwargs) + def dataStoreConnectorVarHistCmdEmit(self, command, params): + dsindex = params[0] + var = params[1] + oval = params[2] + val = params[3] + d = command.remotedatastores[params[4]] + + o = io.StringIO() + command.remotedatastores[dsindex].varhistory.emit(var, oval, val, o, d) + return o.getvalue() + def dataStoreConnectorIncHistCmd(self, command, params): dsindex = params[0] method = params[1] diff --git a/poky/bitbake/lib/bb/fetch2/git.py b/poky/bitbake/lib/bb/fetch2/git.py index e3ba80a3f..5e65c83c6 100644 --- a/poky/bitbake/lib/bb/fetch2/git.py +++ b/poky/bitbake/lib/bb/fetch2/git.py @@ -168,7 +168,11 @@ class Git(FetchMethod): if len(branches) != len(ud.names): raise bb.fetch2.ParameterError("The number of name and branch parameters is not balanced", ud.url) - ud.cloneflags = "-s -n" + ud.noshared = d.getVar("BB_GIT_NOSHARED") == "1" + + ud.cloneflags = "-n" + if not ud.noshared: + ud.cloneflags += " -s" if ud.bareclone: ud.cloneflags += " --mirror" @@ -394,7 +398,7 @@ class Git(FetchMethod): tmpdir = tempfile.mkdtemp(dir=d.getVar('DL_DIR')) try: # Do the checkout. This implicitly involves a Git LFS fetch. - self.unpack(ud, tmpdir, d) + Git.unpack(self, ud, tmpdir, d) # Scoop up a copy of any stuff that Git LFS downloaded. Merge them into # the bare clonedir. diff --git a/poky/bitbake/lib/bb/fetch2/s3.py b/poky/bitbake/lib/bb/fetch2/s3.py index ffca73c8e..6b8ffd535 100644 --- a/poky/bitbake/lib/bb/fetch2/s3.py +++ b/poky/bitbake/lib/bb/fetch2/s3.py @@ -18,10 +18,47 @@ The aws tool must be correctly installed and configured prior to use. import os import bb import urllib.request, urllib.parse, urllib.error +import re from bb.fetch2 import FetchMethod from bb.fetch2 import FetchError from bb.fetch2 import runfetchcmd +def convertToBytes(value, unit): + value = float(value) + if (unit == "KiB"): + value = value*1024.0; + elif (unit == "MiB"): + value = value*1024.0*1024.0; + elif (unit == "GiB"): + value = value*1024.0*1024.0*1024.0; + return value + +class S3ProgressHandler(bb.progress.LineFilterProgressHandler): + """ + Extract progress information from s3 cp output, e.g.: + Completed 5.1 KiB/8.8 GiB (12.0 MiB/s) with 1 file(s) remaining + """ + def __init__(self, d): + super(S3ProgressHandler, self).__init__(d) + # Send an initial progress event so the bar gets shown + self._fire_progress(0) + + def writeline(self, line): + percs = re.findall(r'^Completed (\d+.{0,1}\d*) (\w+)\/(\d+.{0,1}\d*) (\w+) (\(.+\)) with\s+', line) + if percs: + completed = (percs[-1][0]) + completedUnit = (percs[-1][1]) + total = (percs[-1][2]) + totalUnit = (percs[-1][3]) + completed = convertToBytes(completed, completedUnit) + total = convertToBytes(total, totalUnit) + progress = (completed/total)*100.0 + rate = percs[-1][4] + self.update(progress, rate) + return False + return True + + class S3(FetchMethod): """Class to fetch urls via 'aws s3'""" @@ -52,7 +89,9 @@ class S3(FetchMethod): cmd = '%s cp s3://%s%s %s' % (ud.basecmd, ud.host, ud.path, ud.localpath) bb.fetch2.check_network_access(d, cmd, ud.url) - runfetchcmd(cmd, d) + + progresshandler = S3ProgressHandler(d) + runfetchcmd(cmd, d, False, log=progresshandler) # Additional sanity checks copied from the wget class (although there # are no known issues which mean these are required, treat the aws cli diff --git a/poky/bitbake/lib/bb/fetch2/svn.py b/poky/bitbake/lib/bb/fetch2/svn.py index 8856ef1c6..80102b44f 100644 --- a/poky/bitbake/lib/bb/fetch2/svn.py +++ b/poky/bitbake/lib/bb/fetch2/svn.py @@ -86,7 +86,7 @@ class Svn(FetchMethod): if command == "info": svncmd = "%s info %s %s://%s/%s/" % (ud.basecmd, " ".join(options), proto, svnroot, ud.module) elif command == "log1": - svncmd = "%s log --limit 1 %s %s://%s/%s/" % (ud.basecmd, " ".join(options), proto, svnroot, ud.module) + svncmd = "%s log --limit 1 --quiet %s %s://%s/%s/" % (ud.basecmd, " ".join(options), proto, svnroot, ud.module) else: suffix = "" diff --git a/poky/bitbake/lib/bb/progress.py b/poky/bitbake/lib/bb/progress.py index d051ba019..52d704d64 100644 --- a/poky/bitbake/lib/bb/progress.py +++ b/poky/bitbake/lib/bb/progress.py @@ -94,12 +94,15 @@ class LineFilterProgressHandler(ProgressHandler): while True: breakpos = self._linebuffer.find('\n') + 1 if breakpos == 0: - break + # for the case when the line with progress ends with only '\r' + breakpos = self._linebuffer.find('\r') + 1 + if breakpos == 0: + break line = self._linebuffer[:breakpos] self._linebuffer = self._linebuffer[breakpos:] # Drop any line feeds and anything that precedes them lbreakpos = line.rfind('\r') + 1 - if lbreakpos: + if lbreakpos and lbreakpos != breakpos: line = line[lbreakpos:] if self.writeline(filter_color(line)): super().write(line) diff --git a/poky/bitbake/lib/bb/runqueue.py b/poky/bitbake/lib/bb/runqueue.py index cd56a5547..6c41fe6d4 100644 --- a/poky/bitbake/lib/bb/runqueue.py +++ b/poky/bitbake/lib/bb/runqueue.py @@ -2030,8 +2030,6 @@ class RunQueueExecute: logger.debug("%s didn't become valid, skipping setscene" % nexttask) self.sq_task_failoutright(nexttask) return True - else: - self.sqdata.outrightfail.remove(nexttask) if nexttask in self.sqdata.outrightfail: logger.debug2('No package found, so skipping setscene task %s', nexttask) self.sq_task_failoutright(nexttask) @@ -2296,10 +2294,16 @@ class RunQueueExecute: self.updated_taskhash_queue.remove((tid, unihash)) if unihash != self.rqdata.runtaskentries[tid].unihash: - hashequiv_logger.verbose("Task %s unihash changed to %s" % (tid, unihash)) - self.rqdata.runtaskentries[tid].unihash = unihash - bb.parse.siggen.set_unihash(tid, unihash) - toprocess.add(tid) + # Make sure we rehash any other tasks with the same task hash that we're deferred against. + torehash = [tid] + for deftid in self.sq_deferred: + if self.sq_deferred[deftid] == tid: + torehash.append(deftid) + for hashtid in torehash: + hashequiv_logger.verbose("Task %s unihash changed to %s" % (hashtid, unihash)) + self.rqdata.runtaskentries[hashtid].unihash = unihash + bb.parse.siggen.set_unihash(hashtid, unihash) + toprocess.add(hashtid) # Work out all tasks which depend upon these total = set() @@ -2827,6 +2831,8 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s sqdata.stamppresent.remove(tid) if tid in sqdata.valid: sqdata.valid.remove(tid) + if tid in sqdata.outrightfail: + sqdata.outrightfail.remove(tid) noexec, stamppresent = check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=True) @@ -2845,6 +2851,7 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s sqdata.valid |= rq.validate_hashes(tocheck, cooker.data, len(sqdata.stamppresent), False, summary=summary) sqdata.hashes = {} + sqrq.sq_deferred = {} for mc in sorted(sqdata.multiconfigs): for tid in sorted(sqdata.sq_revdeps): if mc_from_tid(tid) != mc: @@ -2857,10 +2864,13 @@ def update_scenequeue_data(tids, sqdata, rqdata, rq, cooker, stampcache, sqrq, s continue if tid in sqrq.scenequeue_notcovered: continue - sqdata.outrightfail.add(tid) + if tid in sqrq.scenequeue_covered: + continue h = pending_hash_index(tid, rqdata) if h not in sqdata.hashes: + if tid in tids: + sqdata.outrightfail.add(tid) sqdata.hashes[h] = tid else: sqrq.sq_deferred[tid] = sqdata.hashes[h] diff --git a/poky/bitbake/lib/bb/server/process.py b/poky/bitbake/lib/bb/server/process.py index b27b4aefe..3e99bcef8 100644 --- a/poky/bitbake/lib/bb/server/process.py +++ b/poky/bitbake/lib/bb/server/process.py @@ -509,7 +509,7 @@ class BitBakeServer(object): os.set_inheritable(self.bitbake_lock.fileno(), True) os.set_inheritable(self.readypipein, True) serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") - os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) + os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): diff --git a/poky/bitbake/lib/bb/siggen.py b/poky/bitbake/lib/bb/siggen.py index 0d88c6ec6..07692e673 100644 --- a/poky/bitbake/lib/bb/siggen.py +++ b/poky/bitbake/lib/bb/siggen.py @@ -402,7 +402,7 @@ class SignatureGeneratorBasic(SignatureGenerator): p = pickle.dump(data, stream, -1) stream.flush() os.chmod(tmpfile, 0o664) - os.rename(tmpfile, sigfile) + bb.utils.rename(tmpfile, sigfile) except (OSError, IOError) as err: try: os.unlink(tmpfile) @@ -542,7 +542,7 @@ class SignatureGeneratorUniHashMixIn(object): hashequiv_logger.debug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: hashequiv_logger.debug2('No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except hashserv.client.HashConnectionError as e: + except ConnectionError as e: bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) self.set_unihash(tid, unihash) @@ -621,7 +621,7 @@ class SignatureGeneratorUniHashMixIn(object): d.setVar('BB_UNIHASH', new_unihash) else: hashequiv_logger.debug('Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server)) - except hashserv.client.HashConnectionError as e: + except ConnectionError as e: bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) finally: if sigfile: @@ -661,7 +661,7 @@ class SignatureGeneratorUniHashMixIn(object): # TODO: What to do here? hashequiv_logger.verbose('Task %s unihash reported as unwanted hash %s' % (tid, finalunihash)) - except hashserv.client.HashConnectionError as e: + except ConnectionError as e: bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) return False diff --git a/poky/bitbake/lib/bb/tests/fetch.py b/poky/bitbake/lib/bb/tests/fetch.py index ddf6e9743..9291ce4a0 100644 --- a/poky/bitbake/lib/bb/tests/fetch.py +++ b/poky/bitbake/lib/bb/tests/fetch.py @@ -390,6 +390,7 @@ class FetcherTest(unittest.TestCase): if os.environ.get("BB_TMPDIR_NOCLEAN") == "yes": print("Not cleaning up %s. Please remove manually." % self.tempdir) else: + bb.process.run('chmod u+rw -R %s' % self.tempdir) bb.utils.prunedir(self.tempdir) class MirrorUriTest(FetcherTest): @@ -673,12 +674,14 @@ class FetcherLocalTest(FetcherTest): with self.assertRaises(bb.fetch2.UnpackError): self.fetchUnpack(['file://a;subdir=/bin/sh']) - def test_local_gitfetch_usehead(self): + def dummyGitTest(self, suffix): # Create dummy local Git repo src_dir = tempfile.mkdtemp(dir=self.tempdir, prefix='gitfetch_localusehead_') src_dir = os.path.abspath(src_dir) bb.process.run("git init", cwd=src_dir) + bb.process.run("git config user.email 'you@example.com'", cwd=src_dir) + bb.process.run("git config user.name 'Your Name'", cwd=src_dir) bb.process.run("git commit --allow-empty -m'Dummy commit'", cwd=src_dir) # Use other branch than master @@ -690,7 +693,7 @@ class FetcherLocalTest(FetcherTest): # Fetch and check revision self.d.setVar("SRCREV", "AUTOINC") - url = "git://" + src_dir + ";protocol=file;usehead=1" + url = "git://" + src_dir + ";protocol=file;" + suffix fetcher = bb.fetch.Fetch([url], self.d) fetcher.download() fetcher.unpack(self.unpackdir) @@ -699,31 +702,23 @@ class FetcherLocalTest(FetcherTest): unpack_rev = stdout[0].strip() self.assertEqual(orig_rev, unpack_rev) + def test_local_gitfetch_usehead(self): + self.dummyGitTest("usehead=1") + def test_local_gitfetch_usehead_withname(self): - # Create dummy local Git repo - src_dir = tempfile.mkdtemp(dir=self.tempdir, - prefix='gitfetch_localusehead_') - src_dir = os.path.abspath(src_dir) - bb.process.run("git init", cwd=src_dir) - bb.process.run("git commit --allow-empty -m'Dummy commit'", - cwd=src_dir) - # Use other branch than master - bb.process.run("git checkout -b my-devel", cwd=src_dir) - bb.process.run("git commit --allow-empty -m'Dummy commit 2'", - cwd=src_dir) - stdout = bb.process.run("git rev-parse HEAD", cwd=src_dir) - orig_rev = stdout[0].strip() + self.dummyGitTest("usehead=1;name=newName") - # Fetch and check revision - self.d.setVar("SRCREV", "AUTOINC") - url = "git://" + src_dir + ";protocol=file;usehead=1;name=newName" - fetcher = bb.fetch.Fetch([url], self.d) - fetcher.download() - fetcher.unpack(self.unpackdir) - stdout = bb.process.run("git rev-parse HEAD", - cwd=os.path.join(self.unpackdir, 'git')) - unpack_rev = stdout[0].strip() - self.assertEqual(orig_rev, unpack_rev) + def test_local_gitfetch_shared(self): + self.dummyGitTest("usehead=1;name=sharedName") + alt = os.path.join(self.unpackdir, 'git/.git/objects/info/alternates') + self.assertTrue(os.path.exists(alt)) + + def test_local_gitfetch_noshared(self): + self.d.setVar('BB_GIT_NOSHARED', '1') + self.unpackdir += '_noshared' + self.dummyGitTest("usehead=1;name=noSharedName") + alt = os.path.join(self.unpackdir, 'git/.git/objects/info/alternates') + self.assertFalse(os.path.exists(alt)) class FetcherNoNetworkTest(FetcherTest): def setUp(self): @@ -1390,6 +1385,8 @@ class GitMakeShallowTest(FetcherTest): self.gitdir = os.path.join(self.tempdir, 'gitshallow') bb.utils.mkdirhier(self.gitdir) bb.process.run('git init', cwd=self.gitdir) + bb.process.run('git config user.email "you@example.com"', cwd=self.gitdir) + bb.process.run('git config user.name "Your Name"', cwd=self.gitdir) def assertRefs(self, expected_refs): actual_refs = self.git(['for-each-ref', '--format=%(refname)']).splitlines() @@ -1513,6 +1510,8 @@ class GitShallowTest(FetcherTest): bb.utils.mkdirhier(self.srcdir) self.git('init', cwd=self.srcdir) + self.git('config user.email "you@example.com"', cwd=self.srcdir) + self.git('config user.name "Your Name"', cwd=self.srcdir) self.d.setVar('WORKDIR', self.tempdir) self.d.setVar('S', self.gitdir) self.d.delVar('PREMIRRORS') @@ -1594,6 +1593,7 @@ class GitShallowTest(FetcherTest): # fetch and unpack, from the shallow tarball bb.utils.remove(self.gitdir, recurse=True) + bb.process.run('chmod u+w -R "%s"' % ud.clonedir) bb.utils.remove(ud.clonedir, recurse=True) bb.utils.remove(ud.clonedir.replace('gitsource', 'gitsubmodule'), recurse=True) @@ -1746,6 +1746,8 @@ class GitShallowTest(FetcherTest): smdir = os.path.join(self.tempdir, 'gitsubmodule') bb.utils.mkdirhier(smdir) self.git('init', cwd=smdir) + self.git('config user.email "you@example.com"', cwd=smdir) + self.git('config user.name "Your Name"', cwd=smdir) # Make this look like it was cloned from a remote... self.git('config --add remote.origin.url "%s"' % smdir, cwd=smdir) self.git('config --add remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"', cwd=smdir) @@ -1776,6 +1778,8 @@ class GitShallowTest(FetcherTest): smdir = os.path.join(self.tempdir, 'gitsubmodule') bb.utils.mkdirhier(smdir) self.git('init', cwd=smdir) + self.git('config user.email "you@example.com"', cwd=smdir) + self.git('config user.name "Your Name"', cwd=smdir) # Make this look like it was cloned from a remote... self.git('config --add remote.origin.url "%s"' % smdir, cwd=smdir) self.git('config --add remote.origin.fetch "+refs/heads/*:refs/remotes/origin/*"', cwd=smdir) @@ -1794,7 +1798,7 @@ class GitShallowTest(FetcherTest): # Set up the mirror mirrordir = os.path.join(self.tempdir, 'mirror') - os.rename(self.dldir, mirrordir) + bb.utils.rename(self.dldir, mirrordir) self.d.setVar('PREMIRRORS', 'gitsm://.*/.* file://%s/\n' % mirrordir) # Fetch from the mirror @@ -1818,8 +1822,8 @@ class GitShallowTest(FetcherTest): self.git('annex init', cwd=self.srcdir) open(os.path.join(self.srcdir, 'c'), 'w').close() self.git('annex add c', cwd=self.srcdir) - self.git('commit -m annex-c -a', cwd=self.srcdir) - bb.process.run('chmod u+w -R %s' % os.path.join(self.srcdir, '.git', 'annex')) + self.git('commit --author "Foo Bar <foo@bar>" -m annex-c -a', cwd=self.srcdir) + bb.process.run('chmod u+w -R %s' % self.srcdir) uri = 'gitannex://%s;protocol=file;subdir=${S}' % self.srcdir fetcher, ud = self.fetch_shallow(uri) @@ -1912,7 +1916,7 @@ class GitShallowTest(FetcherTest): bb.utils.mkdirhier(mirrordir) self.d.setVar('PREMIRRORS', 'git://.*/.* file://%s/\n' % mirrordir) - os.rename(os.path.join(self.dldir, mirrortarball), + bb.utils.rename(os.path.join(self.dldir, mirrortarball), os.path.join(mirrordir, mirrortarball)) # Fetch from the mirror @@ -2094,6 +2098,8 @@ class GitLfsTest(FetcherTest): bb.utils.mkdirhier(self.srcdir) self.git('init', cwd=self.srcdir) + self.git('config user.email "you@example.com"', cwd=self.srcdir) + self.git('config user.name "Your Name"', cwd=self.srcdir) with open(os.path.join(self.srcdir, '.gitattributes'), 'wt') as attrs: attrs.write('*.mp3 filter=lfs -text') self.git(['add', '.gitattributes'], cwd=self.srcdir) @@ -2634,3 +2640,29 @@ class NPMTest(FetcherTest): fetcher = bb.fetch.Fetch(['npmsw://' + swfile], self.d) fetcher.download() self.assertTrue(os.path.exists(ud.localpath)) + +class GitSharedTest(FetcherTest): + def setUp(self): + super(GitSharedTest, self).setUp() + self.recipe_url = "git://git.openembedded.org/bitbake" + self.d.setVar('SRCREV', '82ea737a0b42a8b53e11c9cde141e9e9c0bd8c40') + + @skipIfNoNetwork() + def test_shared_unpack(self): + fetcher = bb.fetch.Fetch([self.recipe_url], self.d) + + fetcher.download() + fetcher.unpack(self.unpackdir) + alt = os.path.join(self.unpackdir, 'git/.git/objects/info/alternates') + self.assertTrue(os.path.exists(alt)) + + @skipIfNoNetwork() + def test_noshared_unpack(self): + self.d.setVar('BB_GIT_NOSHARED', '1') + self.unpackdir += '_noshared' + fetcher = bb.fetch.Fetch([self.recipe_url], self.d) + + fetcher.download() + fetcher.unpack(self.unpackdir) + alt = os.path.join(self.unpackdir, 'git/.git/objects/info/alternates') + self.assertFalse(os.path.exists(alt)) diff --git a/poky/bitbake/lib/bb/tinfoil.py b/poky/bitbake/lib/bb/tinfoil.py index 796a98f05..27a341541 100644 --- a/poky/bitbake/lib/bb/tinfoil.py +++ b/poky/bitbake/lib/bb/tinfoil.py @@ -52,6 +52,10 @@ class TinfoilDataStoreConnectorVarHistory: def remoteCommand(self, cmd, *args, **kwargs): return self.tinfoil.run_command('dataStoreConnectorVarHistCmd', self.dsindex, cmd, args, kwargs) + def emit(self, var, oval, val, o, d): + ret = self.tinfoil.run_command('dataStoreConnectorVarHistCmdEmit', self.dsindex, var, oval, val, d.dsindex) + o.write(ret) + def __getattr__(self, name): if not hasattr(bb.data_smart.VariableHistory, name): raise AttributeError("VariableHistory has no such method %s" % name) diff --git a/poky/bitbake/lib/bb/ui/knotty.py b/poky/bitbake/lib/bb/ui/knotty.py index 0efa614df..65ff2727d 100644 --- a/poky/bitbake/lib/bb/ui/knotty.py +++ b/poky/bitbake/lib/bb/ui/knotty.py @@ -21,6 +21,7 @@ import fcntl import struct import copy import atexit +from itertools import groupby from bb.ui import uihelper @@ -539,6 +540,13 @@ def main(server, eventHandler, params, tf = TerminalFilter): except OSError: pass + # Add the logging domains specified by the user on the command line + for (domainarg, iterator) in groupby(params.debug_domains): + dlevel = len(tuple(iterator)) + l = logconfig["loggers"].setdefault("BitBake.%s" % domainarg, {}) + l["level"] = logging.DEBUG - dlevel + 1 + l.setdefault("handlers", []).extend(["BitBake.verbconsole"]) + conf = bb.msg.setLoggingConfig(logconfig, logconfigfile) if sys.stdin.isatty() and sys.stdout.isatty(): diff --git a/poky/bitbake/lib/bb/utils.py b/poky/bitbake/lib/bb/utils.py index b282d09ab..6ba1d2a37 100644 --- a/poky/bitbake/lib/bb/utils.py +++ b/poky/bitbake/lib/bb/utils.py @@ -782,7 +782,7 @@ def movefile(src, dest, newmtime = None, sstat = None): if sstat[stat.ST_DEV] == dstat[stat.ST_DEV]: try: - os.rename(src, destpath) + bb.utils.rename(src, destpath) renamefailed = 0 except Exception as e: if e.errno != errno.EXDEV: @@ -796,7 +796,7 @@ def movefile(src, dest, newmtime = None, sstat = None): if stat.S_ISREG(sstat[stat.ST_MODE]): try: # For safety copy then move it over. shutil.copyfile(src, destpath + "#new") - os.rename(destpath + "#new", destpath) + bb.utils.rename(destpath + "#new", destpath) didcopy = 1 except Exception as e: print('movefile: copy', src, '->', dest, 'failed.', e) @@ -874,7 +874,7 @@ def copyfile(src, dest, newmtime = None, sstat = None): # For safety copy then move it over. shutil.copyfile(src, dest + "#new") - os.rename(dest + "#new", dest) + bb.utils.rename(dest + "#new", dest) except Exception as e: logger.warning("copyfile: copy %s to %s failed (%s)" % (src, dest, e)) return False @@ -1669,3 +1669,15 @@ def is_semver(version): return False return True + +# Wrapper around os.rename which can handle cross device problems +# e.g. from container filesystems +def rename(src, dst): + try: + os.rename(src, dst) + except OSError as err: + if err.errno == 18: + # Invalid cross-device link error + shutil.move(src, dst) + else: + raise err diff --git a/poky/bitbake/lib/bblayers/layerindex.py b/poky/bitbake/lib/bblayers/layerindex.py index b2f27b21e..793651620 100644 --- a/poky/bitbake/lib/bblayers/layerindex.py +++ b/poky/bitbake/lib/bblayers/layerindex.py @@ -159,12 +159,17 @@ class LayerIndexPlugin(ActionPlugin): logger.plain(' recommended by: %s' % ' '.join(recommendedby)) if dependencies: - fetchdir = self.tinfoil.config_data.getVar('BBLAYERS_FETCH_DIR') - if not fetchdir: - logger.error("Cannot get BBLAYERS_FETCH_DIR") - return 1 + if args.fetchdir: + fetchdir = args.fetchdir + else: + fetchdir = self.tinfoil.config_data.getVar('BBLAYERS_FETCH_DIR') + if not fetchdir: + logger.error("Cannot get BBLAYERS_FETCH_DIR") + return 1 + if not os.path.exists(fetchdir): os.makedirs(fetchdir) + addlayers = [] for deplayerbranch in dependencies: @@ -206,6 +211,8 @@ class LayerIndexPlugin(ActionPlugin): """ args.show_only = True args.ignore = [] + args.fetchdir = "" + args.shallow = True self.do_layerindex_fetch(args) def register_commands(self, sp): @@ -214,6 +221,7 @@ class LayerIndexPlugin(ActionPlugin): parser_layerindex_fetch.add_argument('-b', '--branch', help='branch name to fetch') parser_layerindex_fetch.add_argument('-s', '--shallow', help='do only shallow clones (--depth=1)', action='store_true') parser_layerindex_fetch.add_argument('-i', '--ignore', help='assume the specified layers do not need to be fetched/added (separate multiple layers with commas, no spaces)', metavar='LAYER') + parser_layerindex_fetch.add_argument('-f', '--fetchdir', help='directory to fetch the layer(s) into (will be created if it does not exist)') parser_layerindex_fetch.add_argument('layername', nargs='+', help='layer to fetch') parser_layerindex_show_depends = self.add_command(sp, 'layerindex-show-depends', self.do_layerindex_show_depends, parserecipes=False) diff --git a/poky/bitbake/lib/hashserv/client.py b/poky/bitbake/lib/hashserv/client.py index e05c1eb56..531170967 100644 --- a/poky/bitbake/lib/hashserv/client.py +++ b/poky/bitbake/lib/hashserv/client.py @@ -8,110 +8,26 @@ import json import logging import socket import os -from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client +import bb.asyncrpc +from . import create_async_client logger = logging.getLogger("hashserv.client") -class HashConnectionError(Exception): - pass - - -class AsyncClient(object): +class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 def __init__(self): - self.reader = None - self.writer = None + super().__init__('OEHASHEQUIV', '1.1', logger) self.mode = self.MODE_NORMAL - self.max_chunk = DEFAULT_MAX_CHUNK - - async def connect_tcp(self, address, port): - async def connect_sock(): - return await asyncio.open_connection(address, port) - - self._connect_sock = connect_sock - - async def connect_unix(self, path): - async def connect_sock(): - return await asyncio.open_unix_connection(path) - - self._connect_sock = connect_sock - - async def connect(self): - if self.reader is None or self.writer is None: - (self.reader, self.writer) = await self._connect_sock() - - self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8")) - await self.writer.drain() - - cur_mode = self.mode - self.mode = self.MODE_NORMAL - await self._set_mode(cur_mode) - - async def close(self): - self.reader = None - - if self.writer is not None: - self.writer.close() - self.writer = None - - async def _send_wrapper(self, proc): - count = 0 - while True: - try: - await self.connect() - return await proc() - except ( - OSError, - HashConnectionError, - json.JSONDecodeError, - UnicodeDecodeError, - ) as e: - logger.warning("Error talking to server: %s" % e) - if count >= 3: - if not isinstance(e, HashConnectionError): - raise HashConnectionError(str(e)) - raise e - await self.close() - count += 1 - - async def send_message(self, msg): - async def get_line(): - line = await self.reader.readline() - if not line: - raise HashConnectionError("Connection closed") - - line = line.decode("utf-8") - - if not line.endswith("\n"): - raise HashConnectionError("Bad message %r" % message) - - return line - - async def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode("utf-8")) - await self.writer.drain() - - l = await get_line() - - m = json.loads(l) - if m and "chunk-stream" in m: - lines = [] - while True: - l = (await get_line()).rstrip("\n") - if not l: - break - lines.append(l) - - m = json.loads("".join(lines)) - - return m - return await self._send_wrapper(proc) + async def setup_connection(self): + await super().setup_connection() + cur_mode = self.mode + self.mode = self.MODE_NORMAL + await self._set_mode(cur_mode) async def send_stream(self, msg): async def proc(): @@ -119,7 +35,7 @@ class AsyncClient(object): await self.writer.drain() l = await self.reader.readline() if not l: - raise HashConnectionError("Connection closed") + raise ConnectionError("Connection closed") return l.decode("utf-8").rstrip() return await self._send_wrapper(proc) @@ -128,11 +44,11 @@ class AsyncClient(object): if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: r = await self.send_stream("END") if r != "ok": - raise HashConnectionError("Bad response from server %r" % r) + raise ConnectionError("Bad response from server %r" % r) elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: r = await self.send_message({"get-stream": None}) if r != "ok": - raise HashConnectionError("Bad response from server %r" % r) + raise ConnectionError("Bad response from server %r" % r) elif new_mode != self.mode: raise Exception( "Undefined mode transition %r -> %r" % (self.mode, new_mode) @@ -189,12 +105,10 @@ class AsyncClient(object): return (await self.send_message({"backfill-wait": None}))["tasks"] -class Client(object): +class Client(bb.asyncrpc.Client): def __init__(self): - self.client = AsyncClient() - self.loop = asyncio.new_event_loop() - - for call in ( + super().__init__() + self._add_methods( "connect_tcp", "close", "get_unihash", @@ -204,30 +118,7 @@ class Client(object): "get_stats", "reset_stats", "backfill_wait", - ): - downcall = getattr(self.client, call) - setattr(self, call, self._get_downcall_wrapper(downcall)) - - def _get_downcall_wrapper(self, downcall): - def wrapper(*args, **kwargs): - return self.loop.run_until_complete(downcall(*args, **kwargs)) - - return wrapper - - def connect_unix(self, path): - # AF_UNIX has path length issues so chdir here to workaround - cwd = os.getcwd() - try: - os.chdir(os.path.dirname(path)) - self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path))) - self.loop.run_until_complete(self.client.connect()) - finally: - os.chdir(cwd) - - @property - def max_chunk(self): - return self.client.max_chunk - - @max_chunk.setter - def max_chunk(self, value): - self.client.max_chunk = value + ) + + def _get_async_client(self): + return AsyncClient() diff --git a/poky/bitbake/lib/hashserv/server.py b/poky/bitbake/lib/hashserv/server.py index a0dc0c170..c941c0e9d 100644 --- a/poky/bitbake/lib/hashserv/server.py +++ b/poky/bitbake/lib/hashserv/server.py @@ -14,7 +14,9 @@ import signal import socket import sys import time -from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client, TABLE_COLUMNS +from . import create_async_client, TABLE_COLUMNS +import bb.asyncrpc + logger = logging.getLogger('hashserv.server') @@ -109,12 +111,6 @@ class Stats(object): return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} -class ClientError(Exception): - pass - -class ServerError(Exception): - pass - def insert_task(cursor, data, ignore=False): keys = sorted(data.keys()) query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % ( @@ -149,7 +145,7 @@ async def copy_outhash_from_upstream(client, db, method, outhash, taskhash): return d -class ServerClient(object): +class ServerClient(bb.asyncrpc.AsyncServerConnection): FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' OUTHASH_QUERY = ''' @@ -168,21 +164,19 @@ class ServerClient(object): ''' def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): - self.reader = reader - self.writer = writer + super().__init__(reader, writer, 'OEHASHEQUIV', logger) self.db = db self.request_stats = request_stats - self.max_chunk = DEFAULT_MAX_CHUNK + self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK self.backfill_queue = backfill_queue self.upstream = upstream - self.handlers = { + self.handlers.update({ 'get': self.handle_get, 'get-outhash': self.handle_get_outhash, 'get-stream': self.handle_get_stream, 'get-stats': self.handle_get_stats, - 'chunk-stream': self.handle_chunk, - } + }) if not read_only: self.handlers.update({ @@ -192,56 +186,19 @@ class ServerClient(object): 'backfill-wait': self.handle_backfill_wait, }) + def validate_proto_version(self): + return (self.proto_version > (1, 0) and self.proto_version <= (1, 1)) + async def process_requests(self): if self.upstream is not None: self.upstream_client = await create_async_client(self.upstream) else: self.upstream_client = None - try: - - - self.addr = self.writer.get_extra_info('peername') - logger.debug('Client %r connected' % (self.addr,)) - - # Read protocol and version - protocol = await self.reader.readline() - if protocol is None: - return - - (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split() - if proto_name != 'OEHASHEQUIV': - return - - proto_version = tuple(int(v) for v in proto_version.split('.')) - if proto_version < (1, 0) or proto_version > (1, 1): - return - - # Read headers. Currently, no headers are implemented, so look for - # an empty line to signal the end of the headers - while True: - line = await self.reader.readline() - if line is None: - return + await super().process_requests() - line = line.decode('utf-8').rstrip() - if not line: - break - - # Handle messages - while True: - d = await self.read_message() - if d is None: - break - await self.dispatch_message(d) - await self.writer.drain() - except ClientError as e: - logger.error(str(e)) - finally: - if self.upstream_client is not None: - await self.upstream_client.close() - - self.writer.close() + if self.upstream_client is not None: + await self.upstream_client.close() async def dispatch_message(self, msg): for k in self.handlers.keys(): @@ -255,47 +212,7 @@ class ServerClient(object): await self.handlers[k](msg[k]) return - raise ClientError("Unrecognized command %r" % msg) - - def write_message(self, msg): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode('utf-8')) - - async def read_message(self): - l = await self.reader.readline() - if not l: - return None - - try: - message = l.decode('utf-8') - - if not message.endswith('\n'): - return None - - return json.loads(message) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - logger.error('Bad message from client: %r' % message) - raise e - - async def handle_chunk(self, request): - lines = [] - try: - while True: - l = await self.reader.readline() - l = l.rstrip(b"\n").decode("utf-8") - if not l: - break - lines.append(l) - - msg = json.loads(''.join(lines)) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - logger.error('Bad message from client: %r' % message) - raise e - - if 'chunk-stream' in msg: - raise ClientError("Nested chunks are not allowed") - - await self.dispatch_message(msg) + raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg) async def handle_get(self, request): method = request['method'] @@ -499,74 +416,20 @@ class ServerClient(object): cursor.close() -class Server(object): +class Server(bb.asyncrpc.AsyncServer): def __init__(self, db, loop=None, upstream=None, read_only=False): if upstream and read_only: - raise ServerError("Read-only hashserv cannot pull from an upstream server") + raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server") + + super().__init__(logger, loop) self.request_stats = Stats() self.db = db - - if loop is None: - self.loop = asyncio.new_event_loop() - self.close_loop = True - else: - self.loop = loop - self.close_loop = False - self.upstream = upstream self.read_only = read_only - self._cleanup_socket = None - - def start_tcp_server(self, host, port): - self.server = self.loop.run_until_complete( - asyncio.start_server(self.handle_client, host, port, loop=self.loop) - ) - - for s in self.server.sockets: - logger.info('Listening on %r' % (s.getsockname(),)) - # Newer python does this automatically. Do it manually here for - # maximum compatibility - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) - - name = self.server.sockets[0].getsockname() - if self.server.sockets[0].family == socket.AF_INET6: - self.address = "[%s]:%d" % (name[0], name[1]) - else: - self.address = "%s:%d" % (name[0], name[1]) - - def start_unix_server(self, path): - def cleanup(): - os.unlink(path) - - cwd = os.getcwd() - try: - # Work around path length limits in AF_UNIX - os.chdir(os.path.dirname(path)) - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) - ) - finally: - os.chdir(cwd) - - logger.info('Listening on %r' % path) - - self._cleanup_socket = cleanup - self.address = "unix://%s" % os.path.abspath(path) - - async def handle_client(self, reader, writer): - # writer.transport.set_write_buffer_limits(0) - try: - client = ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) - await client.process_requests() - except Exception as e: - import traceback - logger.error('Error from client: %s' % str(e), exc_info=True) - traceback.print_exc() - writer.close() - logger.info('Client disconnected') + def accept_client(self, reader, writer): + return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only) @contextmanager def _backfill_worker(self): @@ -597,31 +460,8 @@ class Server(object): else: yield - def serve_forever(self): - def signal_handler(): - self.loop.stop() - - asyncio.set_event_loop(self.loop) - try: - self.backfill_queue = asyncio.Queue() - - self.loop.add_signal_handler(signal.SIGTERM, signal_handler) - - with self._backfill_worker(): - try: - self.loop.run_forever() - except KeyboardInterrupt: - pass - - self.server.close() - - self.loop.run_until_complete(self.server.wait_closed()) - logger.info('Server shutting down') - finally: - if self.close_loop: - if sys.version_info >= (3, 6): - self.loop.run_until_complete(self.loop.shutdown_asyncgens()) - self.loop.close() + def run_loop_forever(self): + self.backfill_queue = asyncio.Queue() - if self._cleanup_socket is not None: - self._cleanup_socket() + with self._backfill_worker(): + super().run_loop_forever() diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py index 1a696481e..e2b762dbf 100644 --- a/poky/bitbake/lib/hashserv/tests.py +++ b/poky/bitbake/lib/hashserv/tests.py @@ -6,7 +6,6 @@ # from . import create_server, create_client -from .client import HashConnectionError import hashlib import logging import multiprocessing @@ -277,7 +276,7 @@ class HashEquivalenceCommonTests(object): outhash2 = '3c979c3db45c569f51ab7626a4651074be3a9d11a84b1db076f5b14f7d39db44' unihash2 = '90e9bc1d1f094c51824adca7f8ea79a048d68824' - with self.assertRaises(HashConnectionError): + with self.assertRaises(ConnectionError): ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2) # Ensure that the database was not modified diff --git a/poky/bitbake/lib/prserv/serv.py b/poky/bitbake/lib/prserv/serv.py index 25dcf8a0e..5e322bf83 100644 --- a/poky/bitbake/lib/prserv/serv.py +++ b/poky/bitbake/lib/prserv/serv.py @@ -5,8 +5,6 @@ import os,sys,logging import signal, time from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler -import threading -import queue import socket import io import sqlite3 @@ -14,14 +12,10 @@ import bb.server.xmlrpcclient import prserv import prserv.db import errno -import select +import multiprocessing logger = logging.getLogger("BitBake.PRserv") -if sys.hexversion < 0x020600F0: - print("Sorry, python 2.6 or later is required.") - sys.exit(1) - class Handler(SimpleXMLRPCRequestHandler): def _dispatch(self,method,params): try: @@ -37,7 +31,7 @@ singleton = None class PRServer(SimpleXMLRPCServer): - def __init__(self, dbfile, logfile, interface, daemon=True): + def __init__(self, dbfile, logfile, interface): ''' constructor ''' try: SimpleXMLRPCServer.__init__(self, interface, @@ -50,57 +44,18 @@ class PRServer(SimpleXMLRPCServer): raise PRServiceConfigError self.dbfile=dbfile - self.daemon=daemon self.logfile=logfile - self.working_thread=None self.host, self.port = self.socket.getsockname() - self.pidfile=PIDPREFIX % (self.host, self.port) self.register_function(self.getPR, "getPR") - self.register_function(self.quit, "quit") self.register_function(self.ping, "ping") self.register_function(self.export, "export") - self.register_function(self.dump_db, "dump_db") self.register_function(self.importone, "importone") self.register_introspection_functions() - self.quitpipein, self.quitpipeout = os.pipe() - - self.requestqueue = queue.Queue() - self.handlerthread = threading.Thread(target = self.process_request_thread) - self.handlerthread.daemon = False - - def process_request_thread(self): - """Same as in BaseServer but as a thread. - - In addition, exception handling is done here. - - """ - iter_count = 1 + self.iter_count = 0 # 60 iterations between syncs or sync if dirty every ~30 seconds - iterations_between_sync = 60 - - bb.utils.set_process_name("PRServ Handler") - - while not self.quitflag: - try: - (request, client_address) = self.requestqueue.get(True, 30) - except queue.Empty: - self.table.sync_if_dirty() - continue - if request is None: - continue - try: - self.finish_request(request, client_address) - self.shutdown_request(request) - iter_count = (iter_count + 1) % iterations_between_sync - if iter_count == 0: - self.table.sync_if_dirty() - except: - self.handle_error(request, client_address) - self.shutdown_request(request) - self.table.sync() - self.table.sync_if_dirty() + self.iterations_between_sync = 60 def sigint_handler(self, signum, stack): if self.table: @@ -109,11 +64,30 @@ class PRServer(SimpleXMLRPCServer): def sigterm_handler(self, signum, stack): if self.table: self.table.sync() - self.quit() - self.requestqueue.put((None, None)) + raise(SystemExit) def process_request(self, request, client_address): - self.requestqueue.put((request, client_address)) + if request is None: + return + try: + self.finish_request(request, client_address) + self.shutdown_request(request) + self.iter_count = (self.iter_count + 1) % self.iterations_between_sync + if self.iter_count == 0: + self.table.sync_if_dirty() + except: + self.handle_error(request, client_address) + self.shutdown_request(request) + self.table.sync() + self.table.sync_if_dirty() + + def serve_forever(self, poll_interval=0.5): + signal.signal(signal.SIGINT, self.sigint_handler) + signal.signal(signal.SIGTERM, self.sigterm_handler) + + self.db = prserv.db.PRData(self.dbfile) + self.table = self.db["PRMAIN"] + return super().serve_forever(poll_interval) def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): try: @@ -122,31 +96,11 @@ class PRServer(SimpleXMLRPCServer): logger.error(str(exc)) return None - def dump_db(self): - """ - Returns a script (string) that reconstructs the state of the - entire database at the time this function is called. The script - language is defined by the backing database engine, which is a - function of server configuration. - Returns None if the database engine does not support dumping to - script or if some other error is encountered in processing. - """ - buff = io.StringIO() - try: - self.table.sync() - self.table.dump_db(buff) - return buff.getvalue() - except Exception as exc: - logger.error(str(exc)) - return None - finally: - buff.close() - def importone(self, version, pkgarch, checksum, value): return self.table.importone(version, pkgarch, checksum, value) def ping(self): - return not self.quitflag + return True def getinfo(self): return (self.host, self.port) @@ -161,144 +115,6 @@ class PRServer(SimpleXMLRPCServer): logger.error(str(exc)) return None - def quit(self): - self.quitflag=True - os.write(self.quitpipeout, b"q") - os.close(self.quitpipeout) - return - - def work_forever(self,): - self.quitflag = False - # This timeout applies to the poll in TCPServer, we need the select - # below to wake on our quit pipe closing. We only ever call into handle_request - # if there is data there. - self.timeout = 0.01 - - bb.utils.set_process_name("PRServ") - - # DB connection must be created after all forks - self.db = prserv.db.PRData(self.dbfile) - self.table = self.db["PRMAIN"] - - logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % - (self.dbfile, self.host, self.port, str(os.getpid()))) - - self.handlerthread.start() - while not self.quitflag: - ready = select.select([self.fileno(), self.quitpipein], [], [], 30) - if self.quitflag: - break - if self.fileno() in ready[0]: - self.handle_request() - self.handlerthread.join() - self.db.disconnect() - logger.info("PRServer: stopping...") - self.server_close() - os.close(self.quitpipein) - return - - def start(self): - if self.daemon: - pid = self.daemonize() - else: - pid = self.fork() - self.pid = pid - - # Ensure both the parent sees this and the child from the work_forever log entry above - logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % - (self.dbfile, self.host, self.port, str(pid))) - - def delpid(self): - os.remove(self.pidfile) - - def daemonize(self): - """ - See Advanced Programming in the UNIX, Sec 13.3 - """ - try: - pid = os.fork() - if pid > 0: - os.waitpid(pid, 0) - #parent return instead of exit to give control - return pid - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - os.setsid() - """ - fork again to make sure the daemon is not session leader, - which prevents it from acquiring controlling terminal - """ - try: - pid = os.fork() - if pid > 0: #parent - os._exit(0) - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - self.cleanup_handles() - os._exit(0) - - def fork(self): - try: - pid = os.fork() - if pid > 0: - self.socket.close() # avoid ResourceWarning in parent - return pid - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - bb.utils.signal_on_parent_exit("SIGTERM") - self.cleanup_handles() - os._exit(0) - - def cleanup_handles(self): - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) - os.chdir("/") - - sys.stdout.flush() - sys.stderr.flush() - - # We could be called from a python thread with io.StringIO as - # stdout/stderr or it could be 'real' unix fd forking where we need - # to physically close the fds to prevent the program launching us from - # potentially hanging on a pipe. Handle both cases. - si = open('/dev/null', 'r') - try: - os.dup2(si.fileno(),sys.stdin.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stdin = si - so = open(self.logfile, 'a+') - try: - os.dup2(so.fileno(),sys.stdout.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stdout = so - try: - os.dup2(so.fileno(),sys.stderr.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stderr = so - - # Clear out all log handlers prior to the fork() to avoid calling - # event handlers not part of the PRserver - for logger_iter in logging.Logger.manager.loggerDict.keys(): - logging.getLogger(logger_iter).handlers = [] - - # Ensure logging makes it to the logfile - streamhandler = logging.StreamHandler() - streamhandler.setLevel(logging.DEBUG) - formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") - streamhandler.setFormatter(formatter) - logger.addHandler(streamhandler) - - # write pidfile - pid = str(os.getpid()) - with open(self.pidfile, 'w') as pf: - pf.write("%s\n" % pid) - - self.work_forever() - self.delpid() - class PRServSingleton(object): def __init__(self, dbfile, logfile, interface): self.dbfile = dbfile @@ -308,8 +124,10 @@ class PRServSingleton(object): self.port = None def start(self): - self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False) - self.prserv.start() + self.prserv = PRServer(self.dbfile, self.logfile, self.interface) + self.process = multiprocessing.Process(target=self.prserv.serve_forever) + self.process.start() + self.host, self.port = self.prserv.getinfo() def getinfo(self): @@ -323,13 +141,6 @@ class PRServerConnection(object): self.port = port self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) - def terminate(self): - try: - logger.info("Terminating PRServer...") - self.connection.quit() - except Exception as exc: - sys.stderr.write("%s\n" % str(exc)) - def getPR(self, version, pkgarch, checksum): return self.connection.getPR(version, pkgarch, checksum) @@ -339,15 +150,82 @@ class PRServerConnection(object): def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): return self.connection.export(version, pkgarch, checksum, colinfo) - def dump_db(self): - return self.connection.dump_db() - def importone(self, version, pkgarch, checksum, value): return self.connection.importone(version, pkgarch, checksum, value) def getinfo(self): return self.host, self.port +def run_as_daemon(func, pidfile, logfile): + """ + See Advanced Programming in the UNIX, Sec 13.3 + """ + try: + pid = os.fork() + if pid > 0: + os.waitpid(pid, 0) + #parent return instead of exit to give control + return pid + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + os.setsid() + """ + fork again to make sure the daemon is not session leader, + which prevents it from acquiring controlling terminal + """ + try: + pid = os.fork() + if pid > 0: #parent + os._exit(0) + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + os.chdir("/") + + sys.stdout.flush() + sys.stderr.flush() + + # We could be called from a python thread with io.StringIO as + # stdout/stderr or it could be 'real' unix fd forking where we need + # to physically close the fds to prevent the program launching us from + # potentially hanging on a pipe. Handle both cases. + si = open('/dev/null', 'r') + try: + os.dup2(si.fileno(),sys.stdin.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stdin = si + so = open(logfile, 'a+') + try: + os.dup2(so.fileno(),sys.stdout.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stdout = so + try: + os.dup2(so.fileno(),sys.stderr.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stderr = so + + # Clear out all log handlers prior to the fork() to avoid calling + # event handlers not part of the PRserver + for logger_iter in logging.Logger.manager.loggerDict.keys(): + logging.getLogger(logger_iter).handlers = [] + + # Ensure logging makes it to the logfile + streamhandler = logging.StreamHandler() + streamhandler.setLevel(logging.DEBUG) + formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") + streamhandler.setFormatter(formatter) + logger.addHandler(streamhandler) + + # write pidfile + pid = str(os.getpid()) + with open(pidfile, 'w') as pf: + pf.write("%s\n" % pid) + + func() + os.remove(pidfile) + os._exit(0) + def start_daemon(dbfile, host, port, logfile): ip = socket.gethostbyname(host) pidfile = PIDPREFIX % (ip, port) @@ -363,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile): return 1 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) - server.start() + run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with # the one the server actually is listening, so at least warn the user about it @@ -400,25 +278,13 @@ def stop_daemon(host, port): return 1 try: - PRServerConnection(ip, port).terminate() - except: - logger.critical("Stop PRService %s:%d failed" % (host,port)) - - try: - if pid: - wait_timeout = 0 - print("Waiting for pr-server to exit.") - while is_running(pid) and wait_timeout < 50: - time.sleep(0.1) - wait_timeout += 1 - - if is_running(pid): - print("Sending SIGTERM to pr-server.") - os.kill(pid,signal.SIGTERM) - time.sleep(0.1) + if is_running(pid): + print("Sending SIGTERM to pr-server.") + os.kill(pid, signal.SIGTERM) + time.sleep(0.1) - if os.path.exists(pidfile): - os.remove(pidfile) + if os.path.exists(pidfile): + os.remove(pidfile) except OSError as e: err = str(e) @@ -494,19 +360,14 @@ def auto_start(d): def auto_shutdown(): global singleton - if singleton: - host, port = singleton.getinfo() - try: - PRServerConnection(host, port).terminate() - except: - logger.critical("Stop PRService %s:%d failed" % (host,port)) - - try: - os.waitpid(singleton.prserv.pid, 0) - except ChildProcessError: - pass + if singleton and singleton.process: + singleton.process.terminate() + singleton.process.join() singleton = None def ping(host, port): conn=PRServerConnection(host, port) return conn.ping() + +def connect(host, port): + return PRServerConnection(host, port) |