summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/bb/asyncrpc
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/bb/asyncrpc')
-rw-r--r--poky/bitbake/lib/bb/asyncrpc/client.py11
-rw-r--r--poky/bitbake/lib/bb/asyncrpc/serv.py21
2 files changed, 29 insertions, 3 deletions
diff --git a/poky/bitbake/lib/bb/asyncrpc/client.py b/poky/bitbake/lib/bb/asyncrpc/client.py
index 79919c5be..3eb4fdde8 100644
--- a/poky/bitbake/lib/bb/asyncrpc/client.py
+++ b/poky/bitbake/lib/bb/asyncrpc/client.py
@@ -11,13 +11,14 @@ from . import chunkify, DEFAULT_MAX_CHUNK
class AsyncClient(object):
- def __init__(self, proto_name, proto_version, logger):
+ def __init__(self, proto_name, proto_version, logger, timeout=30):
self.reader = None
self.writer = None
self.max_chunk = DEFAULT_MAX_CHUNK
self.proto_name = proto_name
self.proto_version = proto_version
self.logger = logger
+ self.timeout = timeout
async def connect_tcp(self, address, port):
async def connect_sock():
@@ -70,14 +71,18 @@ class AsyncClient(object):
async def send_message(self, msg):
async def get_line():
- line = await self.reader.readline()
+ try:
+ line = await asyncio.wait_for(self.reader.readline(), self.timeout)
+ except asyncio.TimeoutError:
+ raise ConnectionError("Timed out waiting for server")
+
if not line:
raise ConnectionError("Connection closed")
line = line.decode("utf-8")
if not line.endswith("\n"):
- raise ConnectionError("Bad message %r" % msg)
+ raise ConnectionError("Bad message %r" % (line))
return line
diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py
index ef20cb71d..4084f300d 100644
--- a/poky/bitbake/lib/bb/asyncrpc/serv.py
+++ b/poky/bitbake/lib/bb/asyncrpc/serv.py
@@ -9,6 +9,7 @@ import os
import signal
import socket
import sys
+import multiprocessing
from . import chunkify, DEFAULT_MAX_CHUNK
@@ -201,12 +202,14 @@ class AsyncServer(object):
pass
def signal_handler(self):
+ self.logger.debug("Got exit signal")
self.loop.stop()
def serve_forever(self):
asyncio.set_event_loop(self.loop)
try:
self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
self.run_loop_forever()
self.server.close()
@@ -221,3 +224,21 @@ class AsyncServer(object):
if self._cleanup_socket is not None:
self._cleanup_socket()
+
+ def serve_as_process(self, *, prefunc=None, args=()):
+ def run():
+ if prefunc is not None:
+ prefunc(self, *args)
+ self.serve_forever()
+
+ # Temporarily block SIGTERM. The server process will inherit this
+ # block which will ensure it doesn't receive the SIGTERM until the
+ # handler is ready for it
+ mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
+ try:
+ self.process = multiprocessing.Process(target=run)
+ self.process.start()
+
+ return self.process
+ finally:
+ signal.pthread_sigmask(signal.SIG_SETMASK, mask)