summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/bb/asyncrpc/serv.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/bb/asyncrpc/serv.py')
-rw-r--r--poky/bitbake/lib/bb/asyncrpc/serv.py149
1 files changed, 106 insertions, 43 deletions
diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py
index ef20cb71d..b4cffff21 100644
--- a/poky/bitbake/lib/bb/asyncrpc/serv.py
+++ b/poky/bitbake/lib/bb/asyncrpc/serv.py
@@ -9,6 +9,7 @@ import os
import signal
import socket
import sys
+import multiprocessing
from . import chunkify, DEFAULT_MAX_CHUNK
@@ -130,53 +131,55 @@ class AsyncServerConnection(object):
class AsyncServer(object):
- def __init__(self, logger, loop=None):
- if loop is None:
- self.loop = asyncio.new_event_loop()
- self.close_loop = True
- else:
- self.loop = loop
- self.close_loop = False
-
+ def __init__(self, logger):
self._cleanup_socket = None
self.logger = logger
+ self.start = None
+ self.address = None
+ self.loop = None
def start_tcp_server(self, host, port):
- self.server = self.loop.run_until_complete(
- asyncio.start_server(self.handle_client, host, port, loop=self.loop)
- )
-
- for s in self.server.sockets:
- self.logger.debug('Listening on %r' % (s.getsockname(),))
- # Newer python does this automatically. Do it manually here for
- # maximum compatibility
- s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
- s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
-
- name = self.server.sockets[0].getsockname()
- if self.server.sockets[0].family == socket.AF_INET6:
- self.address = "[%s]:%d" % (name[0], name[1])
- else:
- self.address = "%s:%d" % (name[0], name[1])
+ def start_tcp():
+ self.server = self.loop.run_until_complete(
+ asyncio.start_server(self.handle_client, host, port)
+ )
+
+ for s in self.server.sockets:
+ self.logger.debug('Listening on %r' % (s.getsockname(),))
+ # Newer python does this automatically. Do it manually here for
+ # maximum compatibility
+ s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
+ s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1)
+
+ name = self.server.sockets[0].getsockname()
+ if self.server.sockets[0].family == socket.AF_INET6:
+ self.address = "[%s]:%d" % (name[0], name[1])
+ else:
+ self.address = "%s:%d" % (name[0], name[1])
+
+ self.start = start_tcp
def start_unix_server(self, path):
def cleanup():
os.unlink(path)
- cwd = os.getcwd()
- try:
- # Work around path length limits in AF_UNIX
- os.chdir(os.path.dirname(path))
- self.server = self.loop.run_until_complete(
- asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop)
- )
- finally:
- os.chdir(cwd)
+ def start_unix():
+ cwd = os.getcwd()
+ try:
+ # Work around path length limits in AF_UNIX
+ os.chdir(os.path.dirname(path))
+ self.server = self.loop.run_until_complete(
+ asyncio.start_unix_server(self.handle_client, os.path.basename(path))
+ )
+ finally:
+ os.chdir(cwd)
- self.logger.debug('Listening on %r' % path)
+ self.logger.debug('Listening on %r' % path)
- self._cleanup_socket = cleanup
- self.address = "unix://%s" % os.path.abspath(path)
+ self._cleanup_socket = cleanup
+ self.address = "unix://%s" % os.path.abspath(path)
+
+ self.start = start_unix
@abc.abstractmethod
def accept_client(self, reader, writer):
@@ -201,12 +204,13 @@ class AsyncServer(object):
pass
def signal_handler(self):
+ self.logger.debug("Got exit signal")
self.loop.stop()
- def serve_forever(self):
- asyncio.set_event_loop(self.loop)
+ def _serve_forever(self):
try:
self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler)
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM])
self.run_loop_forever()
self.server.close()
@@ -214,10 +218,69 @@ class AsyncServer(object):
self.loop.run_until_complete(self.server.wait_closed())
self.logger.debug('Server shutting down')
finally:
- if self.close_loop:
- if sys.version_info >= (3, 6):
- self.loop.run_until_complete(self.loop.shutdown_asyncgens())
- self.loop.close()
-
if self._cleanup_socket is not None:
self._cleanup_socket()
+
+ def serve_forever(self):
+ """
+ Serve requests in the current process
+ """
+ # Create loop and override any loop that may have existed in
+ # a parent process. It is possible that the usecases of
+ # serve_forever might be constrained enough to allow using
+ # get_event_loop here, but better safe than sorry for now.
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+ self.start()
+ self._serve_forever()
+
+ def serve_as_process(self, *, prefunc=None, args=()):
+ """
+ Serve requests in a child process
+ """
+ def run(queue):
+ # Create loop and override any loop that may have existed
+ # in a parent process. Without doing this and instead
+ # using get_event_loop, at the very minimum the hashserv
+ # unit tests will hang when running the second test.
+ # This happens since get_event_loop in the spawned server
+ # process for the second testcase ends up with the loop
+ # from the hashserv client created in the unit test process
+ # when running the first testcase. The problem is somewhat
+ # more general, though, as any potential use of asyncio in
+ # Cooker could create a loop that needs to replaced in this
+ # new process.
+ self.loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.loop)
+ try:
+ self.start()
+ finally:
+ queue.put(self.address)
+ queue.close()
+
+ if prefunc is not None:
+ prefunc(self, *args)
+
+ self._serve_forever()
+
+ if sys.version_info >= (3, 6):
+ self.loop.run_until_complete(self.loop.shutdown_asyncgens())
+ self.loop.close()
+
+ queue = multiprocessing.Queue()
+
+ # Temporarily block SIGTERM. The server process will inherit this
+ # block which will ensure it doesn't receive the SIGTERM until the
+ # handler is ready for it
+ mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM])
+ try:
+ self.process = multiprocessing.Process(target=run, args=(queue,))
+ self.process.start()
+
+ self.address = queue.get()
+ queue.close()
+ queue.join_thread()
+
+ return self.process
+ finally:
+ signal.pthread_sigmask(signal.SIG_SETMASK, mask)