diff options
Diffstat (limited to 'poky/bitbake/lib/prserv')
-rw-r--r-- | poky/bitbake/lib/prserv/client.py | 48 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/db.py | 65 | ||||
-rw-r--r-- | poky/bitbake/lib/prserv/serv.py | 257 |
3 files changed, 221 insertions, 149 deletions
diff --git a/poky/bitbake/lib/prserv/client.py b/poky/bitbake/lib/prserv/client.py new file mode 100644 index 000000000..a3f19ddaf --- /dev/null +++ b/poky/bitbake/lib/prserv/client.py @@ -0,0 +1,48 @@ +# +# SPDX-License-Identifier: GPL-2.0-only +# + +import logging +import bb.asyncrpc + +logger = logging.getLogger("BitBake.PRserv") + +class PRAsyncClient(bb.asyncrpc.AsyncClient): + def __init__(self): + super().__init__('PRSERVICE', '1.0', logger) + + async def getPR(self, version, pkgarch, checksum): + response = await self.send_message( + {'get-pr': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum}} + ) + if response: + return response['value'] + + async def importone(self, version, pkgarch, checksum, value): + response = await self.send_message( + {'import-one': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'value': value}} + ) + if response: + return response['value'] + + async def export(self, version, pkgarch, checksum, colinfo): + response = await self.send_message( + {'export': {'version': version, 'pkgarch': pkgarch, 'checksum': checksum, 'colinfo': colinfo}} + ) + if response: + return (response['metainfo'], response['datainfo']) + + async def is_readonly(self): + response = await self.send_message( + {'is-readonly': {}} + ) + if response: + return response['readonly'] + +class PRClient(bb.asyncrpc.Client): + def __init__(self): + super().__init__() + self._add_methods('getPR', 'importone', 'export', 'is_readonly') + + def _get_async_client(self): + return PRAsyncClient() diff --git a/poky/bitbake/lib/prserv/db.py b/poky/bitbake/lib/prserv/db.py index cb2a2461e..2710d4a22 100644 --- a/poky/bitbake/lib/prserv/db.py +++ b/poky/bitbake/lib/prserv/db.py @@ -30,21 +30,29 @@ if sqlversion[0] < 3 or (sqlversion[0] == 3 and sqlversion[1] < 3): # class PRTable(object): - def __init__(self, conn, table, nohist): + def __init__(self, conn, table, nohist, read_only): self.conn = conn self.nohist = nohist + self.read_only = read_only self.dirty = False if nohist: self.table = "%s_nohist" % table else: self.table = "%s_hist" % table - self._execute("CREATE TABLE IF NOT EXISTS %s \ - (version TEXT NOT NULL, \ - pkgarch TEXT NOT NULL, \ - checksum TEXT NOT NULL, \ - value INTEGER, \ - PRIMARY KEY (version, pkgarch, checksum));" % self.table) + if self.read_only: + table_exists = self._execute( + "SELECT count(*) FROM sqlite_master \ + WHERE type='table' AND name='%s'" % (self.table)) + if not table_exists: + raise prserv.NotFoundError + else: + self._execute("CREATE TABLE IF NOT EXISTS %s \ + (version TEXT NOT NULL, \ + pkgarch TEXT NOT NULL, \ + checksum TEXT NOT NULL, \ + value INTEGER, \ + PRIMARY KEY (version, pkgarch, checksum));" % self.table) def _execute(self, *query): """Execute a query, waiting to acquire a lock if necessary""" @@ -59,8 +67,9 @@ class PRTable(object): raise exc def sync(self): - self.conn.commit() - self._execute("BEGIN EXCLUSIVE TRANSACTION") + if not self.read_only: + self.conn.commit() + self._execute("BEGIN EXCLUSIVE TRANSACTION") def sync_if_dirty(self): if self.dirty: @@ -75,6 +84,15 @@ class PRTable(object): return row[0] else: #no value found, try to insert + if self.read_only: + data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table), + (version, pkgarch)) + row = data.fetchone() + if row is not None: + return row[0] + else: + return 0 + try: self._execute("INSERT INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" % (self.table,self.table), @@ -103,6 +121,15 @@ class PRTable(object): return row[0] else: #no value found, try to insert + if self.read_only: + data = self._execute("SELECT ifnull(max(value)+1,0) FROM %s where version=? AND pkgarch=?;" % (self.table), + (version, pkgarch)) + row = data.fetchone() + if row is not None: + return row[0] + else: + return 0 + try: self._execute("INSERT OR REPLACE INTO %s VALUES (?, ?, ?, (select ifnull(max(value)+1,0) from %s where version=? AND pkgarch=?));" % (self.table,self.table), @@ -128,6 +155,9 @@ class PRTable(object): return self._getValueHist(version, pkgarch, checksum) def _importHist(self, version, pkgarch, checksum, value): + if self.read_only: + return None + val = None data = self._execute("SELECT value FROM %s WHERE version=? AND pkgarch=? AND checksum=?;" % self.table, (version, pkgarch, checksum)) @@ -152,6 +182,9 @@ class PRTable(object): return val def _importNohist(self, version, pkgarch, checksum, value): + if self.read_only: + return None + try: #try to insert self._execute("INSERT INTO %s VALUES (?, ?, ?, ?);" % (self.table), @@ -245,19 +278,23 @@ class PRTable(object): class PRData(object): """Object representing the PR database""" - def __init__(self, filename, nohist=True): + def __init__(self, filename, nohist=True, read_only=False): self.filename=os.path.abspath(filename) self.nohist=nohist + self.read_only = read_only #build directory hierarchy try: os.makedirs(os.path.dirname(self.filename)) except OSError as e: if e.errno != errno.EEXIST: raise e - self.connection=sqlite3.connect(self.filename, isolation_level="EXCLUSIVE", check_same_thread = False) + uri = "file:%s%s" % (self.filename, "?mode=ro" if self.read_only else "") + logger.debug("Opening PRServ database '%s'" % (uri)) + self.connection=sqlite3.connect(uri, uri=True, isolation_level="EXCLUSIVE", check_same_thread = False) self.connection.row_factory=sqlite3.Row - self.connection.execute("pragma synchronous = off;") - self.connection.execute("PRAGMA journal_mode = MEMORY;") + if not self.read_only: + self.connection.execute("pragma synchronous = off;") + self.connection.execute("PRAGMA journal_mode = MEMORY;") self._tables={} def disconnect(self): @@ -270,7 +307,7 @@ class PRData(object): if tblname in self._tables: return self._tables[tblname] else: - tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist) + tableobj = self._tables[tblname] = PRTable(self.connection, tblname, self.nohist, self.read_only) return tableobj def __delitem__(self, tblname): diff --git a/poky/bitbake/lib/prserv/serv.py b/poky/bitbake/lib/prserv/serv.py index 5e322bf83..0a20b927c 100644 --- a/poky/bitbake/lib/prserv/serv.py +++ b/poky/bitbake/lib/prserv/serv.py @@ -4,157 +4,135 @@ import os,sys,logging import signal, time -from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler import socket import io import sqlite3 -import bb.server.xmlrpcclient import prserv import prserv.db import errno -import multiprocessing +import bb.asyncrpc logger = logging.getLogger("BitBake.PRserv") -class Handler(SimpleXMLRPCRequestHandler): - def _dispatch(self,method,params): - try: - value=self.server.funcs[method](*params) - except: - import traceback - traceback.print_exc() - raise - return value - PIDPREFIX = "/tmp/PRServer_%s_%s.pid" singleton = None - -class PRServer(SimpleXMLRPCServer): - def __init__(self, dbfile, logfile, interface): - ''' constructor ''' +class PRServerClient(bb.asyncrpc.AsyncServerConnection): + def __init__(self, reader, writer, table, read_only): + super().__init__(reader, writer, 'PRSERVICE', logger) + self.handlers.update({ + 'get-pr': self.handle_get_pr, + 'import-one': self.handle_import_one, + 'export': self.handle_export, + 'is-readonly': self.handle_is_readonly, + }) + self.table = table + self.read_only = read_only + + def validate_proto_version(self): + return (self.proto_version == (1, 0)) + + async def dispatch_message(self, msg): try: - SimpleXMLRPCServer.__init__(self, interface, - logRequests=False, allow_none=True) - except socket.error: - ip=socket.gethostbyname(interface[0]) - port=interface[1] - msg="PR Server unable to bind to %s:%s\n" % (ip, port) - sys.stderr.write(msg) - raise PRServiceConfigError - - self.dbfile=dbfile - self.logfile=logfile - self.host, self.port = self.socket.getsockname() - - self.register_function(self.getPR, "getPR") - self.register_function(self.ping, "ping") - self.register_function(self.export, "export") - self.register_function(self.importone, "importone") - self.register_introspection_functions() - - self.iter_count = 0 - # 60 iterations between syncs or sync if dirty every ~30 seconds - self.iterations_between_sync = 60 - - def sigint_handler(self, signum, stack): - if self.table: - self.table.sync() - - def sigterm_handler(self, signum, stack): - if self.table: - self.table.sync() - raise(SystemExit) - - def process_request(self, request, client_address): - if request is None: - return - try: - self.finish_request(request, client_address) - self.shutdown_request(request) - self.iter_count = (self.iter_count + 1) % self.iterations_between_sync - if self.iter_count == 0: - self.table.sync_if_dirty() + await super().dispatch_message(msg) except: - self.handle_error(request, client_address) - self.shutdown_request(request) self.table.sync() - self.table.sync_if_dirty() + raise - def serve_forever(self, poll_interval=0.5): - signal.signal(signal.SIGINT, self.sigint_handler) - signal.signal(signal.SIGTERM, self.sigterm_handler) + self.table.sync_if_dirty() - self.db = prserv.db.PRData(self.dbfile) - self.table = self.db["PRMAIN"] - return super().serve_forever(poll_interval) + async def handle_get_pr(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] - def export(self, version=None, pkgarch=None, checksum=None, colinfo=True): + response = None try: - return self.table.export(version, pkgarch, checksum, colinfo) + value = self.table.getValue(version, pkgarch, checksum) + response = {'value': value} + except prserv.NotFoundError: + logger.error("can not find value for (%s, %s)",version, checksum) except sqlite3.Error as exc: logger.error(str(exc)) - return None - def importone(self, version, pkgarch, checksum, value): - return self.table.importone(version, pkgarch, checksum, value) + self.write_message(response) - def ping(self): - return True + async def handle_import_one(self, request): + response = None + if not self.read_only: + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + value = request['value'] - def getinfo(self): - return (self.host, self.port) + value = self.table.importone(version, pkgarch, checksum, value) + if value is not None: + response = {'value': value} + + self.write_message(response) + + async def handle_export(self, request): + version = request['version'] + pkgarch = request['pkgarch'] + checksum = request['checksum'] + colinfo = request['colinfo'] - def getPR(self, version, pkgarch, checksum): try: - return self.table.getValue(version, pkgarch, checksum) - except prserv.NotFoundError: - logger.error("can not find value for (%s, %s)",version, checksum) - return None + (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo) except sqlite3.Error as exc: logger.error(str(exc)) - return None + metainfo = datainfo = None -class PRServSingleton(object): - def __init__(self, dbfile, logfile, interface): + response = {'metainfo': metainfo, 'datainfo': datainfo} + self.write_message(response) + + async def handle_is_readonly(self, request): + response = {'readonly': self.read_only} + self.write_message(response) + +class PRServer(bb.asyncrpc.AsyncServer): + def __init__(self, dbfile, read_only=False): + super().__init__(logger) self.dbfile = dbfile - self.logfile = logfile - self.interface = interface - self.host = None - self.port = None + self.table = None + self.read_only = read_only - def start(self): - self.prserv = PRServer(self.dbfile, self.logfile, self.interface) - self.process = multiprocessing.Process(target=self.prserv.serve_forever) - self.process.start() + def accept_client(self, reader, writer): + return PRServerClient(reader, writer, self.table, self.read_only) - self.host, self.port = self.prserv.getinfo() + def _serve_forever(self): + self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only) + self.table = self.db["PRMAIN"] - def getinfo(self): - return (self.host, self.port) + logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" % + (self.dbfile, self.address, str(os.getpid()))) -class PRServerConnection(object): - def __init__(self, host, port): - if is_local_special(host, port): - host, port = singleton.getinfo() - self.host = host - self.port = port - self.connection, self.transport = bb.server.xmlrpcclient._create_server(self.host, self.port) + super()._serve_forever() - def getPR(self, version, pkgarch, checksum): - return self.connection.getPR(version, pkgarch, checksum) + self.table.sync_if_dirty() + self.db.disconnect() - def ping(self): - return self.connection.ping() + def signal_handler(self): + super().signal_handler() + if self.table: + self.table.sync() - def export(self,version=None, pkgarch=None, checksum=None, colinfo=True): - return self.connection.export(version, pkgarch, checksum, colinfo) +class PRServSingleton(object): + def __init__(self, dbfile, logfile, host, port): + self.dbfile = dbfile + self.logfile = logfile + self.host = host + self.port = port - def importone(self, version, pkgarch, checksum, value): - return self.connection.importone(version, pkgarch, checksum, value) + def start(self): + self.prserv = PRServer(self.dbfile) + self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port) + self.process = self.prserv.serve_as_process() - def getinfo(self): - return self.host, self.port + if not self.prserv.address: + raise PRServiceConfigError + if not self.port: + self.port = int(self.prserv.address.rsplit(':', 1)[1]) def run_as_daemon(func, pidfile, logfile): """ @@ -226,7 +204,7 @@ def run_as_daemon(func, pidfile, logfile): os.remove(pidfile) os._exit(0) -def start_daemon(dbfile, host, port, logfile): +def start_daemon(dbfile, host, port, logfile, read_only=False): ip = socket.gethostbyname(host) pidfile = PIDPREFIX % (ip, port) try: @@ -240,15 +218,13 @@ def start_daemon(dbfile, host, port, logfile): % pidfile) return 1 - server = PRServer(os.path.abspath(dbfile), os.path.abspath(logfile), (ip,port)) - run_as_daemon(server.serve_forever, pidfile, os.path.abspath(logfile)) + dbfile = os.path.abspath(dbfile) + def daemon_main(): + server = PRServer(dbfile, read_only=read_only) + server.start_tcp_server(ip, port) + server.serve_forever() - # Sometimes, the port (i.e. localhost:0) indicated by the user does not match with - # the one the server actually is listening, so at least warn the user about it - _,rport = server.getinfo() - if port != rport: - sys.stdout.write("Server is listening at port %s instead of %s\n" - % (rport,port)) + run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile)) return 0 def stop_daemon(host, port): @@ -302,7 +278,7 @@ def is_running(pid): return True def is_local_special(host, port): - if host.strip().upper() == 'localhost'.upper() and (not port): + if (host == 'localhost' or host == '127.0.0.1') and not port: return True else: return False @@ -326,7 +302,9 @@ def auto_start(d): 'Usage: PRSERV_HOST = "<hostname>:<port>"'])) raise PRServiceConfigError - if is_local_special(host_params[0], int(host_params[1])): + host = host_params[0].strip().lower() + port = int(host_params[1]) + if is_local_special(host, port): import bb.utils cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE")) if not cachedir: @@ -340,20 +318,16 @@ def auto_start(d): auto_shutdown() if not singleton: bb.utils.mkdirhier(cachedir) - singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), ("localhost",0)) + singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port) singleton.start() if singleton: - host, port = singleton.getinfo() - else: - host = host_params[0] - port = int(host_params[1]) + host = singleton.host + port = singleton.port try: - connection = PRServerConnection(host,port) - connection.ping() - realhost, realport = connection.getinfo() - return str(realhost) + ":" + str(realport) - + ping(host, port) + return str(host) + ":" + str(port) + except Exception: logger.critical("PRservice %s:%d not available" % (host, port)) raise PRServiceConfigError @@ -366,8 +340,21 @@ def auto_shutdown(): singleton = None def ping(host, port): - conn=PRServerConnection(host, port) + from . import client + + conn = client.PRClient() + conn.connect_tcp(host, port) return conn.ping() def connect(host, port): - return PRServerConnection(host, port) + from . import client + + global singleton + + if host.strip().lower() == 'localhost' and not port: + host = 'localhost' + port = singleton.port + + conn = client.PRClient() + conn.connect_tcp(host, port) + return conn |