summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/prserv/serv.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/prserv/serv.py')
-rw-r--r--poky/bitbake/lib/prserv/serv.py367
1 files changed, 114 insertions, 253 deletions
diff --git a/poky/bitbake/lib/prserv/serv.py b/poky/bitbake/lib/prserv/serv.py
index 25dcf8a0e..5e322bf83 100644
--- a/poky/bitbake/lib/prserv/serv.py
+++ b/poky/bitbake/lib/prserv/serv.py
@@ -5,8 +5,6 @@
import os,sys,logging
import signal, time
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
-import threading
-import queue
import socket
import io
import sqlite3
@@ -14,14 +12,10 @@ import bb.server.xmlrpcclient
import prserv
import prserv.db
import errno
-import select
+import multiprocessing
logger = logging.getLogger("BitBake.PRserv")
-if sys.hexversion < 0x020600F0:
- print("Sorry, python 2.6 or later is required.")
- sys.exit(1)
-
class Handler(SimpleXMLRPCRequestHandler):
def _dispatch(self,method,params):
try:
@@ -37,7 +31,7 @@ singleton = None
class PRServer(SimpleXMLRPCServer):
- def __init__(self, dbfile, logfile, interface, daemon=True):
+ def __init__(self, dbfile, logfile, interface):
''' constructor '''
try:
SimpleXMLRPCServer.__init__(self, interface,
@@ -50,57 +44,18 @@ class PRServer(SimpleXMLRPCServer):
raise PRServiceConfigError
self.dbfile=dbfile
- self.daemon=daemon
self.logfile=logfile
- self.working_thread=None
self.host, self.port = self.socket.getsockname()
- self.pidfile=PIDPREFIX % (self.host, self.port)
self.register_function(self.getPR, "getPR")
- self.register_function(self.quit, "quit")
self.register_function(self.ping, "ping")
self.register_function(self.export, "export")
- self.register_function(self.dump_db, "dump_db")
self.register_function(self.importone, "importone")
self.register_introspection_functions()
- self.quitpipein, self.quitpipeout = os.pipe()
-
- self.requestqueue = queue.Queue()
- self.handlerthread = threading.Thread(target = self.process_request_thread)
- self.handlerthread.daemon = False
-
- def process_request_thread(self):
- """Same as in BaseServer but as a thread.
-
- In addition, exception handling is done here.
-
- """
- iter_count = 1
+ self.iter_count = 0
# 60 iterations between syncs or sync if dirty every ~30 seconds
- iterations_between_sync = 60
-
- bb.utils.set_process_name("PRServ Handler")
-
- while not self.quitflag:
- try:
- (request, client_address) = self.requestqueue.get(True, 30)
- except queue.Empty:
- self.table.sync_if_dirty()
- continue
- if request is None:
- continue
- try:
- self.finish_request(request, client_address)
- self.shutdown_request(request)
- iter_count = (iter_count + 1) % iterations_between_sync
- if iter_count == 0:
- self.table.sync_if_dirty()
- except:
- self.handle_error(request, client_address)
- self.shutdown_request(request)
- self.table.sync()
- self.table.sync_if_dirty()
+ self.iterations_between_sync = 60
def sigint_handler(self, signum, stack):
if self.table:
@@ -109,11 +64,30 @@ class PRServer(SimpleXMLRPCServer):
def sigterm_handler(self, signum, stack):
if self.table:
self.table.sync()
- self.quit()
- self.requestqueue.put((None, None))
+ raise(SystemExit)
def process_request(self, request, client_address):
- self.requestqueue.put((request, client_address))
+ if request is None:
+ return
+ try:
+ self.finish_request(request, client_address)
+ self.shutdown_request(request)
+ self.iter_count = (self.iter_count + 1) % self.iterations_between_sync
+ if self.iter_count == 0:
+ self.table.sync_if_dirty()
+ except:
+ self.handle_error(request, client_address)
+ self.shutdown_request(request)
+ self.table.sync()
+ self.table.sync_if_dirty()
+
+ def serve_forever(self, poll_interval=0.5):
+ signal.signal(signal.SIGINT, self.sigint_handler)
+ signal.signal(signal.SIGTERM, self.sigterm_handler)
+
+ self.db = prserv.db.PRData(self.dbfile)
+ self.table = self.db["PRMAIN"]
+ return super().serve_forever(poll_interval)
def export(self, version=None, pkgarch=None, checksum=None, colinfo=True):
try:
@@ -122,31 +96,11 @@ class PRServer(SimpleXMLRPCServer):
logger.error(str(exc))
return None
- def dump_db(self):
- """
- Returns a script (string) that reconstructs the state of the
- entire database at the time this function is called. The script
- language is defined by the backing database engine, which is a
- function of server configuration.
- Returns None if the database engine does not support dumping to
- script or if some other error is encountered in processing.
- """
- buff = io.StringIO()
- try:
- self.table.sync()
- self.table.dump_db(buff)
- return buff.getvalue()
- except Exception as exc:
- logger.error(str(exc))
- return None
- finally:
- buff.close()
-
def importone(self, version, pkgarch, checksum, value):
return self.table.importone(version, pkgarch, checksum, value)
def ping(self):
- return not self.quitflag
+ return True
def getinfo(self):
return (self.host, self.port)
@@ -161,144 +115,6 @@ class PRServer(SimpleXMLRPCServer):
logger.error(str(exc))
return None
- def quit(self):
- self.quitflag=True
- os.write(self.quitpipeout, b"q")
- os.close(self.quitpipeout)
- return
-
- def work_forever(self,):
- self.quitflag = False
- # This timeout applies to the poll in TCPServer, we need the select
- # below to wake on our quit pipe closing. We only ever call into handle_request
- # if there is data there.
- self.timeout = 0.01
-
- bb.utils.set_process_name("PRServ")
-
- # DB connection must be created after all forks
- self.db = prserv.db.PRData(self.dbfile)
- self.table = self.db["PRMAIN"]
-
- logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
- (self.dbfile, self.host, self.port, str(os.getpid())))
-
- self.handlerthread.start()
- while not self.quitflag:
- ready = select.select([self.fileno(), self.quitpipein], [], [], 30)
- if self.quitflag:
- break
- if self.fileno() in ready[0]:
- self.handle_request()
- self.handlerthread.join()
- self.db.disconnect()
- logger.info("PRServer: stopping...")
- self.server_close()
- os.close(self.quitpipein)
- return
-
- def start(self):
- if self.daemon:
- pid = self.daemonize()
- else:
- pid = self.fork()
- self.pid = pid
-
- # Ensure both the parent sees this and the child from the work_forever log entry above
- logger.info("Started PRServer with DBfile: %s, IP: %s, PORT: %s, PID: %s" %
- (self.dbfile, self.host, self.port, str(pid)))
-
- def delpid(self):
- os.remove(self.pidfile)
-
- def daemonize(self):
- """
- See Advanced Programming in the UNIX, Sec 13.3
- """
- try:
- pid = os.fork()
- if pid > 0:
- os.waitpid(pid, 0)
- #parent return instead of exit to give control
- return pid
- except OSError as e:
- raise Exception("%s [%d]" % (e.strerror, e.errno))
-
- os.setsid()
- """
- fork again to make sure the daemon is not session leader,
- which prevents it from acquiring controlling terminal
- """
- try:
- pid = os.fork()
- if pid > 0: #parent
- os._exit(0)
- except OSError as e:
- raise Exception("%s [%d]" % (e.strerror, e.errno))
-
- self.cleanup_handles()
- os._exit(0)
-
- def fork(self):
- try:
- pid = os.fork()
- if pid > 0:
- self.socket.close() # avoid ResourceWarning in parent
- return pid
- except OSError as e:
- raise Exception("%s [%d]" % (e.strerror, e.errno))
-
- bb.utils.signal_on_parent_exit("SIGTERM")
- self.cleanup_handles()
- os._exit(0)
-
- def cleanup_handles(self):
- signal.signal(signal.SIGINT, self.sigint_handler)
- signal.signal(signal.SIGTERM, self.sigterm_handler)
- os.chdir("/")
-
- sys.stdout.flush()
- sys.stderr.flush()
-
- # We could be called from a python thread with io.StringIO as
- # stdout/stderr or it could be 'real' unix fd forking where we need
- # to physically close the fds to prevent the program launching us from
- # potentially hanging on a pipe. Handle both cases.
- si = open('/dev/null', 'r')
- try:
- os.dup2(si.fileno(),sys.stdin.fileno())
- except (AttributeError, io.UnsupportedOperation):
- sys.stdin = si
- so = open(self.logfile, 'a+')
- try:
- os.dup2(so.fileno(),sys.stdout.fileno())
- except (AttributeError, io.UnsupportedOperation):
- sys.stdout = so
- try:
- os.dup2(so.fileno(),sys.stderr.fileno())
- except (AttributeError, io.UnsupportedOperation):
- sys.stderr = so
-
- # Clear out all log handlers prior to the fork() to avoid calling
- # event handlers not part of the PRserver
- for logger_iter in logging.Logger.manager.loggerDict.keys():
- logging.getLogger(logger_iter).handlers = []
-
- # Ensure logging makes it to the logfile
- streamhandler = logging.StreamHandler()
- streamhandler.setLevel(logging.DEBUG)
- formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
- streamhandler.setFormatter(formatter)
- logger.addHandler(streamhandler)
-
- # write pidfile
- pid = str(os.getpid())
- with open(self.pidfile, 'w') as pf:
- pf.write("%s\n" % pid)
-
- self.work_forever()
- self.delpid()
-
class PRServSingleton(object):
def __init__(self, dbfile, logfile, interface):
self.dbfile = dbfile
@@ -308,8 +124,10 @@ class PRServSingleton(object):
self.port = None
def start(self):
- self.prserv = PRServer(self.dbfile, self.logfile, self.interface, daemon=False)
- self.prserv.start()
+ self.prserv = PRServer(self.dbfile, self.logfile, self.interface)
+ self.process = multiprocessing.Process(target=self.prserv.serve_forever)
+ self.process.start()
+
self.host, self.port = self.prserv.getinfo()
def getinfo(self):
@@ -323,13 +141,6 @@ class PRServerConnection(object):
self.port = port
self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port)
- def terminate(self):
- try:
- logger.info("Terminating PRServer...")
- self.connection.quit()
- except Exception as exc:
- sys.stderr.write("%s\n" % str(exc))
-
def getPR(self, version, pkgarch, checksum):
return self.connection.getPR(version, pkgarch, checksum)
@@ -339,15 +150,82 @@ class PRServerConnection(object):
def export(self,version=None, pkgarch=None, checksum=None, colinfo=True):
return self.connection.export(version, pkgarch, checksum, colinfo)
- def dump_db(self):
- return self.connection.dump_db()
-
def importone(self, version, pkgarch, checksum, value):
return self.connection.importone(version, pkgarch, checksum, value)
def getinfo(self):
return self.host, self.port
+def run_as_daemon(func, pidfile, logfile):
+ """
+ See Advanced Programming in the UNIX, Sec 13.3
+ """
+ try:
+ pid = os.fork()
+ if pid > 0:
+ os.waitpid(pid, 0)
+ #parent return instead of exit to give control
+ return pid
+ except OSError as e:
+ raise Exception("%s [%d]" % (e.strerror, e.errno))
+
+ os.setsid()
+ """
+ fork again to make sure the daemon is not session leader,
+ which prevents it from acquiring controlling terminal
+ """
+ try:
+ pid = os.fork()
+ if pid > 0: #parent
+ os._exit(0)
+ except OSError as e:
+ raise Exception("%s [%d]" % (e.strerror, e.errno))
+
+ os.chdir("/")
+
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ # We could be called from a python thread with io.StringIO as
+ # stdout/stderr or it could be 'real' unix fd forking where we need
+ # to physically close the fds to prevent the program launching us from
+ # potentially hanging on a pipe. Handle both cases.
+ si = open('/dev/null', 'r')
+ try:
+ os.dup2(si.fileno(),sys.stdin.fileno())
+ except (AttributeError, io.UnsupportedOperation):
+ sys.stdin = si
+ so = open(logfile, 'a+')
+ try:
+ os.dup2(so.fileno(),sys.stdout.fileno())
+ except (AttributeError, io.UnsupportedOperation):
+ sys.stdout = so
+ try:
+ os.dup2(so.fileno(),sys.stderr.fileno())
+ except (AttributeError, io.UnsupportedOperation):
+ sys.stderr = so
+
+ # Clear out all log handlers prior to the fork() to avoid calling
+ # event handlers not part of the PRserver
+ for logger_iter in logging.Logger.manager.loggerDict.keys():
+ logging.getLogger(logger_iter).handlers = []
+
+ # Ensure logging makes it to the logfile
+ streamhandler = logging.StreamHandler()
+ streamhandler.setLevel(logging.DEBUG)
+ formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
+ streamhandler.setFormatter(formatter)
+ logger.addHandler(streamhandler)
+
+ # write pidfile
+ pid = str(os.getpid())
+ with open(pidfile, 'w') as pf:
+ pf.write("%s\n" % pid)
+
+ func()
+ os.remove(pidfile)
+ os._exit(0)
+
def start_daemon(dbfile, host, port, logfile):
ip = socket.gethostbyname(host)
pidfile = PIDPREFIX % (ip, port)
@@ -363,7 +241,7 @@ def start_daemon(dbfile, host, port, logfile):
return 1
server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port))
- server.start()
+ run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile))
# Sometimes, the port (i.e. localhost:0) indicated by the user does not match with
# the one the server actually is listening, so at least warn the user about it
@@ -400,25 +278,13 @@ def stop_daemon(host, port):
return 1
try:
- PRServerConnection(ip, port).terminate()
- except:
- logger.critical("Stop PRService %s:%d failed" % (host,port))
-
- try:
- if pid:
- wait_timeout = 0
- print("Waiting for pr-server to exit.")
- while is_running(pid) and wait_timeout < 50:
- time.sleep(0.1)
- wait_timeout += 1
-
- if is_running(pid):
- print("Sending SIGTERM to pr-server.")
- os.kill(pid,signal.SIGTERM)
- time.sleep(0.1)
+ if is_running(pid):
+ print("Sending SIGTERM to pr-server.")
+ os.kill(pid, signal.SIGTERM)
+ time.sleep(0.1)
- if os.path.exists(pidfile):
- os.remove(pidfile)
+ if os.path.exists(pidfile):
+ os.remove(pidfile)
except OSError as e:
err = str(e)
@@ -494,19 +360,14 @@ def auto_start(d):
def auto_shutdown():
global singleton
- if singleton:
- host, port = singleton.getinfo()
- try:
- PRServerConnection(host, port).terminate()
- except:
- logger.critical("Stop PRService %s:%d failed" % (host,port))
-
- try:
- os.waitpid(singleton.prserv.pid, 0)
- except ChildProcessError:
- pass
+ if singleton and singleton.process:
+ singleton.process.terminate()
+ singleton.process.join()
singleton = None
def ping(host, port):
conn=PRServerConnection(host, port)
return conn.ping()
+
+def connect(host, port):
+ return PRServerConnection(host, port)