diff options
Diffstat (limited to 'poky/bitbake/lib/prserv/serv.py')
-rw-r--r-- | poky/bitbake/lib/prserv/serv.py | 367 |
1 files changed, 114 insertions, 253 deletions
diff --git a/poky/bitbake/lib/prserv/serv.py b/poky/bitbake/lib/prserv/serv.py index 25dcf8a0e..5e322bf83 100644 --- a/poky/bitbake/lib/prserv/serv.py +++ b/poky/bitbake/lib/prserv/serv.py @@ -5,8 +5,6 @@ import os,sys,logging import signal, time from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler -import threading -import queue import socket import io import sqlite3 @@ -14,14 +12,10 @@ import bb.server.xmlrpcclient import prserv import prserv.db import errno -import select +import multiprocessing logger = logging.getLogger("BitBake.PRserv") -if sys.hexversion < 0x020600F0: - print("Sorry, python 2.6 or later is required.") - sys.exit(1) - class Handler(SimpleXMLRPCRequestHandler): def _dispatch(self,method,params): try: @@ -37,7 +31,7 @@ singleton = None class PRServer(SimpleXMLRPCServer): - def __init__(self, dbfile, logfile, interface, daemon=True): + def __init__(self, dbfile, logfile, interface): ''' constructor ''' try: SimpleXMLRPCServer.__init__(self, interface, @@ -50,57 +44,18 @@ class PRServer(SimpleXMLRPCServer): raise PRServiceConfigError self.dbfile=dbfile - self.daemon=daemon self.logfile=logfile - self.working_thread=None self.host, self.port = self.socket.getsockname() - self.pidfile=PIDPREFIX % (self.host, self.port) self.register_function(self.getPR, "getPR") - self.register_function(self.quit, "quit") self.register_function(self.ping, "ping") self.register_function(self.export, "export") - self.register_function(self.dump_db, "dump_db") self.register_function(self.importone, "importone") self.register_introspection_functions() - self.quitpipein, self.quitpipeout = os.pipe() - - self.requestqueue = queue.Queue() - self.handlerthread = threading.Thread(target = self.process_request_thread) - self.handlerthread.daemon = False - - def process_request_thread(self): - """Same as in BaseServer but as a thread. - - In addition, exception handling is done here. - - """ - iter_count = 1 + self.iter_count = 0 # 60 iterations between syncs or sync if dirty every ~30 seconds - iterations_between_sync = 60 - - bb.utils.set_process_name("PRServ Handler") - - while not self.quitflag: - try: - (request, client_address) = self.requestqueue.get(True, 30) - except queue.Empty: - self.table.sync_if_dirty() - continue - if request is None: - continue - try: - self.finish_request(request, client_address) - self.shutdown_request(request) - iter_count = (iter_count + 1) % iterations_between_sync - if iter_count == 0: - self.table.sync_if_dirty() - except: - self.handle_error(request, client_address) - self.shutdown_request(request) - self.table.sync() - self.table.sync_if_dirty() + self.iterations_between_sync = 60 def sigint_handler(self, signum, stack): if self.table: @@ -109,11 +64,30 @@ class PRServer(SimpleXMLRPCServer): def sigterm_handler(self, signum, stack): if self.table: self.table.sync() - self.quit() - self.requestqueue.put((None, None)) + raise(SystemExit) def process_request(self, request, client_address): - self.requestqueue.put((request, client_address)) + if request is None: + return + try: + self.finish_request(request, client_address) + self.shutdown_request(request) + self.iter_count = (self.iter_count + 1) % self.iterations_between_sync + if self.iter_count == 0: + self.table.sync_if_dirty() + except: + self.handle_error(request, client_address) + self.shutdown_request(request) + self.table.sync() + self.table.sync_if_dirty() + + def serve_forever(self, poll_interval=0.5): + signal.signal(signal.SIGINT, self.sigint_handler) + signal.signal(signal.SIGTERM, self.sigterm_handler) + + self.db = prserv.db.PRData(self.dbfile) + self.table = self.db["PRMAIN"] + return super().serve_forever(poll_interval) def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): try: @@ -122,31 +96,11 @@ class PRServer(SimpleXMLRPCServer): logger.error(str(exc)) return None - def dump_db(self): - """ - Returns a script (string) that reconstructs the state of the - entire database at the time this function is called. The script - language is defined by the backing database engine, which is a - function of server configuration. - Returns None if the database engine does not support dumping to - script or if some other error is encountered in processing. - """ - buff = io.StringIO() - try: - self.table.sync() - self.table.dump_db(buff) - return buff.getvalue() - except Exception as exc: - logger.error(str(exc)) - return None - finally: - buff.close() - def importone(self, version, pkgarch, checksum, value): return self.table.importone(version, pkgarch, checksum, value) def ping(self): - return not self.quitflag + return True def getinfo(self): return (self.host, self.port) @@ -161,144 +115,6 @@ class PRServer(SimpleXMLRPCServer): logger.error(str(exc)) return None - def quit(self): - self.quitflag=True - os.write(self.quitpipeout, b"q") - os.close(self.quitpipeout) - return - - def work_forever(self,): - self.quitflag = False - # This timeout applies to the poll in TCPServer, we need the select - # below to wake on our quit pipe closing. We only ever call into handle_request - # if there is data there. - self.timeout = 0.01 - - bb.utils.set_process_name("PRServ") - - # DB connection must be created after all forks - self.db = prserv.db.PRData(self.dbfile) - self.table = self.db["PRMAIN"] - - logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % - (self.dbfile, self.host, self.port, str(os.getpid()))) - - self.handlerthread.start() - while not self.quitflag: - ready = select.select([self.fileno(), self.quitpipein], [], [], 30) - if self.quitflag: - break - if self.fileno() in ready[0]: - self.handle_request() - self.handlerthread.join() - self.db.disconnect() - logger.info("PRServer: stopping...") - self.server_close() - os.close(self.quitpipein) - return - - def start(self): - if self.daemon: - pid = self.daemonize() - else: - pid = self.fork() - self.pid = pid - - # Ensure both the parent sees this and the child from the work_forever log entry above - logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" % - (self.dbfile, self.host, self.port, str(pid))) - - def delpid(self): - os.remove(self.pidfile) - - def daemonize(self): - """ - See Advanced Programming in the UNIX, Sec 13.3 - """ - try: - pid = os.fork() - if pid > 0: - os.waitpid(pid, 0) - #parent return instead of exit to give control - return pid - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - os.setsid() - """ - fork again to make sure the daemon is not session leader, - which prevents it from acquiring controlling terminal - """ - try: - pid = os.fork() - if pid > 0: #parent - os._exit(0) - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - self.cleanup_handles() - os._exit(0) - - def fork(self): - try: - pid = os.fork() - if pid > 0: - self.socket.close() # avoid ResourceWarning in parent - return pid - except OSError as e: - raise Exception("%s [%d]" % (e.strerror, e.errno)) - - bb.utils.signal_on_parent_exit("SIGTERM") - self.cleanup_handles() - os._exit(0) - - def cleanup_handles(self): - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) - os.chdir("/") - - sys.stdout.flush() - sys.stderr.flush() - - # We could be called from a python thread with io.StringIO as - # stdout/stderr or it could be 'real' unix fd forking where we need - # to physically close the fds to prevent the program launching us from - # potentially hanging on a pipe. Handle both cases. - si = open('/dev/null', 'r') - try: - os.dup2(si.fileno(),sys.stdin.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stdin = si - so = open(self.logfile, 'a+') - try: - os.dup2(so.fileno(),sys.stdout.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stdout = so - try: - os.dup2(so.fileno(),sys.stderr.fileno()) - except (AttributeError, io.UnsupportedOperation): - sys.stderr = so - - # Clear out all log handlers prior to the fork() to avoid calling - # event handlers not part of the PRserver - for logger_iter in logging.Logger.manager.loggerDict.keys(): - logging.getLogger(logger_iter).handlers = [] - - # Ensure logging makes it to the logfile - streamhandler = logging.StreamHandler() - streamhandler.setLevel(logging.DEBUG) - formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") - streamhandler.setFormatter(formatter) - logger.addHandler(streamhandler) - - # write pidfile - pid = str(os.getpid()) - with open(self.pidfile, 'w') as pf: - pf.write("%s\n" % pid) - - self.work_forever() - self.delpid() - class PRServSingleton(object): def __init__(self, dbfile, logfile, interface): self.dbfile = dbfile @@ -308,8 +124,10 @@ class PRServSingleton(object): self.port = None def start(self): - self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False) - self.prserv.start() + self.prserv = PRServer(self.dbfile, self.logfile, self.interface) + self.process = multiprocessing.Process(target=self.prserv.serve_forever) + self.process.start() + self.host, self.port = self.prserv.getinfo() def getinfo(self): @@ -323,13 +141,6 @@ class PRServerConnection(object): self.port = port self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) - def terminate(self): - try: - logger.info("Terminating PRServer...") - self.connection.quit() - except Exception as exc: - sys.stderr.write("%s\n" % str(exc)) - def getPR(self, version, pkgarch, checksum): return self.connection.getPR(version, pkgarch, checksum) @@ -339,15 +150,82 @@ class PRServerConnection(object): def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): return self.connection.export(version, pkgarch, checksum, colinfo) - def dump_db(self): - return self.connection.dump_db() - def importone(self, version, pkgarch, checksum, value): return self.connection.importone(version, pkgarch, checksum, value) def getinfo(self): return self.host, self.port +def run_as_daemon(func, pidfile, logfile): + """ + See Advanced Programming in the UNIX, Sec 13.3 + """ + try: + pid = os.fork() + if pid > 0: + os.waitpid(pid, 0) + #parent return instead of exit to give control + return pid + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + os.setsid() + """ + fork again to make sure the daemon is not session leader, + which prevents it from acquiring controlling terminal + """ + try: + pid = os.fork() + if pid > 0: #parent + os._exit(0) + except OSError as e: + raise Exception("%s [%d]" % (e.strerror, e.errno)) + + os.chdir("/") + + sys.stdout.flush() + sys.stderr.flush() + + # We could be called from a python thread with io.StringIO as + # stdout/stderr or it could be 'real' unix fd forking where we need + # to physically close the fds to prevent the program launching us from + # potentially hanging on a pipe. Handle both cases. + si = open('/dev/null', 'r') + try: + os.dup2(si.fileno(),sys.stdin.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stdin = si + so = open(logfile, 'a+') + try: + os.dup2(so.fileno(),sys.stdout.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stdout = so + try: + os.dup2(so.fileno(),sys.stderr.fileno()) + except (AttributeError, io.UnsupportedOperation): + sys.stderr = so + + # Clear out all log handlers prior to the fork() to avoid calling + # event handlers not part of the PRserver + for logger_iter in logging.Logger.manager.loggerDict.keys(): + logging.getLogger(logger_iter).handlers = [] + + # Ensure logging makes it to the logfile + streamhandler = logging.StreamHandler() + streamhandler.setLevel(logging.DEBUG) + formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s") + streamhandler.setFormatter(formatter) + logger.addHandler(streamhandler) + + # write pidfile + pid = str(os.getpid()) + with open(pidfile, 'w') as pf: + pf.write("%s\n" % pid) + + func() + os.remove(pidfile) + os._exit(0) + def start_daemon(dbfile, host, port, logfile): ip = socket.gethostbyname(host) pidfile = PIDPREFIX % (ip, port) @@ -363,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile): return 1 server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) - server.start() + run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with # the one the server actually is listening, so at least warn the user about it @@ -400,25 +278,13 @@ def stop_daemon(host, port): return 1 try: - PRServerConnection(ip, port).terminate() - except: - logger.critical("Stop PRService %s:%d failed" % (host,port)) - - try: - if pid: - wait_timeout = 0 - print("Waiting for pr-server to exit.") - while is_running(pid) and wait_timeout < 50: - time.sleep(0.1) - wait_timeout += 1 - - if is_running(pid): - print("Sending SIGTERM to pr-server.") - os.kill(pid,signal.SIGTERM) - time.sleep(0.1) + if is_running(pid): + print("Sending SIGTERM to pr-server.") + os.kill(pid, signal.SIGTERM) + time.sleep(0.1) - if os.path.exists(pidfile): - os.remove(pidfile) + if os.path.exists(pidfile): + os.remove(pidfile) except OSError as e: err = str(e) @@ -494,19 +360,14 @@ def auto_start(d): def auto_shutdown(): global singleton - if singleton: - host, port = singleton.getinfo() - try: - PRServerConnection(host, port).terminate() - except: - logger.critical("Stop PRService %s:%d failed" % (host,port)) - - try: - os.waitpid(singleton.prserv.pid, 0) - except ChildProcessError: - pass + if singleton and singleton.process: + singleton.process.terminate() + singleton.process.join() singleton = None def ping(host, port): conn=PRServerConnection(host, port) return conn.ping() + +def connect(host, port): + return PRServerConnection(host, port) |