Diffstat (limited to 'poky/bitbake/lib/bb')
-rw-r--r--  poky/bitbake/lib/bb/asyncrpc/__init__.py   |  33
-rw-r--r--  poky/bitbake/lib/bb/asyncrpc/client.py     | 120
-rw-r--r--  poky/bitbake/lib/bb/asyncrpc/connection.py | 146
-rw-r--r--  poky/bitbake/lib/bb/asyncrpc/exceptions.py |  21
-rw-r--r--  poky/bitbake/lib/bb/asyncrpc/serv.py       | 370
-rw-r--r--  poky/bitbake/lib/bb/cache.py               |   6
-rw-r--r--  poky/bitbake/lib/bb/codeparser.py          |   5
-rw-r--r--  poky/bitbake/lib/bb/command.py             |   8
-rw-r--r--  poky/bitbake/lib/bb/cooker.py              |  36
-rw-r--r--  poky/bitbake/lib/bb/cookerdata.py          |  11
-rw-r--r--  poky/bitbake/lib/bb/fetch2/__init__.py     |  80
-rw-r--r--  poky/bitbake/lib/bb/fetch2/crate.py        |   2
-rw-r--r--  poky/bitbake/lib/bb/fetch2/git.py          |  28
-rw-r--r--  poky/bitbake/lib/bb/fetch2/gitsm.py        |   4
-rw-r--r--  poky/bitbake/lib/bb/fetch2/hg.py           |   1
-rw-r--r--  poky/bitbake/lib/bb/fetch2/npm.py          |   1
-rw-r--r--  poky/bitbake/lib/bb/fetch2/npmsw.py        |   3
-rw-r--r--  poky/bitbake/lib/bb/runqueue.py            |  93
-rw-r--r--  poky/bitbake/lib/bb/tests/fetch.py         | 136
-rw-r--r--  poky/bitbake/lib/bb/utils.py               |  16
20 files changed, 783 insertions, 337 deletions
diff --git a/poky/bitbake/lib/bb/asyncrpc/__init__.py b/poky/bitbake/lib/bb/asyncrpc/__init__.py index 9a85e9965b..a4371643d7 100644 --- a/poky/bitbake/lib/bb/asyncrpc/__init__.py +++ b/poky/bitbake/lib/bb/asyncrpc/__init__.py @@ -4,30 +4,13 @@ # SPDX-License-Identifier: GPL-2.0-only # -import itertools -import json - -# The Python async server defaults to a 64K receive buffer, so we hardcode our -# maximum chunk size. It would be better if the client and server reported to -# each other what the maximum chunk sizes were, but that will slow down the -# connection setup with a round trip delay so I'd rather not do that unless it -# is necessary -DEFAULT_MAX_CHUNK = 32 * 1024 - - -def chunkify(msg, max_chunk): - if len(msg) < max_chunk - 1: - yield ''.join((msg, "\n")) - else: - yield ''.join((json.dumps({ - 'chunk-stream': None - }), "\n")) - - args = [iter(msg)] * (max_chunk - 1) - for m in map(''.join, itertools.zip_longest(*args, fillvalue='')): - yield ''.join(itertools.chain(m, "\n")) - yield "\n" - from .client import AsyncClient, Client -from .serv import AsyncServer, AsyncServerConnection, ClientError, ServerError +from .serv import AsyncServer, AsyncServerConnection +from .connection import DEFAULT_MAX_CHUNK +from .exceptions import ( + ClientError, + ServerError, + ConnectionClosedError, + InvokeError, +) diff --git a/poky/bitbake/lib/bb/asyncrpc/client.py b/poky/bitbake/lib/bb/asyncrpc/client.py index fa042bbe87..0d7cd85780 100644 --- a/poky/bitbake/lib/bb/asyncrpc/client.py +++ b/poky/bitbake/lib/bb/asyncrpc/client.py @@ -10,13 +10,13 @@ import json import os import socket import sys -from . import chunkify, DEFAULT_MAX_CHUNK +from .connection import StreamConnection, WebsocketConnection, DEFAULT_MAX_CHUNK +from .exceptions import ConnectionClosedError, InvokeError class AsyncClient(object): def __init__(self, proto_name, proto_version, logger, timeout=30): - self.reader = None - self.writer = None + self.socket = None self.max_chunk = DEFAULT_MAX_CHUNK self.proto_name = proto_name self.proto_version = proto_version @@ -25,7 +25,8 @@ class AsyncClient(object): async def connect_tcp(self, address, port): async def connect_sock(): - return await asyncio.open_connection(address, port) + reader, writer = await asyncio.open_connection(address, port) + return StreamConnection(reader, writer, self.timeout, self.max_chunk) self._connect_sock = connect_sock @@ -40,27 +41,39 @@ class AsyncClient(object): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) sock.connect(os.path.basename(path)) finally: - os.chdir(cwd) - return await asyncio.open_unix_connection(sock=sock) + os.chdir(cwd) + reader, writer = await asyncio.open_unix_connection(sock=sock) + return StreamConnection(reader, writer, self.timeout, self.max_chunk) + + self._connect_sock = connect_sock + + async def connect_websocket(self, uri): + import websockets + + async def connect_sock(): + websocket = await websockets.connect(uri, ping_interval=None) + return WebsocketConnection(websocket, self.timeout) self._connect_sock = connect_sock async def setup_connection(self): - s = '%s %s\n\n' % (self.proto_name, self.proto_version) - self.writer.write(s.encode("utf-8")) - await self.writer.drain() + # Send headers + await self.socket.send("%s %s" % (self.proto_name, self.proto_version)) + # End of headers + await self.socket.send("") async def connect(self): - if self.reader is None or self.writer is None: - (self.reader, self.writer) = await self._connect_sock() + if self.socket is None: + self.socket = await 
self._connect_sock() await self.setup_connection() - async def close(self): - self.reader = None + async def disconnect(self): + if self.socket is not None: + await self.socket.close() + self.socket = None - if self.writer is not None: - self.writer.close() - self.writer = None + async def close(self): + await self.disconnect() async def _send_wrapper(self, proc): count = 0 @@ -71,6 +84,7 @@ class AsyncClient(object): except ( OSError, ConnectionError, + ConnectionClosedError, json.JSONDecodeError, UnicodeDecodeError, ) as e: @@ -82,49 +96,27 @@ class AsyncClient(object): await self.close() count += 1 - async def send_message(self, msg): - async def get_line(): - try: - line = await asyncio.wait_for(self.reader.readline(), self.timeout) - except asyncio.TimeoutError: - raise ConnectionError("Timed out waiting for server") - - if not line: - raise ConnectionError("Connection closed") - - line = line.decode("utf-8") - - if not line.endswith("\n"): - raise ConnectionError("Bad message %r" % (line)) - - return line + def check_invoke_error(self, msg): + if isinstance(msg, dict) and "invoke-error" in msg: + raise InvokeError(msg["invoke-error"]["message"]) + async def invoke(self, msg): async def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode("utf-8")) - await self.writer.drain() - - l = await get_line() + await self.socket.send_message(msg) + return await self.socket.recv_message() - m = json.loads(l) - if m and "chunk-stream" in m: - lines = [] - while True: - l = (await get_line()).rstrip("\n") - if not l: - break - lines.append(l) + result = await self._send_wrapper(proc) + self.check_invoke_error(result) + return result - m = json.loads("".join(lines)) + async def ping(self): + return await self.invoke({"ping": {}}) - return m + async def __aenter__(self): + return self - return await self._send_wrapper(proc) - - async def ping(self): - return await self.send_message( - {'ping': {}} - ) + async def __aexit__(self, exc_type, exc_value, traceback): + await self.close() class Client(object): @@ -142,7 +134,7 @@ class Client(object): # required (but harmless) with it. asyncio.set_event_loop(self.loop) - self._add_methods('connect_tcp', 'ping') + self._add_methods("connect_tcp", "ping") @abc.abstractmethod def _get_async_client(self): @@ -171,8 +163,20 @@ class Client(object): def max_chunk(self, value): self.client.max_chunk = value - def close(self): + def disconnect(self): self.loop.run_until_complete(self.client.close()) - if sys.version_info >= (3, 6): - self.loop.run_until_complete(self.loop.shutdown_asyncgens()) - self.loop.close() + + def close(self): + if self.loop: + self.loop.run_until_complete(self.client.close()) + if sys.version_info >= (3, 6): + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + self.loop = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + return False diff --git a/poky/bitbake/lib/bb/asyncrpc/connection.py b/poky/bitbake/lib/bb/asyncrpc/connection.py new file mode 100644 index 0000000000..7f0cf6ba96 --- /dev/null +++ b/poky/bitbake/lib/bb/asyncrpc/connection.py @@ -0,0 +1,146 @@ +# +# Copyright BitBake Contributors +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import asyncio +import itertools +import json +from datetime import datetime +from .exceptions import ClientError, ConnectionClosedError + + +# The Python async server defaults to a 64K receive buffer, so we hardcode our +# maximum chunk size. 
It would be better if the client and server reported to +# each other what the maximum chunk sizes were, but that will slow down the +# connection setup with a round trip delay so I'd rather not do that unless it +# is necessary +DEFAULT_MAX_CHUNK = 32 * 1024 + + +def chunkify(msg, max_chunk): + if len(msg) < max_chunk - 1: + yield "".join((msg, "\n")) + else: + yield "".join((json.dumps({"chunk-stream": None}), "\n")) + + args = [iter(msg)] * (max_chunk - 1) + for m in map("".join, itertools.zip_longest(*args, fillvalue="")): + yield "".join(itertools.chain(m, "\n")) + yield "\n" + + +def json_serialize(obj): + if isinstance(obj, datetime): + return obj.isoformat() + raise TypeError("Type %s not serializeable" % type(obj)) + + +class StreamConnection(object): + def __init__(self, reader, writer, timeout, max_chunk=DEFAULT_MAX_CHUNK): + self.reader = reader + self.writer = writer + self.timeout = timeout + self.max_chunk = max_chunk + + @property + def address(self): + return self.writer.get_extra_info("peername") + + async def send_message(self, msg): + for c in chunkify(json.dumps(msg, default=json_serialize), self.max_chunk): + self.writer.write(c.encode("utf-8")) + await self.writer.drain() + + async def recv_message(self): + l = await self.recv() + + m = json.loads(l) + if not m: + return m + + if "chunk-stream" in m: + lines = [] + while True: + l = await self.recv() + if not l: + break + lines.append(l) + + m = json.loads("".join(lines)) + + return m + + async def send(self, msg): + self.writer.write(("%s\n" % msg).encode("utf-8")) + await self.writer.drain() + + async def recv(self): + if self.timeout < 0: + line = await self.reader.readline() + else: + try: + line = await asyncio.wait_for(self.reader.readline(), self.timeout) + except asyncio.TimeoutError: + raise ConnectionError("Timed out waiting for data") + + if not line: + raise ConnectionClosedError("Connection closed") + + line = line.decode("utf-8") + + if not line.endswith("\n"): + raise ConnectionError("Bad message %r" % (line)) + + return line.rstrip() + + async def close(self): + self.reader = None + if self.writer is not None: + self.writer.close() + self.writer = None + + +class WebsocketConnection(object): + def __init__(self, socket, timeout): + self.socket = socket + self.timeout = timeout + + @property + def address(self): + return ":".join(str(s) for s in self.socket.remote_address) + + async def send_message(self, msg): + await self.send(json.dumps(msg, default=json_serialize)) + + async def recv_message(self): + m = await self.recv() + return json.loads(m) + + async def send(self, msg): + import websockets.exceptions + + try: + await self.socket.send(msg) + except websockets.exceptions.ConnectionClosed: + raise ConnectionClosedError("Connection closed") + + async def recv(self): + import websockets.exceptions + + try: + if self.timeout < 0: + return await self.socket.recv() + + try: + return await asyncio.wait_for(self.socket.recv(), self.timeout) + except asyncio.TimeoutError: + raise ConnectionError("Timed out waiting for data") + except websockets.exceptions.ConnectionClosed: + raise ConnectionClosedError("Connection closed") + + async def close(self): + if self.socket is not None: + await self.socket.close() + self.socket = None diff --git a/poky/bitbake/lib/bb/asyncrpc/exceptions.py b/poky/bitbake/lib/bb/asyncrpc/exceptions.py new file mode 100644 index 0000000000..ae1043a38b --- /dev/null +++ b/poky/bitbake/lib/bb/asyncrpc/exceptions.py @@ -0,0 +1,21 @@ +# +# Copyright BitBake Contributors +# +# 
SPDX-License-Identifier: GPL-2.0-only +# + + +class ClientError(Exception): + pass + + +class InvokeError(Exception): + pass + + +class ServerError(Exception): + pass + + +class ConnectionClosedError(Exception): + pass diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py index d2de4891b8..f0be9a6cdb 100644 --- a/poky/bitbake/lib/bb/asyncrpc/serv.py +++ b/poky/bitbake/lib/bb/asyncrpc/serv.py @@ -12,241 +12,321 @@ import signal import socket import sys import multiprocessing -from . import chunkify, DEFAULT_MAX_CHUNK +import logging +from .connection import StreamConnection, WebsocketConnection +from .exceptions import ClientError, ServerError, ConnectionClosedError, InvokeError -class ClientError(Exception): - pass - - -class ServerError(Exception): - pass +class ClientLoggerAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return f"[Client {self.extra['address']}] {msg}", kwargs class AsyncServerConnection(object): - def __init__(self, reader, writer, proto_name, logger): - self.reader = reader - self.writer = writer + # If a handler returns this object (e.g. `return self.NO_RESPONSE`), no + # return message will be automatically be sent back to the client + NO_RESPONSE = object() + + def __init__(self, socket, proto_name, logger): + self.socket = socket self.proto_name = proto_name - self.max_chunk = DEFAULT_MAX_CHUNK self.handlers = { - 'chunk-stream': self.handle_chunk, - 'ping': self.handle_ping, + "ping": self.handle_ping, } - self.logger = logger + self.logger = ClientLoggerAdapter( + logger, + { + "address": socket.address, + }, + ) + + async def close(self): + await self.socket.close() async def process_requests(self): try: - self.addr = self.writer.get_extra_info('peername') - self.logger.debug('Client %r connected' % (self.addr,)) + self.logger.info("Client %r connected" % (self.socket.address,)) # Read protocol and version - client_protocol = await self.reader.readline() + client_protocol = await self.socket.recv() if not client_protocol: return - (client_proto_name, client_proto_version) = client_protocol.decode('utf-8').rstrip().split() + (client_proto_name, client_proto_version) = client_protocol.split() if client_proto_name != self.proto_name: - self.logger.debug('Rejecting invalid protocol %s' % (self.proto_name)) + self.logger.debug("Rejecting invalid protocol %s" % (self.proto_name)) return - self.proto_version = tuple(int(v) for v in client_proto_version.split('.')) + self.proto_version = tuple(int(v) for v in client_proto_version.split(".")) if not self.validate_proto_version(): - self.logger.debug('Rejecting invalid protocol version %s' % (client_proto_version)) + self.logger.debug( + "Rejecting invalid protocol version %s" % (client_proto_version) + ) return # Read headers. 
Currently, no headers are implemented, so look for # an empty line to signal the end of the headers while True: - line = await self.reader.readline() - if not line: - return - - line = line.decode('utf-8').rstrip() - if not line: + header = await self.socket.recv() + if not header: break # Handle messages while True: - d = await self.read_message() + d = await self.socket.recv_message() if d is None: break - await self.dispatch_message(d) - await self.writer.drain() - except ClientError as e: + try: + response = await self.dispatch_message(d) + except InvokeError as e: + await self.socket.send_message( + {"invoke-error": {"message": str(e)}} + ) + break + + if response is not self.NO_RESPONSE: + await self.socket.send_message(response) + + except ConnectionClosedError as e: + self.logger.info(str(e)) + except (ClientError, ConnectionError) as e: self.logger.error(str(e)) finally: - self.writer.close() + await self.close() async def dispatch_message(self, msg): for k in self.handlers.keys(): if k in msg: - self.logger.debug('Handling %s' % k) - await self.handlers[k](msg[k]) - return + self.logger.debug("Handling %s" % k) + return await self.handlers[k](msg[k]) raise ClientError("Unrecognized command %r" % msg) - def write_message(self, msg): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode('utf-8')) + async def handle_ping(self, request): + return {"alive": True} - async def read_message(self): - l = await self.reader.readline() - if not l: - return None - try: - message = l.decode('utf-8') +class StreamServer(object): + def __init__(self, handler, logger): + self.handler = handler + self.logger = logger + self.closed = False - if not message.endswith('\n'): - return None + async def handle_stream_client(self, reader, writer): + # writer.transport.set_write_buffer_limits(0) + socket = StreamConnection(reader, writer, -1) + if self.closed: + await socket.close() + return + + await self.handler(socket) + + async def stop(self): + self.closed = True + + +class TCPStreamServer(StreamServer): + def __init__(self, host, port, handler, logger): + super().__init__(handler, logger) + self.host = host + self.port = port + + def start(self, loop): + self.server = loop.run_until_complete( + asyncio.start_server(self.handle_stream_client, self.host, self.port) + ) + + for s in self.server.sockets: + self.logger.debug("Listening on %r" % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + # Enable keep alives. This prevents broken client connections + # from persisting on the server for long periods of time. 
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + return [self.server.wait_closed()] + + async def stop(self): + await super().stop() + self.server.close() + + def cleanup(self): + pass - return json.loads(message) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self.logger.error('Bad message from client: %r' % message) - raise e - async def handle_chunk(self, request): - lines = [] - try: - while True: - l = await self.reader.readline() - l = l.rstrip(b"\n").decode("utf-8") - if not l: - break - lines.append(l) +class UnixStreamServer(StreamServer): + def __init__(self, path, handler, logger): + super().__init__(handler, logger) + self.path = path - msg = json.loads(''.join(lines)) - except (json.JSONDecodeError, UnicodeDecodeError) as e: - self.logger.error('Bad message from client: %r' % lines) - raise e + def start(self, loop): + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(self.path)) + self.server = loop.run_until_complete( + asyncio.start_unix_server( + self.handle_stream_client, os.path.basename(self.path) + ) + ) + finally: + os.chdir(cwd) - if 'chunk-stream' in msg: - raise ClientError("Nested chunks are not allowed") + self.logger.debug("Listening on %r" % self.path) + self.address = "unix://%s" % os.path.abspath(self.path) + return [self.server.wait_closed()] - await self.dispatch_message(msg) + async def stop(self): + await super().stop() + self.server.close() - async def handle_ping(self, request): - response = {'alive': True} - self.write_message(response) + def cleanup(self): + os.unlink(self.path) -class AsyncServer(object): - def __init__(self, logger): - self._cleanup_socket = None +class WebsocketsServer(object): + def __init__(self, host, port, handler, logger): + self.host = host + self.port = port + self.handler = handler self.logger = logger - self.start = None - self.address = None - self.loop = None - def start_tcp_server(self, host, port): - def start_tcp(): - self.server = self.loop.run_until_complete( - asyncio.start_server(self.handle_client, host, port) + def start(self, loop): + import websockets.server + + self.server = loop.run_until_complete( + websockets.server.serve( + self.client_handler, + self.host, + self.port, + ping_interval=None, ) + ) - for s in self.server.sockets: - self.logger.debug('Listening on %r' % (s.getsockname(),)) - # Newer python does this automatically. Do it manually here for - # maximum compatibility - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + for s in self.server.sockets: + self.logger.debug("Listening on %r" % (s.getsockname(),)) - # Enable keep alives. This prevents broken client connections - # from persisting on the server for long periods of time. - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) + # Enable keep alives. This prevents broken client connections + # from persisting on the server for long periods of time. 
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 30) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) - name = self.server.sockets[0].getsockname() - if self.server.sockets[0].family == socket.AF_INET6: - self.address = "[%s]:%d" % (name[0], name[1]) - else: - self.address = "%s:%d" % (name[0], name[1]) + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "ws://[%s]:%d" % (name[0], name[1]) + else: + self.address = "ws://%s:%d" % (name[0], name[1]) - self.start = start_tcp + return [self.server.wait_closed()] - def start_unix_server(self, path): - def cleanup(): - os.unlink(path) + async def stop(self): + self.server.close() - def start_unix(): - cwd = os.getcwd() - try: - # Work around path length limits in AF_UNIX - os.chdir(os.path.dirname(path)) - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client, os.path.basename(path)) - ) - finally: - os.chdir(cwd) + def cleanup(self): + pass - self.logger.debug('Listening on %r' % path) + async def client_handler(self, websocket): + socket = WebsocketConnection(websocket, -1) + await self.handler(socket) - self._cleanup_socket = cleanup - self.address = "unix://%s" % os.path.abspath(path) - self.start = start_unix +class AsyncServer(object): + def __init__(self, logger): + self.logger = logger + self.loop = None + self.run_tasks = [] - @abc.abstractmethod - def accept_client(self, reader, writer): - pass + def start_tcp_server(self, host, port): + self.server = TCPStreamServer(host, port, self._client_handler, self.logger) - async def handle_client(self, reader, writer): - # writer.transport.set_write_buffer_limits(0) + def start_unix_server(self, path): + self.server = UnixStreamServer(path, self._client_handler, self.logger) + + def start_websocket_server(self, host, port): + self.server = WebsocketsServer(host, port, self._client_handler, self.logger) + + async def _client_handler(self, socket): + address = socket.address try: - client = self.accept_client(reader, writer) + client = self.accept_client(socket) await client.process_requests() except Exception as e: import traceback - self.logger.error('Error from client: %s' % str(e), exc_info=True) + + self.logger.error( + "Error from client %s: %s" % (address, str(e)), exc_info=True + ) traceback.print_exc() - writer.close() - self.logger.debug('Client disconnected') + finally: + self.logger.debug("Client %s disconnected", address) + await socket.close() - def run_loop_forever(self): - try: - self.loop.run_forever() - except KeyboardInterrupt: - pass + @abc.abstractmethod + def accept_client(self, socket): + pass + + async def stop(self): + self.logger.debug("Stopping server") + await self.server.stop() + + def start(self): + tasks = self.server.start(self.loop) + self.address = self.server.address + return tasks def signal_handler(self): self.logger.debug("Got exit signal") - self.loop.stop() + self.loop.create_task(self.stop()) - def _serve_forever(self): + def _serve_forever(self, tasks): try: self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + self.loop.add_signal_handler(signal.SIGINT, self.signal_handler) + self.loop.add_signal_handler(signal.SIGQUIT, self.signal_handler) signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) - self.run_loop_forever() - self.server.close() + self.loop.run_until_complete(asyncio.gather(*tasks)) - 
self.loop.run_until_complete(self.server.wait_closed()) - self.logger.debug('Server shutting down') + self.logger.debug("Server shutting down") finally: - if self._cleanup_socket is not None: - self._cleanup_socket() + self.server.cleanup() def serve_forever(self): """ Serve requests in the current process """ + self._create_loop() + tasks = self.start() + self._serve_forever(tasks) + self.loop.close() + + def _create_loop(self): # Create loop and override any loop that may have existed in # a parent process. It is possible that the usecases of # serve_forever might be constrained enough to allow using # get_event_loop here, but better safe than sorry for now. self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) - self.start() - self._serve_forever() - def serve_as_process(self, *, prefunc=None, args=()): + def serve_as_process(self, *, prefunc=None, args=(), log_level=None): """ Serve requests in a child process """ + def run(queue): # Create loop and override any loop that may have existed # in a parent process. Without doing this and instead @@ -259,18 +339,22 @@ class AsyncServer(object): # more general, though, as any potential use of asyncio in # Cooker could create a loop that needs to replaced in this # new process. - self.loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.loop) + self._create_loop() try: - self.start() + self.address = None + tasks = self.start() finally: + # Always put the server address to wake up the parent task queue.put(self.address) queue.close() if prefunc is not None: prefunc(self, *args) - self._serve_forever() + if log_level is not None: + self.logger.setLevel(log_level) + + self._serve_forever(tasks) if sys.version_info >= (3, 6): self.loop.run_until_complete(self.loop.shutdown_asyncgens()) diff --git a/poky/bitbake/lib/bb/cache.py b/poky/bitbake/lib/bb/cache.py index 5ea41c5de0..18d5574a31 100644 --- a/poky/bitbake/lib/bb/cache.py +++ b/poky/bitbake/lib/bb/cache.py @@ -344,9 +344,7 @@ def virtualfn2realfn(virtualfn): """ mc = "" if virtualfn.startswith('mc:') and virtualfn.count(':') >= 2: - elems = virtualfn.split(':') - mc = elems[1] - virtualfn = ":".join(elems[2:]) + (_, mc, virtualfn) = virtualfn.split(':', 2) fn = virtualfn cls = "" @@ -369,7 +367,7 @@ def realfn2virtual(realfn, cls, mc): def variant2virtual(realfn, variant): """ - Convert a real filename + the associated subclass keyword to a virtual filename + Convert a real filename + a variant to a virtual filename """ if variant == "": return realfn diff --git a/poky/bitbake/lib/bb/codeparser.py b/poky/bitbake/lib/bb/codeparser.py index eabeda591a..cd39409434 100644 --- a/poky/bitbake/lib/bb/codeparser.py +++ b/poky/bitbake/lib/bb/codeparser.py @@ -62,6 +62,7 @@ def check_indent(codestr): modulecode_deps = {} def add_module_functions(fn, functions, namespace): + import os fstat = os.stat(fn) fixedhash = fn + ":" + str(fstat.st_size) + ":" + str(fstat.st_mtime) for f in functions: @@ -255,8 +256,8 @@ class PythonParser(): def visit_Call(self, node): name = self.called_node_name(node.func) if name and (name.endswith(self.getvars) or name.endswith(self.getvarflags) or name in self.containsfuncs or name in self.containsanyfuncs): - if isinstance(node.args[0], ast.Str): - varname = node.args[0].s + if isinstance(node.args[0], ast.Constant) and isinstance(node.args[0].value, str): + varname = node.args[0].value if name in self.containsfuncs and isinstance(node.args[1], ast.Str): if varname not in self.contains: self.contains[varname] = set() diff --git 
a/poky/bitbake/lib/bb/command.py b/poky/bitbake/lib/bb/command.py index f2ee587161..79b6c0738f 100644 --- a/poky/bitbake/lib/bb/command.py +++ b/poky/bitbake/lib/bb/command.py @@ -550,8 +550,8 @@ class CommandsSync: and return a datastore object representing the environment for the recipe. """ - fn = params[0] - mc = bb.runqueue.mc_from_tid(fn) + virtualfn = params[0] + (fn, cls, mc) = bb.cache.virtualfn2realfn(virtualfn) appends = params[1] appendlist = params[2] if len(params) > 3: @@ -574,10 +574,10 @@ class CommandsSync: if config_data: # We have to use a different function here if we're passing in a datastore # NOTE: we took a copy above, so we don't do it here again - envdata = command.cooker.databuilder._parse_recipe(config_data, fn, appendfiles, mc, layername)[''] + envdata = command.cooker.databuilder._parse_recipe(config_data, fn, appendfiles, mc, layername)[cls] else: # Use the standard path - envdata = command.cooker.databuilder.parseRecipe(fn, appendfiles, layername) + envdata = command.cooker.databuilder.parseRecipe(virtualfn, appendfiles, layername) idx = command.remotedatastores.store(envdata) return DataStoreConnectionHandle(idx) parseRecipeFile.readonly = True diff --git a/poky/bitbake/lib/bb/cooker.py b/poky/bitbake/lib/bb/cooker.py index 599c7ddaa2..d658db9bd8 100644 --- a/poky/bitbake/lib/bb/cooker.py +++ b/poky/bitbake/lib/bb/cooker.py @@ -303,6 +303,10 @@ class BBCooker: self.data_hash = self.databuilder.data_hash self.extraconfigdata = {} + eventlog = self.data.getVar("BB_DEFAULT_EVENTLOG") + if not self.configuration.writeeventlog and eventlog: + self.setupEventLog(eventlog) + if consolelog: self.data.setVar("BB_CONSOLELOG", consolelog) @@ -345,7 +349,7 @@ class BBCooker: sync=False, upstream=upstream, ) - self.hashserv.serve_as_process() + self.hashserv.serve_as_process(log_level=logging.WARNING) for mc in self.databuilder.mcdata: self.databuilder.mcorigdata[mc].setVar("BB_HASHSERVE", self.hashservaddr) self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.hashservaddr) @@ -409,6 +413,18 @@ class BBCooker: self._parsecache_set(False) + def setupEventLog(self, eventlog): + if self.eventlog and self.eventlog[0] != eventlog: + bb.event.unregister_UIHhandler(self.eventlog[1]) + if not self.eventlog or self.eventlog[0] != eventlog: + # we log all events to a file if so directed + # register the log file writer as UI Handler + if not os.path.exists(os.path.dirname(eventlog)): + bb.utils.mkdirhier(os.path.dirname(eventlog)) + writer = EventWriter(self, eventlog) + EventLogWriteHandler = namedtuple('EventLogWriteHandler', ['event']) + self.eventlog = (eventlog, bb.event.register_UIHhandler(EventLogWriteHandler(writer))) + def updateConfigOpts(self, options, environment, cmdline): self.ui_cmdline = cmdline clean = True @@ -428,14 +444,7 @@ class BBCooker: setattr(self.configuration, o, options[o]) if self.configuration.writeeventlog: - if self.eventlog and self.eventlog[0] != self.configuration.writeeventlog: - bb.event.unregister_UIHhandler(self.eventlog[1]) - if not self.eventlog or self.eventlog[0] != self.configuration.writeeventlog: - # we log all events to a file if so directed - # register the log file writer as UI Handler - writer = EventWriter(self, self.configuration.writeeventlog) - EventLogWriteHandler = namedtuple('EventLogWriteHandler', ['event']) - self.eventlog = (self.configuration.writeeventlog, bb.event.register_UIHhandler(EventLogWriteHandler(writer))) + self.setupEventLog(self.configuration.writeeventlog) bb.msg.loggerDefaultLogLevel = 
self.configuration.default_loglevel bb.msg.loggerDefaultDomains = self.configuration.debug_domains @@ -1548,7 +1557,13 @@ class BBCooker: def getAllKeysWithFlags(self, flaglist): + def dummy_autorev(d): + return + dump = {} + # Horrible but for now we need to avoid any sideeffects of autorev being called + saved = bb.fetch2.get_autorev + bb.fetch2.get_autorev = dummy_autorev for k in self.data.keys(): try: expand = True @@ -1568,6 +1583,7 @@ class BBCooker: dump[k][d] = None except Exception as e: print(e) + bb.fetch2.get_autorev = saved return dump @@ -1787,7 +1803,7 @@ class CookerCollectFiles(object): for ignored in ('SCCS', 'CVS', '.svn'): if ignored in dirs: dirs.remove(ignored) - found += [os.path.join(dir, f) for f in files if (f.endswith(['.bb', '.bbappend']))] + found += [os.path.join(dir, f) for f in files if (f.endswith(('.bb', '.bbappend')))] return found diff --git a/poky/bitbake/lib/bb/cookerdata.py b/poky/bitbake/lib/bb/cookerdata.py index 42b8d64685..0649e40995 100644 --- a/poky/bitbake/lib/bb/cookerdata.py +++ b/poky/bitbake/lib/bb/cookerdata.py @@ -503,8 +503,8 @@ class CookerDataBuilder(object): if appends: bb_data.setVar('__BBAPPEND', " ".join(appends)) - bb_data = bb.parse.handle(bbfile, bb_data) - return bb_data + + return bb.parse.handle(bbfile, bb_data) def parseRecipeVariants(self, bbfile, appends, virtonly=False, mc=None, layername=None): """ @@ -516,8 +516,7 @@ class CookerDataBuilder(object): (bbfile, virtual, mc) = bb.cache.virtualfn2realfn(bbfile) bb_data = self.mcdata[mc].createCopy() bb_data.setVar("__ONLYFINALISE", virtual or "default") - datastores = self._parse_recipe(bb_data, bbfile, appends, mc, layername) - return datastores + return self._parse_recipe(bb_data, bbfile, appends, mc, layername) if mc is not None: bb_data = self.mcdata[mc].createCopy() @@ -543,5 +542,5 @@ class CookerDataBuilder(object): """ logger.debug("Parsing %s (full)" % virtualfn) (fn, virtual, mc) = bb.cache.virtualfn2realfn(virtualfn) - bb_data = self.parseRecipeVariants(virtualfn, appends, virtonly=True, layername=layername) - return bb_data[virtual] + datastores = self.parseRecipeVariants(virtualfn, appends, virtonly=True, layername=layername) + return datastores[virtual] diff --git a/poky/bitbake/lib/bb/fetch2/__init__.py b/poky/bitbake/lib/bb/fetch2/__init__.py index ffb1a92b26..22a2f80107 100644 --- a/poky/bitbake/lib/bb/fetch2/__init__.py +++ b/poky/bitbake/lib/bb/fetch2/__init__.py @@ -872,6 +872,8 @@ FETCH_EXPORT_VARS = ['HOME', 'PATH', 'AWS_PROFILE', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', + 'AWS_ROLE_ARN', + 'AWS_WEB_IDENTITY_TOKEN_FILE', 'AWS_DEFAULT_REGION', 'GIT_CACHE_PATH', 'REMOTE_CONTAINERS_IPC', @@ -1579,6 +1581,7 @@ class FetchMethod(object): unpackdir = rootdir if not unpack or not cmd: + urldata.unpack_tracer.unpack("file-copy", unpackdir) # If file == dest, then avoid any copies, as we already put the file into dest! 
dest = os.path.join(unpackdir, os.path.basename(file)) if file != dest and not (os.path.exists(dest) and os.path.samefile(file, dest)): @@ -1593,6 +1596,8 @@ class FetchMethod(object): destdir = urlpath.rsplit("/", 1)[0] + '/' bb.utils.mkdirhier("%s/%s" % (unpackdir, destdir)) cmd = 'cp -fpPRH "%s" "%s"' % (file, destdir) + else: + urldata.unpack_tracer.unpack("archive-extract", unpackdir) if not cmd: return @@ -1684,6 +1689,55 @@ class FetchMethod(object): """ return [] + +class DummyUnpackTracer(object): + """ + Abstract API definition for a class that traces unpacked source files back + to their respective upstream SRC_URI entries, for software composition + analysis, license compliance and detailed SBOM generation purposes. + User may load their own unpack tracer class (instead of the dummy + one) by setting the BB_UNPACK_TRACER_CLASS config parameter. + """ + def start(self, unpackdir, urldata_dict, d): + """ + Start tracing the core Fetch.unpack process, using an index to map + unpacked files to each SRC_URI entry. + This method is called by Fetch.unpack and it may receive nested calls by + gitsm and npmsw fetchers, that expand SRC_URI entries by adding implicit + URLs and by recursively calling Fetch.unpack from new (nested) Fetch + instances. + """ + return + def start_url(self, url): + """Start tracing url unpack process. + This method is called by Fetch.unpack before the fetcher-specific unpack + method starts, and it may receive nested calls by gitsm and npmsw + fetchers. + """ + return + def unpack(self, unpack_type, destdir): + """ + Set unpack_type and destdir for current url. + This method is called by the fetcher-specific unpack method after url + tracing started. + """ + return + def finish_url(self, url): + """Finish tracing url unpack process and update the file index. + This method is called by Fetch.unpack after the fetcher-specific unpack + method finished its job, and it may receive nested calls by gitsm + and npmsw fetchers. + """ + return + def complete(self): + """ + Finish tracing the Fetch.unpack process, and check if all nested + Fecth.unpack calls (if any) have been completed; if so, save collected + metadata. 
+ """ + return + + class Fetch(object): def __init__(self, urls, d, cache = True, localonly = False, connection_cache = None): if localonly and cache: @@ -1704,10 +1758,30 @@ class Fetch(object): if key in urldata_cache: self.ud = urldata_cache[key] + # the unpack_tracer object needs to be made available to possible nested + # Fetch instances (when those are created by gitsm and npmsw fetchers) + # so we set it as a global variable + global unpack_tracer + try: + unpack_tracer + except NameError: + class_path = d.getVar("BB_UNPACK_TRACER_CLASS") + if class_path: + # use user-defined unpack tracer class + import importlib + module_name, _, class_name = class_path.rpartition(".") + module = importlib.import_module(module_name) + class_ = getattr(module, class_name) + unpack_tracer = class_() + else: + # fall back to the dummy/abstract class + unpack_tracer = DummyUnpackTracer() + for url in urls: if url not in self.ud: try: self.ud[url] = FetchData(url, d, localonly) + self.ud[url].unpack_tracer = unpack_tracer except NonLocalMethod: if localonly: self.ud[url] = None @@ -1883,6 +1957,8 @@ class Fetch(object): if not urls: urls = self.urls + unpack_tracer.start(root, self.ud, self.d) + for u in urls: ud = self.ud[u] ud.setup_localpath(self.d) @@ -1890,11 +1966,15 @@ class Fetch(object): if ud.lockfile: lf = bb.utils.lockfile(ud.lockfile) + unpack_tracer.start_url(u) ud.method.unpack(ud, root, self.d) + unpack_tracer.finish_url(u) if ud.lockfile: bb.utils.unlockfile(lf) + unpack_tracer.complete() + def clean(self, urls=None): """ Clean files that the fetcher gets or places diff --git a/poky/bitbake/lib/bb/fetch2/crate.py b/poky/bitbake/lib/bb/fetch2/crate.py index 3310ed0050..01d49435c3 100644 --- a/poky/bitbake/lib/bb/fetch2/crate.py +++ b/poky/bitbake/lib/bb/fetch2/crate.py @@ -101,8 +101,10 @@ class Crate(Wget): bp = d.getVar('BP') if bp == ud.parm.get('name'): cmd = "tar -xz --no-same-owner -f %s" % thefile + ud.unpack_tracer.unpack("crate-extract", rootdir) else: cargo_bitbake = self._cargo_bitbake_path(rootdir) + ud.unpack_tracer.unpack("cargo-extract", cargo_bitbake) cmd = "tar -xz --no-same-owner -f %s -C %s" % (thefile, cargo_bitbake) diff --git a/poky/bitbake/lib/bb/fetch2/git.py b/poky/bitbake/lib/bb/fetch2/git.py index 4385d0b37a..0deeb5cee1 100644 --- a/poky/bitbake/lib/bb/fetch2/git.py +++ b/poky/bitbake/lib/bb/fetch2/git.py @@ -48,10 +48,23 @@ Supported SRC_URI options are: instead of branch. The default is "0", set nobranch=1 if needed. +- subpath + Limit the checkout to a specific subpath of the tree. + By default, checkout the whole tree, set subpath=<path> if needed + +- destsuffix + The name of the path in which to place the checkout. + By default, the path is git/, set destsuffix=<suffix> if needed + - usehead For local git:// urls to use the current branch HEAD as the revision for use with AUTOREV. Implies nobranch. +- lfs + Enable the checkout to use LFS for large files. This will download all LFS files + in the download step, as the unpack step does not have network access. + The default is "1", set lfs=0 to skip. + """ # Copyright (C) 2005 Richard Purdie @@ -462,8 +475,8 @@ class Git(FetchMethod): # Only do this if the unpack resulted in a .git/lfs directory being # created; this only happens if at least one blob needed to be # downloaded. 
- if os.path.exists(os.path.join(tmpdir, "git", ".git", "lfs")): - runfetchcmd("tar -cf - lfs | tar -xf - -C %s" % ud.clonedir, d, workdir="%s/git/.git" % tmpdir) + if os.path.exists(os.path.join(ud.destdir, ".git", "lfs")): + runfetchcmd("tar -cf - lfs | tar -xf - -C %s" % ud.clonedir, d, workdir="%s/.git" % ud.destdir) def build_mirror_data(self, ud, d): @@ -589,6 +602,8 @@ class Git(FetchMethod): destdir = ud.destdir = os.path.join(destdir, destsuffix) if os.path.exists(destdir): bb.utils.prunedir(destdir) + if not ud.bareclone: + ud.unpack_tracer.unpack("git", destdir) need_lfs = self._need_lfs(ud) @@ -627,6 +642,8 @@ class Git(FetchMethod): raise bb.fetch2.FetchError("Repository %s has LFS content, install git-lfs on host to download (or set lfs=0 to ignore it)" % (repourl)) elif not need_lfs: bb.note("Repository %s has LFS content but it is not being fetched" % (repourl)) + else: + runfetchcmd("%s lfs install" % ud.basecmd, d, workdir=destdir) if not ud.nocheckout: if subpath: @@ -686,8 +703,11 @@ class Git(FetchMethod): Check if the repository has 'lfs' (large file) content """ - # The bare clonedir doesn't use the remote names; it has the branch immediately. - if wd == ud.clonedir: + if ud.nobranch: + # If no branch is specified, use the current git commit + refname = self._build_revision(ud, d, ud.names[0]) + elif wd == ud.clonedir: + # The bare clonedir doesn't use the remote names; it has the branch immediately. refname = ud.branches[ud.names[0]] else: refname = "origin/%s" % ud.branches[ud.names[0]] diff --git a/poky/bitbake/lib/bb/fetch2/gitsm.py b/poky/bitbake/lib/bb/fetch2/gitsm.py index a87361ccf3..f7f3af7212 100644 --- a/poky/bitbake/lib/bb/fetch2/gitsm.py +++ b/poky/bitbake/lib/bb/fetch2/gitsm.py @@ -218,6 +218,10 @@ class GitSM(Git): try: newfetch = Fetch([url], d, cache=False) + # modpath is needed by unpack tracer to calculate submodule + # checkout dir + new_ud = newfetch.ud[url] + new_ud.modpath = modpath newfetch.unpack(root=os.path.dirname(os.path.join(repo_conf, 'modules', module))) except Exception as e: logger.error('gitsm: submodule unpack failed: %s %s' % (type(e).__name__, str(e))) diff --git a/poky/bitbake/lib/bb/fetch2/hg.py b/poky/bitbake/lib/bb/fetch2/hg.py index 063e13008a..cbff8c490c 100644 --- a/poky/bitbake/lib/bb/fetch2/hg.py +++ b/poky/bitbake/lib/bb/fetch2/hg.py @@ -242,6 +242,7 @@ class Hg(FetchMethod): revflag = "-r %s" % ud.revision subdir = ud.parm.get("destsuffix", ud.module) codir = "%s/%s" % (destdir, subdir) + ud.unpack_tracer.unpack("hg", codir) scmdata = ud.parm.get("scmdata", "") if scmdata != "nokeep": diff --git a/poky/bitbake/lib/bb/fetch2/npm.py b/poky/bitbake/lib/bb/fetch2/npm.py index f83485ad85..15f3f19bc8 100644 --- a/poky/bitbake/lib/bb/fetch2/npm.py +++ b/poky/bitbake/lib/bb/fetch2/npm.py @@ -298,6 +298,7 @@ class Npm(FetchMethod): destsuffix = ud.parm.get("destsuffix", "npm") destdir = os.path.join(rootdir, destsuffix) npm_unpack(ud.localpath, destdir, d) + ud.unpack_tracer.unpack("npm", destdir) def clean(self, ud, d): """Clean any existing full or partial download""" diff --git a/poky/bitbake/lib/bb/fetch2/npmsw.py b/poky/bitbake/lib/bb/fetch2/npmsw.py index 4ff2c8ffc3..ff5f8dc755 100644 --- a/poky/bitbake/lib/bb/fetch2/npmsw.py +++ b/poky/bitbake/lib/bb/fetch2/npmsw.py @@ -191,7 +191,9 @@ class NpmShrinkWrap(FetchMethod): else: raise ParameterError("Unsupported dependency: %s" % name, ud.url) + # name is needed by unpack tracer for module mapping ud.deps.append({ + "name": name, "url": url, "localpath": localpath, 
"extrapaths": extrapaths, @@ -270,6 +272,7 @@ class NpmShrinkWrap(FetchMethod): destsuffix = ud.parm.get("destsuffix") if destsuffix: destdir = os.path.join(rootdir, destsuffix) + ud.unpack_tracer.unpack("npm-shrinkwrap", destdir) bb.utils.mkdirhier(destdir) bb.utils.copyfile(ud.shrinkwrap_file, diff --git a/poky/bitbake/lib/bb/runqueue.py b/poky/bitbake/lib/bb/runqueue.py index 56147c50a8..02d7ff9768 100644 --- a/poky/bitbake/lib/bb/runqueue.py +++ b/poky/bitbake/lib/bb/runqueue.py @@ -157,7 +157,7 @@ class RunQueueScheduler(object): (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) self.stamps[tid] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) if tid in self.rq.runq_buildable: - self.buildable.append(tid) + self.buildable.add(tid) self.rev_prio_map = None self.is_pressure_usable() @@ -1021,6 +1021,7 @@ class RunQueueData: for tid in list(runall_tids): mark_active(tid, 1) + self.target_tids.append(tid) if self.cooker.configuration.force: invalidate_task(tid, False) @@ -1318,6 +1319,16 @@ class RunQueue: self.worker = {} self.fakeworker = {} + @staticmethod + def send_pickled_data(worker, data, name): + msg = bytearray() + msg.extend(b"<" + name.encode() + b">") + pickled_data = pickle.dumps(data) + msg.extend(len(pickled_data).to_bytes(4, 'big')) + msg.extend(pickled_data) + msg.extend(b"</" + name.encode() + b">") + worker.stdin.write(msg) + def _start_worker(self, mc, fakeroot = False, rqexec = None): logger.debug("Starting bitbake-worker") magic = "decafbad" @@ -1355,9 +1366,9 @@ class RunQueue: "umask" : self.cfgData.getVar("BB_DEFAULT_UMASK"), } - worker.stdin.write(b"<cookerconfig>" + pickle.dumps(self.cooker.configuration) + b"</cookerconfig>") - worker.stdin.write(b"<extraconfigdata>" + pickle.dumps(self.cooker.extraconfigdata) + b"</extraconfigdata>") - worker.stdin.write(b"<workerdata>" + pickle.dumps(workerdata) + b"</workerdata>") + RunQueue.send_pickled_data(worker, self.cooker.configuration, "cookerconfig") + RunQueue.send_pickled_data(worker, self.cooker.extraconfigdata, "extraconfigdata") + RunQueue.send_pickled_data(worker, workerdata, "workerdata") worker.stdin.flush() return RunQueueWorker(worker, workerpipe) @@ -1367,7 +1378,7 @@ class RunQueue: return logger.debug("Teardown for bitbake-worker") try: - worker.process.stdin.write(b"<quit></quit>") + RunQueue.send_pickled_data(worker.process, b"", "quit") worker.process.stdin.flush() worker.process.stdin.close() except IOError: @@ -1756,20 +1767,20 @@ class RunQueue: for tid in invalidtasks: (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn] - h = self.rqdata.runtaskentries[tid].hash + h = self.rqdata.runtaskentries[tid].unihash matches = bb.siggen.find_siginfo(pn, taskname, [], self.cooker.databuilder.mcdata[mc]) match = None for m in matches: if h in m: match = m if match is None: - bb.fatal("Can't find a task we're supposed to have written out? (hash: %s)?" % h) + bb.fatal("Can't find a task we're supposed to have written out? (hash: %s tid: %s)?" 
% (h, tid)) matches = {k : v for k, v in iter(matches.items()) if h not in k} if matches: latestmatch = sorted(matches.keys(), key=lambda f: matches[f])[-1] prevh = __find_sha256__.search(latestmatch).group(0) output = bb.siggen.compare_sigfiles(latestmatch, match, recursecb) - bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, closest matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) + bb.plain("\nTask %s:%s couldn't be used from the cache because:\n We need hash %s, most recent matching task was %s\n " % (pn, taskname, h, prevh) + '\n '.join(output)) class RunQueueExecute: @@ -1851,11 +1862,6 @@ class RunQueueExecute: self.tasks_notcovered = set() self.scenequeue_notneeded = set() - # We can't skip specified target tasks which aren't setscene tasks - self.cantskip = set(self.rqdata.target_tids) - self.cantskip.difference_update(self.rqdata.runq_setscene_tids) - self.cantskip.intersection_update(self.rqdata.runtaskentries) - schedulers = self.get_schedulers() for scheduler in schedulers: if self.scheduler == scheduler.name: @@ -1868,7 +1874,23 @@ class RunQueueExecute: #if self.rqdata.runq_setscene_tids: self.sqdata = SQData() - build_scenequeue_data(self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self) + build_scenequeue_data(self.sqdata, self.rqdata, self) + + update_scenequeue_data(self.sqdata.sq_revdeps, self.sqdata, self.rqdata, self.rq, self.cooker, self.stampcache, self, summary=True) + + # Compute a list of 'stale' sstate tasks where the current hash does not match the one + # in any stamp files. Pass the list out to metadata as an event. + found = {} + for tid in self.rqdata.runq_setscene_tids: + (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) + stamps = bb.build.find_stale_stamps(taskname, taskfn) + if stamps: + if mc not in found: + found[mc] = {} + found[mc][tid] = stamps + for mc in found: + event = bb.event.StaleSetSceneTasks(found[mc]) + bb.event.fire(event, self.cooker.databuilder.mcdata[mc]) def runqueue_process_waitpid(self, task, status, fakerootlog=None): @@ -1894,14 +1916,14 @@ class RunQueueExecute: def finish_now(self): for mc in self.rq.worker: try: - self.rq.worker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, b"", "finishnow") self.rq.worker[mc].process.stdin.flush() except IOError: # worker must have died? pass for mc in self.rq.fakeworker: try: - self.rq.fakeworker[mc].process.stdin.write(b"<finishnow></finishnow>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, b"", "finishnow") self.rq.fakeworker[mc].process.stdin.flush() except IOError: # worker must have died? 
@@ -2196,10 +2218,10 @@ class RunQueueExecute: if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not self.cooker.configuration.dry_run: if not mc in self.rq.fakeworker: self.rq.start_fakeworker(self, mc) - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2297,10 +2319,10 @@ class RunQueueExecute: self.rq.state = runQueueFailed self.stats.taskFailed() return True - self.rq.fakeworker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, runtask, "runtask") self.rq.fakeworker[mc].process.stdin.flush() else: - self.rq.worker[mc].process.stdin.write(b"<runtask>" + pickle.dumps(runtask) + b"</runtask>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, runtask, "runtask") self.rq.worker[mc].process.stdin.flush() self.build_stamps[task] = bb.parse.siggen.stampfile_mcfn(taskname, taskfn, extrainfo=False) @@ -2388,7 +2410,7 @@ class RunQueueExecute: return notcovered = set(self.scenequeue_notcovered) - notcovered |= self.cantskip + notcovered |= self.sqdata.cantskip for tid in self.scenequeue_notcovered: notcovered |= self.sqdata.sq_covered_tasks[tid] notcovered |= self.sqdata.unskippable.difference(self.rqdata.runq_setscene_tids) @@ -2502,9 +2524,9 @@ class RunQueueExecute: if changed: for mc in self.rq.worker: - self.rq.worker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.worker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") for mc in self.rq.fakeworker: - self.rq.fakeworker[mc].process.stdin.write(b"<newtaskhashes>" + pickle.dumps(bb.parse.siggen.get_taskhashes()) + b"</newtaskhashes>") + RunQueue.send_pickled_data(self.rq.fakeworker[mc].process, bb.parse.siggen.get_taskhashes(), "newtaskhashes") hashequiv_logger.debug(pprint.pformat("Tasks changed:\n%s" % (changed))) @@ -2767,12 +2789,17 @@ class SQData(object): # A list of normal tasks a setscene task covers self.sq_covered_tasks = {} -def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): +def build_scenequeue_data(sqdata, rqdata, sqrq): sq_revdeps = {} sq_revdeps_squash = {} sq_collated_deps = {} + # We can't skip specified target tasks which aren't setscene tasks + sqdata.cantskip = set(rqdata.target_tids) + sqdata.cantskip.difference_update(rqdata.runq_setscene_tids) + sqdata.cantskip.intersection_update(rqdata.runtaskentries) + # We need to construct a dependency graph for the setscene functions. Intermediate # dependencies between the setscene tasks only complicate the code. 
This code # therefore aims to collapse the huge runqueue dependency tree into a smaller one @@ -2841,7 +2868,7 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): for tid in rqdata.runtaskentries: if not rqdata.runtaskentries[tid].revdeps: sqdata.unskippable.add(tid) - sqdata.unskippable |= sqrq.cantskip + sqdata.unskippable |= sqdata.cantskip while new: new = False orig = sqdata.unskippable.copy() @@ -2951,22 +2978,6 @@ def build_scenequeue_data(sqdata, rqdata, rq, cooker, stampcache, sqrq): sqrq.sq_deferred[tid] = sqdata.hashes[h] bb.debug(1, "Deferring %s after %s" % (tid, sqdata.hashes[h])) - update_scenequeue_data(sqdata.sq_revdeps, sqdata, rqdata, rq, cooker, stampcache, sqrq, summary=True) - - # Compute a list of 'stale' sstate tasks where the current hash does not match the one - # in any stamp files. Pass the list out to metadata as an event. - found = {} - for tid in rqdata.runq_setscene_tids: - (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) - stamps = bb.build.find_stale_stamps(taskname, taskfn) - if stamps: - if mc not in found: - found[mc] = {} - found[mc][tid] = stamps - for mc in found: - event = bb.event.StaleSetSceneTasks(found[mc]) - bb.event.fire(event, cooker.databuilder.mcdata[mc]) - def check_setscene_stamps(tid, rqdata, rq, stampcache, noexecstamp=False): (mc, fn, taskname, taskfn) = split_tid_mcfn(tid) diff --git a/poky/bitbake/lib/bb/tests/fetch.py b/poky/bitbake/lib/bb/tests/fetch.py index eeb7a31471..c7a23407c1 100644 --- a/poky/bitbake/lib/bb/tests/fetch.py +++ b/poky/bitbake/lib/bb/tests/fetch.py @@ -2277,7 +2277,7 @@ class GitLfsTest(FetcherTest): @skipIfNoGitLFS() @skipIfNoNetwork() - def test_real_git_lfs_repo_succeeds(self): + def test_real_git_lfs_repo_skips(self): self.d.setVar('SRC_URI', "git://gitlab.com/gitlab-examples/lfs.git;protocol=https;branch=master;lfs=0") f = self.get_real_git_lfs_file() # This is the actual non-smudged placeholder file on the repo if git-lfs does not run @@ -2290,24 +2290,41 @@ class GitLfsTest(FetcherTest): with open(f) as fh: self.assertEqual(lfs_file, fh.read()) + @skipIfNoGitLFS() def test_lfs_enabled(self): import shutil uri = 'git://%s;protocol=file;lfs=1;branch=master' % self.srcdir self.d.setVar('SRC_URI', uri) - # Careful: suppress initial attempt at downloading until - # we know whether git-lfs is installed. - fetcher, ud = self.fetch(uri=None, download=False) - self.assertIsNotNone(ud.method._find_git_lfs) + # With git-lfs installed, test that we can fetch and unpack + fetcher, ud = self.fetch() + shutil.rmtree(self.gitdir, ignore_errors=True) + fetcher.unpack(self.d.getVar('WORKDIR')) - # If git-lfs can be found, the unpack should be successful. Only - # attempt this with the real live copy of git-lfs installed. - if ud.method._find_git_lfs(self.d): - fetcher.download() - shutil.rmtree(self.gitdir, ignore_errors=True) - fetcher.unpack(self.d.getVar('WORKDIR')) + @skipIfNoGitLFS() + def test_lfs_disabled(self): + import shutil + + uri = 'git://%s;protocol=file;lfs=0;branch=master' % self.srcdir + self.d.setVar('SRC_URI', uri) + # Verify that the fetcher can survive even if the source + # repository has Git LFS usage configured. 
+ fetcher, ud = self.fetch() + fetcher.unpack(self.d.getVar('WORKDIR')) + + def test_lfs_enabled_not_installed(self): + import shutil + + uri = 'git://%s;protocol=file;lfs=1;branch=master' % self.srcdir + self.d.setVar('SRC_URI', uri) + + # Careful: suppress initial attempt at downloading + fetcher, ud = self.fetch(uri=None, download=False) + + # Artificially assert that git-lfs is not installed, so + # we can verify a failure to unpack in it's absence. old_find_git_lfs = ud.method._find_git_lfs try: # If git-lfs cannot be found, the unpack should throw an error @@ -2319,29 +2336,21 @@ class GitLfsTest(FetcherTest): finally: ud.method._find_git_lfs = old_find_git_lfs - def test_lfs_disabled(self): + def test_lfs_disabled_not_installed(self): import shutil uri = 'git://%s;protocol=file;lfs=0;branch=master' % self.srcdir self.d.setVar('SRC_URI', uri) - # In contrast to test_lfs_enabled(), allow the implicit download - # done by self.fetch() to occur here. The point of this test case - # is to verify that the fetcher can survive even if the source - # repository has Git LFS usage configured. - fetcher, ud = self.fetch() - self.assertIsNotNone(ud.method._find_git_lfs) + # Careful: suppress initial attempt at downloading + fetcher, ud = self.fetch(uri=None, download=False) + # Artificially assert that git-lfs is not installed, so + # we can verify a failure to unpack in it's absence. old_find_git_lfs = ud.method._find_git_lfs try: - # If git-lfs can be found, the unpack should be successful. A - # live copy of git-lfs is not required for this case, so - # unconditionally forge its presence. - ud.method._find_git_lfs = lambda d: True - shutil.rmtree(self.gitdir, ignore_errors=True) - fetcher.unpack(self.d.getVar('WORKDIR')) - # If git-lfs cannot be found, the unpack should be successful - + # Even if git-lfs cannot be found, the unpack should be successful + fetcher.download() ud.method._find_git_lfs = lambda d: False shutil.rmtree(self.gitdir, ignore_errors=True) fetcher.unpack(self.d.getVar('WORKDIR')) @@ -3042,9 +3051,11 @@ class FetchPremirroronlyLocalTest(FetcherTest): self.d.setVar("BB_FETCH_PREMIRRORONLY", "1") self.d.setVar("BB_NO_NETWORK", "1") self.d.setVar("PREMIRRORS", self.recipe_url + " " + "file://{}".format(self.mirrordir) + " \n") + self.mirrorname = "git2_git.fake.repo.bitbake.tar.gz" + self.mirrorfile = os.path.join(self.mirrordir, self.mirrorname) + self.testfilename = "bitbake-fetch.test" def make_git_repo(self): - self.mirrorname = "git2_git.fake.repo.bitbake.tar.gz" recipeurl = "git:/git.fake.repo/bitbake" os.makedirs(self.gitdir) self.git_init(cwd=self.gitdir) @@ -3054,15 +3065,23 @@ class FetchPremirroronlyLocalTest(FetcherTest): def git_new_commit(self): import random - testfilename = "bibake-fetch.test" os.unlink(os.path.join(self.mirrordir, self.mirrorname)) - with open(os.path.join(self.gitdir, testfilename), "w") as testfile: - testfile.write("Useless random data {}".format(random.random())) - self.git("add {}".format(testfilename), self.gitdir) - self.git("commit -a -m \"This random commit {}. I'm useless.\"".format(random.random()), self.gitdir) + branch = self.git("branch --show-current", self.gitdir).split() + with open(os.path.join(self.gitdir, self.testfilename), "w") as testfile: + testfile.write("File {} from branch {}; Useless random data {}".format(self.testfilename, branch, random.random())) + self.git("add {}".format(self.testfilename), self.gitdir) + self.git("commit -a -m \"This random commit {} in branch {}. 
@@ -3042,9 +3051,11 @@ class FetchPremirroronlyLocalTest(FetcherTest):
         self.d.setVar("BB_FETCH_PREMIRRORONLY", "1")
         self.d.setVar("BB_NO_NETWORK", "1")
         self.d.setVar("PREMIRRORS", self.recipe_url + " " + "file://{}".format(self.mirrordir) + " \n")
+        self.mirrorname = "git2_git.fake.repo.bitbake.tar.gz"
+        self.mirrorfile = os.path.join(self.mirrordir, self.mirrorname)
+        self.testfilename = "bitbake-fetch.test"
     def make_git_repo(self):
-        self.mirrorname = "git2_git.fake.repo.bitbake.tar.gz"
         recipeurl = "git:/git.fake.repo/bitbake"
         os.makedirs(self.gitdir)
         self.git_init(cwd=self.gitdir)
@@ -3054,15 +3065,23 @@ class FetchPremirroronlyLocalTest(FetcherTest):
     def git_new_commit(self):
         import random
-        testfilename = "bibake-fetch.test"
         os.unlink(os.path.join(self.mirrordir, self.mirrorname))
-        with open(os.path.join(self.gitdir, testfilename), "w") as testfile:
-            testfile.write("Useless random data {}".format(random.random()))
-        self.git("add {}".format(testfilename), self.gitdir)
-        self.git("commit -a -m \"This random commit {}. I'm useless.\"".format(random.random()), self.gitdir)
+        branch = self.git("branch --show-current", self.gitdir).split()
+        with open(os.path.join(self.gitdir, self.testfilename), "w") as testfile:
+            testfile.write("File {} from branch {}; Useless random data {}".format(self.testfilename, branch, random.random()))
+        self.git("add {}".format(self.testfilename), self.gitdir)
+        self.git("commit -a -m \"This random commit {} in branch {}. I'm useless.\"".format(random.random(), branch), self.gitdir)
         bb.process.run('tar -czvf {} .'.format(os.path.join(self.mirrordir, self.mirrorname)), cwd = self.gitdir)
         return self.git("rev-parse HEAD", self.gitdir).strip()
+
+    def git_new_branch(self, name):
+        self.git_new_commit()
+        head = self.git("rev-parse HEAD", self.gitdir).strip()
+        self.git("checkout -b {}".format(name), self.gitdir)
+        newrev = self.git_new_commit()
+        self.git("checkout {}".format(head), self.gitdir)
+        return newrev
+
     def test_mirror_commit_nonexistent(self):
         self.make_git_repo()
         self.d.setVar("SRCREV", "0"*40)
@@ -3083,6 +3102,59 @@ class FetchPremirroronlyLocalTest(FetcherTest):
         with self.assertRaises(bb.fetch2.NetworkAccess):
             fetcher.download()
+    def test_mirror_tarball_multiple_branches(self):
+        """
+        test if PREMIRRORS can handle multiple name/branches correctly
+        both branches have required revisions
+        """
+        self.make_git_repo()
+        branch1rev = self.git_new_branch("testbranch1")
+        branch2rev = self.git_new_branch("testbranch2")
+        self.recipe_url = "git://git.fake.repo/bitbake;branch=testbranch1,testbranch2;protocol=https;name=branch1,branch2"
+        self.d.setVar("SRCREV_branch1", branch1rev)
+        self.d.setVar("SRCREV_branch2", branch2rev)
+        fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+        self.assertTrue(os.path.exists(self.mirrorfile), "Mirror file doesn't exist")
+        fetcher.download()
+        fetcher.unpack(os.path.join(self.tempdir, "unpacked"))
+        unpacked = os.path.join(self.tempdir, "unpacked", "git", self.testfilename)
+        self.assertTrue(os.path.exists(unpacked), "Repo has not been unpacked properly!")
+        with open(unpacked, 'r') as f:
+            content = f.read()
+            ## We expect to see testbranch1 in the file, not master, not testbranch2
+            self.assertTrue(content.find("testbranch1") != -1, "Wrong branch has been checked out!")
+
+    def test_mirror_tarball_multiple_branches_nobranch(self):
+        """
+        test if PREMIRRORS can handle multiple name/branches correctly
+        Unbalanced name/branches raises ParameterError
+        """
+        self.make_git_repo()
+        branch1rev = self.git_new_branch("testbranch1")
+        branch2rev = self.git_new_branch("testbranch2")
+        self.recipe_url = "git://git.fake.repo/bitbake;branch=testbranch1;protocol=https;name=branch1,branch2"
+        self.d.setVar("SRCREV_branch1", branch1rev)
+        self.d.setVar("SRCREV_branch2", branch2rev)
+        with self.assertRaises(bb.fetch2.ParameterError):
+            fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+
+    def test_mirror_tarball_multiple_branches_norev(self):
+        """
+        test if PREMIRRORS can handle multiple name/branches correctly
+        one of the branches specifies a non-existent SRCREV
+        """
+        self.make_git_repo()
+        branch1rev = self.git_new_branch("testbranch1")
+        branch2rev = self.git_new_branch("testbranch2")
+        self.recipe_url = "git://git.fake.repo/bitbake;branch=testbranch1,testbranch2;protocol=https;name=branch1,branch2"
+        self.d.setVar("SRCREV_branch1", branch1rev)
+        self.d.setVar("SRCREV_branch2", "0"*40)
+        fetcher = bb.fetch.Fetch([self.recipe_url], self.d)
+        self.assertTrue(os.path.exists(self.mirrorfile), "Mirror file doesn't exist")
+        with self.assertRaises(bb.fetch2.NetworkAccess):
+            fetcher.download()
+
+
 class FetchPremirroronlyNetworkTest(FetcherTest):
     def setUp(self):
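For context before the next file, here is how the name/branch pairing exercised by the three new premirror tests looks from the consumer side. This is a hedged sketch, not part of the patch: the URL, revisions, and the fetch_multibranch name are hypothetical, and d is assumed to be a configured BitBake datastore as in the tests. Each comma-separated branch needs a matching entry in name=, each name selects its own SRCREV_<name> variable, and unbalanced lists make bb.fetch.Fetch() raise bb.fetch2.ParameterError.

    import bb.fetch

    def fetch_multibranch(d, rev_main, rev_devel, destdir):
        # Two branches fetched from one URL; 'name' pairs each branch
        # with its own SRCREV_<name> variable.
        url = "git://git.example.com/repo.git;protocol=https;branch=main,devel;name=main,devel"
        d.setVar("SRC_URI", url)
        d.setVar("SRCREV_main", rev_main)      # revision wanted on branch 'main'
        d.setVar("SRCREV_devel", rev_devel)    # revision wanted on branch 'devel'
        fetcher = bb.fetch.Fetch([url], d)     # ParameterError here if the lists are unbalanced
        # With BB_FETCH_PREMIRRORONLY and BB_NO_NETWORK set, a revision absent
        # from the premirror tarball raises bb.fetch2.NetworkAccess here
        fetcher.download()
        fetcher.unpack(destdir)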
diff --git a/poky/bitbake/lib/bb/utils.py b/poky/bitbake/lib/bb/utils.py
index b401fa5ec7..61ffad92ce 100644
--- a/poky/bitbake/lib/bb/utils.py
+++ b/poky/bitbake/lib/bb/utils.py
@@ -50,7 +50,7 @@ def clean_context():
 def get_context():
     return _context
-
+
 def set_context(ctx):
     _context = ctx
@@ -212,8 +212,8 @@ def explode_dep_versions2(s, *, sort=True):
             inversion = True
             # This list is based on behavior and supported comparisons from deb, opkg and rpm.
             #
-            # Even though =<, <<, ==, !=, =>, and >> may not be supported,
-            # we list each possibly valid item.
+            # Even though =<, <<, ==, !=, =>, and >> may not be supported,
+            # we list each possibly valid item.
             # The build system is responsible for validation of what it supports.
             if i.startswith(('<=', '=<', '<<', '==', '!=', '>=', '=>', '>>')):
                 lastcmp = i[0:2]
@@ -347,7 +347,7 @@ def _print_exception(t, value, tb, realfile, text, context):
     exception = traceback.format_exception_only(t, value)
     error.append('Error executing a python function in %s:\n' % realfile)
-    # Strip 'us' from the stack (better_exec call) unless that was where the
+    # Strip 'us' from the stack (better_exec call) unless that was where the
     # error came from
     if tb.tb_next is not None:
         tb = tb.tb_next
@@ -746,9 +746,9 @@ def prunedir(topdir, ionice=False):
 # but thats possibly insane and suffixes is probably going to be small
 #
 def prune_suffix(var, suffixes, d):
-    """
+    """
     See if var ends with any of the suffixes listed and
-    remove it if found
+    remove it if found
     """
     for suffix in suffixes:
         if suffix and var.endswith(suffix):
@@ -1001,9 +1001,9 @@ def umask(new_mask):
         os.umask(current_mask)
 
 def to_boolean(string, default=None):
-    """
+    """
     Check input string and return boolean value True/False/None
-    depending upon the checks
+    depending upon the checks
     """
     if not string:
         return default
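The utils.py hunks above are whitespace-only cleanups, but for reference, a short illustration of the helpers whose docstrings and comments they touch. The expected values reflect my reading of the current bb.utils behaviour rather than anything this patch changes:

    import bb.utils

    # explode_dep_versions2() maps each dependency to a list of its
    # version constraints (an empty list when none are given)
    deps = bb.utils.explode_dep_versions2("foo (>= 1.0) bar")
    # -> {'foo': ['>= 1.0'], 'bar': []}

    # prune_suffix() strips the first matching suffix from var; the
    # datastore argument is accepted but unused for this case
    name = bb.utils.prune_suffix("libfoo-native", ["-native"], None)
    # -> 'libfoo'

    # to_boolean() normalises common true/false spellings, returns the
    # default for empty input, and raises ValueError otherwise
    assert bb.utils.to_boolean("yes") is True
    assert bb.utils.to_boolean("0") is False
    assert bb.utils.to_boolean(None, default=True) is True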