diff options
Diffstat (limited to 'poky/bitbake/lib/hashserv')
-rw-r--r-- | poky/bitbake/lib/hashserv/__init__.py | 118 | ||||
-rw-r--r-- | poky/bitbake/lib/hashserv/tests.py | 15 |
2 files changed, 111 insertions, 22 deletions
diff --git a/poky/bitbake/lib/hashserv/__init__.py b/poky/bitbake/lib/hashserv/__init__.py index fdc9ced9f..eb03c3221 100644 --- a/poky/bitbake/lib/hashserv/__init__.py +++ b/poky/bitbake/lib/hashserv/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018 Garmin Ltd. +# Copyright (C) 2018-2019 Garmin Ltd. # # SPDX-License-Identifier: GPL-2.0-only # @@ -10,6 +10,12 @@ import sqlite3 import json import traceback import logging +import socketserver +import queue +import threading +import signal +import socket +import struct from datetime import datetime logger = logging.getLogger('hashserv') @@ -18,8 +24,17 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): def log_message(self, f, *args): logger.debug(f, *args) + def opendb(self): + self.db = sqlite3.connect(self.dbname) + self.db.row_factory = sqlite3.Row + self.db.execute("PRAGMA synchronous = OFF;") + self.db.execute("PRAGMA journal_mode = MEMORY;") + def do_GET(self): try: + if not self.db: + self.opendb() + p = urllib.parse.urlparse(self.path) if p.path != self.prefix + '/v1/equivalent': @@ -32,7 +47,7 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): d = None with contextlib.closing(self.db.cursor()) as cursor: - cursor.execute('SELECT taskhash, method, unihash FROM tasks_v1 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1', + cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1', {'method': method, 'taskhash': taskhash}) row = cursor.fetchone() @@ -52,6 +67,9 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): def do_POST(self): try: + if not self.db: + self.opendb() + p = urllib.parse.urlparse(self.path) if p.path != self.prefix + '/v1/equivalent': @@ -63,15 +81,29 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): with contextlib.closing(self.db.cursor()) as cursor: cursor.execute(''' - SELECT taskhash, method, unihash FROM tasks_v1 WHERE method=:method AND outhash=:outhash + -- Find tasks with a matching outhash (that is, tasks that + -- are equivalent) + SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash + + -- If there is an exact match on the taskhash, return it. + -- Otherwise return the oldest matching outhash of any + -- taskhash ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END, created ASC + + -- Only return one row LIMIT 1 ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')}) row = cursor.fetchone() + # If no matching outhash was found, or one *was* found but it + # wasn't an exact match on the taskhash, a new entry for this + # taskhash should be added if row is None or row['taskhash'] != data['taskhash']: + # If a row matching the outhash was found, the unihash for + # the new taskhash should be the same as that one. + # Otherwise the caller provided unihash is used. unihash = data['unihash'] if row is not None: unihash = row['unihash'] @@ -88,18 +120,17 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): if k in data: insert_data[k] = data[k] - cursor.execute('''INSERT INTO tasks_v1 (%s) VALUES (%s)''' % ( + cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % ( ', '.join(sorted(insert_data.keys())), ', '.join(':' + k for k in sorted(insert_data.keys()))), insert_data) logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash) - cursor.execute('SELECT taskhash, method, unihash FROM tasks_v1 WHERE id=:id', {'id': cursor.lastrowid}) - row = cursor.fetchone() self.db.commit() - - d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} + d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash} + else: + d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} self.send_response(200) self.send_header('Content-Type', 'application/json; charset=utf-8') @@ -110,17 +141,67 @@ class HashEquivalenceServer(BaseHTTPRequestHandler): self.send_error(400, explain=traceback.format_exc()) return -def create_server(addr, db, prefix=''): +class ThreadedHTTPServer(HTTPServer): + quit = False + + def serve_forever(self): + self.requestqueue = queue.Queue() + self.handlerthread = threading.Thread(target=self.process_request_thread) + self.handlerthread.daemon = False + + self.handlerthread.start() + + signal.signal(signal.SIGTERM, self.sigterm_exception) + super().serve_forever() + os._exit(0) + + def sigterm_exception(self, signum, stackframe): + self.server_close() + os._exit(0) + + def server_bind(self): + HTTPServer.server_bind(self) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0)) + + def process_request_thread(self): + while not self.quit: + try: + (request, client_address) = self.requestqueue.get(True) + except queue.Empty: + continue + if request is None: + continue + try: + self.finish_request(request, client_address) + except Exception: + self.handle_error(request, client_address) + finally: + self.shutdown_request(request) + os._exit(0) + + def process_request(self, request, client_address): + self.requestqueue.put((request, client_address)) + + def server_close(self): + super().server_close() + self.quit = True + self.requestqueue.put((None, None)) + self.handlerthread.join() + +def create_server(addr, dbname, prefix=''): class Handler(HashEquivalenceServer): pass - Handler.prefix = prefix - Handler.db = db + db = sqlite3.connect(dbname) db.row_factory = sqlite3.Row + Handler.prefix = prefix + Handler.db = None + Handler.dbname = dbname + with contextlib.closing(db.cursor()) as cursor: cursor.execute(''' - CREATE TABLE IF NOT EXISTS tasks_v1 ( + CREATE TABLE IF NOT EXISTS tasks_v2 ( id INTEGER PRIMARY KEY AUTOINCREMENT, method TEXT NOT NULL, outhash TEXT NOT NULL, @@ -134,9 +215,16 @@ def create_server(addr, db, prefix=''): PV TEXT, PR TEXT, task TEXT, - outhash_siginfo TEXT + outhash_siginfo TEXT, + + UNIQUE(method, outhash, taskhash) ) ''') + cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)') + cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)') + + ret = ThreadedHTTPServer(addr, Handler) + + logger.info('Starting server on %s\n', ret.server_port) - logger.info('Starting server on %s', addr) - return HTTPServer(addr, Handler) + return ret diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py index 8300a2559..6845b5388 100644 --- a/poky/bitbake/lib/hashserv/tests.py +++ b/poky/bitbake/lib/hashserv/tests.py @@ -6,30 +6,31 @@ # import unittest -import threading +import multiprocessing import sqlite3 import hashlib import urllib.request import json +import tempfile from . import create_server class TestHashEquivalenceServer(unittest.TestCase): def setUp(self): - # Start an in memory hash equivalence server in the background bound to + # Start a hash equivalence server in the background bound to # an ephemeral port - db = sqlite3.connect(':memory:', check_same_thread=False) - self.server = create_server(('localhost', 0), db) + self.dbfile = tempfile.NamedTemporaryFile(prefix="bb-hashserv-db-") + self.server = create_server(('localhost', 0), self.dbfile.name) self.server_addr = 'http://localhost:%d' % self.server.socket.getsockname()[1] - self.server_thread = threading.Thread(target=self.server.serve_forever) + self.server_thread = multiprocessing.Process(target=self.server.serve_forever) + self.server_thread.daemon = True self.server_thread.start() def tearDown(self): # Shutdown server s = getattr(self, 'server', None) if s is not None: - self.server.shutdown() + self.server_thread.terminate() self.server_thread.join() - self.server.server_close() def send_get(self, path): url = '%s/%s' % (self.server_addr, path) |