summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/hashserv
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/hashserv')
-rw-r--r--poky/bitbake/lib/hashserv/__init__.py118
-rw-r--r--poky/bitbake/lib/hashserv/tests.py15
2 files changed, 111 insertions, 22 deletions
diff --git a/poky/bitbake/lib/hashserv/__init__.py b/poky/bitbake/lib/hashserv/__init__.py
index fdc9ced9f..eb03c3221 100644
--- a/poky/bitbake/lib/hashserv/__init__.py
+++ b/poky/bitbake/lib/hashserv/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018 Garmin Ltd.
+# Copyright (C) 2018-2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#
@@ -10,6 +10,12 @@ import sqlite3
import json
import traceback
import logging
+import socketserver
+import queue
+import threading
+import signal
+import socket
+import struct
from datetime import datetime
logger = logging.getLogger('hashserv')
@@ -18,8 +24,17 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
def log_message(self, f, *args):
logger.debug(f, *args)
+ def opendb(self):
+ self.db = sqlite3.connect(self.dbname)
+ self.db.row_factory = sqlite3.Row
+ self.db.execute("PRAGMA synchronous = OFF;")
+ self.db.execute("PRAGMA journal_mode = MEMORY;")
+
def do_GET(self):
try:
+ if not self.db:
+ self.opendb()
+
p = urllib.parse.urlparse(self.path)
if p.path != self.prefix + '/v1/equivalent':
@@ -32,7 +47,7 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
d = None
with contextlib.closing(self.db.cursor()) as cursor:
- cursor.execute('SELECT taskhash, method, unihash FROM tasks_v1 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
+ cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1',
{'method': method, 'taskhash': taskhash})
row = cursor.fetchone()
@@ -52,6 +67,9 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
def do_POST(self):
try:
+ if not self.db:
+ self.opendb()
+
p = urllib.parse.urlparse(self.path)
if p.path != self.prefix + '/v1/equivalent':
@@ -63,15 +81,29 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
with contextlib.closing(self.db.cursor()) as cursor:
cursor.execute('''
- SELECT taskhash, method, unihash FROM tasks_v1 WHERE method=:method AND outhash=:outhash
+ -- Find tasks with a matching outhash (that is, tasks that
+ -- are equivalent)
+ SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash
+
+ -- If there is an exact match on the taskhash, return it.
+ -- Otherwise return the oldest matching outhash of any
+ -- taskhash
ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END,
created ASC
+
+ -- Only return one row
LIMIT 1
''', {k: data[k] for k in ('method', 'outhash', 'taskhash')})
row = cursor.fetchone()
+ # If no matching outhash was found, or one *was* found but it
+ # wasn't an exact match on the taskhash, a new entry for this
+ # taskhash should be added
if row is None or row['taskhash'] != data['taskhash']:
+ # If a row matching the outhash was found, the unihash for
+ # the new taskhash should be the same as that one.
+ # Otherwise the caller provided unihash is used.
unihash = data['unihash']
if row is not None:
unihash = row['unihash']
@@ -88,18 +120,17 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
if k in data:
insert_data[k] = data[k]
- cursor.execute('''INSERT INTO tasks_v1 (%s) VALUES (%s)''' % (
+ cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % (
', '.join(sorted(insert_data.keys())),
', '.join(':' + k for k in sorted(insert_data.keys()))),
insert_data)
logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash)
- cursor.execute('SELECT taskhash, method, unihash FROM tasks_v1 WHERE id=:id', {'id': cursor.lastrowid})
- row = cursor.fetchone()
self.db.commit()
-
- d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
+ d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash}
+ else:
+ d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
@@ -110,17 +141,67 @@ class HashEquivalenceServer(BaseHTTPRequestHandler):
self.send_error(400, explain=traceback.format_exc())
return
-def create_server(addr, db, prefix=''):
+class ThreadedHTTPServer(HTTPServer):
+ quit = False
+
+ def serve_forever(self):
+ self.requestqueue = queue.Queue()
+ self.handlerthread = threading.Thread(target=self.process_request_thread)
+ self.handlerthread.daemon = False
+
+ self.handlerthread.start()
+
+ signal.signal(signal.SIGTERM, self.sigterm_exception)
+ super().serve_forever()
+ os._exit(0)
+
+ def sigterm_exception(self, signum, stackframe):
+ self.server_close()
+ os._exit(0)
+
+ def server_bind(self):
+ HTTPServer.server_bind(self)
+ self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
+
+ def process_request_thread(self):
+ while not self.quit:
+ try:
+ (request, client_address) = self.requestqueue.get(True)
+ except queue.Empty:
+ continue
+ if request is None:
+ continue
+ try:
+ self.finish_request(request, client_address)
+ except Exception:
+ self.handle_error(request, client_address)
+ finally:
+ self.shutdown_request(request)
+ os._exit(0)
+
+ def process_request(self, request, client_address):
+ self.requestqueue.put((request, client_address))
+
+ def server_close(self):
+ super().server_close()
+ self.quit = True
+ self.requestqueue.put((None, None))
+ self.handlerthread.join()
+
+def create_server(addr, dbname, prefix=''):
class Handler(HashEquivalenceServer):
pass
- Handler.prefix = prefix
- Handler.db = db
+ db = sqlite3.connect(dbname)
db.row_factory = sqlite3.Row
+ Handler.prefix = prefix
+ Handler.db = None
+ Handler.dbname = dbname
+
with contextlib.closing(db.cursor()) as cursor:
cursor.execute('''
- CREATE TABLE IF NOT EXISTS tasks_v1 (
+ CREATE TABLE IF NOT EXISTS tasks_v2 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method TEXT NOT NULL,
outhash TEXT NOT NULL,
@@ -134,9 +215,16 @@ def create_server(addr, db, prefix=''):
PV TEXT,
PR TEXT,
task TEXT,
- outhash_siginfo TEXT
+ outhash_siginfo TEXT,
+
+ UNIQUE(method, outhash, taskhash)
)
''')
+ cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)')
+ cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)')
+
+ ret = ThreadedHTTPServer(addr, Handler)
+
+ logger.info('Starting server on %s\n', ret.server_port)
- logger.info('Starting server on %s', addr)
- return HTTPServer(addr, Handler)
+ return ret
diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py
index 8300a2559..6845b5388 100644
--- a/poky/bitbake/lib/hashserv/tests.py
+++ b/poky/bitbake/lib/hashserv/tests.py
@@ -6,30 +6,31 @@
#
import unittest
-import threading
+import multiprocessing
import sqlite3
import hashlib
import urllib.request
import json
+import tempfile
from . import create_server
class TestHashEquivalenceServer(unittest.TestCase):
def setUp(self):
- # Start an in memory hash equivalence server in the background bound to
+ # Start a hash equivalence server in the background bound to
# an ephemeral port
- db = sqlite3.connect(':memory:', check_same_thread=False)
- self.server = create_server(('localhost', 0), db)
+ self.dbfile = tempfile.NamedTemporaryFile(prefix="bb-hashserv-db-")
+ self.server = create_server(('localhost', 0), self.dbfile.name)
self.server_addr = 'http://localhost:%d' % self.server.socket.getsockname()[1]
- self.server_thread = threading.Thread(target=self.server.serve_forever)
+ self.server_thread = multiprocessing.Process(target=self.server.serve_forever)
+ self.server_thread.daemon = True
self.server_thread.start()
def tearDown(self):
# Shutdown server
s = getattr(self, 'server', None)
if s is not None:
- self.server.shutdown()
+ self.server_thread.terminate()
self.server_thread.join()
- self.server.server_close()
def send_get(self, path):
url = '%s/%s' % (self.server_addr, path)