diff options
Diffstat (limited to 'poky/bitbake/lib/bb/asyncrpc')
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/client.py | 11 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/serv.py | 21 |
2 files changed, 29 insertions, 3 deletions
diff --git a/poky/bitbake/lib/bb/asyncrpc/client.py b/poky/bitbake/lib/bb/asyncrpc/client.py index 79919c5be..3eb4fdde8 100644 --- a/poky/bitbake/lib/bb/asyncrpc/client.py +++ b/poky/bitbake/lib/bb/asyncrpc/client.py @@ -11,13 +11,14 @@ from . import chunkify, DEFAULT_MAX_CHUNK class AsyncClient(object): - def __init__(self, proto_name, proto_version, logger): + def __init__(self, proto_name, proto_version, logger, timeout=30): self.reader = None self.writer = None self.max_chunk = DEFAULT_MAX_CHUNK self.proto_name = proto_name self.proto_version = proto_version self.logger = logger + self.timeout = timeout async def connect_tcp(self, address, port): async def connect_sock(): @@ -70,14 +71,18 @@ class AsyncClient(object): async def send_message(self, msg): async def get_line(): - line = await self.reader.readline() + try: + line = await asyncio.wait_for(self.reader.readline(), self.timeout) + except asyncio.TimeoutError: + raise ConnectionError("Timed out waiting for server") + if not line: raise ConnectionError("Connection closed") line = line.decode("utf-8") if not line.endswith("\n"): - raise ConnectionError("Bad message %r" % msg) + raise ConnectionError("Bad message %r" % (line)) return line diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py index ef20cb71d..4084f300d 100644 --- a/poky/bitbake/lib/bb/asyncrpc/serv.py +++ b/poky/bitbake/lib/bb/asyncrpc/serv.py @@ -9,6 +9,7 @@ import os import signal import socket import sys +import multiprocessing from . import chunkify, DEFAULT_MAX_CHUNK @@ -201,12 +202,14 @@ class AsyncServer(object): pass def signal_handler(self): + self.logger.debug("Got exit signal") self.loop.stop() def serve_forever(self): asyncio.set_event_loop(self.loop) try: self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) self.run_loop_forever() self.server.close() @@ -221,3 +224,21 @@ class AsyncServer(object): if self._cleanup_socket is not None: self._cleanup_socket() + + def serve_as_process(self, *, prefunc=None, args=()): + def run(): + if prefunc is not None: + prefunc(self, *args) + self.serve_forever() + + # Temporarily block SIGTERM. The server process will inherit this + # block which will ensure it doesn't receive the SIGTERM until the + # handler is ready for it + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM]) + try: + self.process = multiprocessing.Process(target=run) + self.process.start() + + return self.process + finally: + signal.pthread_sigmask(signal.SIG_SETMASK, mask) |