diff options
Diffstat (limited to 'poky/bitbake/lib/hashserv/server.py')
-rw-r--r-- | poky/bitbake/lib/hashserv/server.py | 340 |
1 files changed, 221 insertions, 119 deletions
diff --git a/poky/bitbake/lib/hashserv/server.py b/poky/bitbake/lib/hashserv/server.py index a059e52115..d40a2ab8f8 100644 --- a/poky/bitbake/lib/hashserv/server.py +++ b/poky/bitbake/lib/hashserv/server.py @@ -5,11 +5,12 @@ from contextlib import closing, contextmanager from datetime import datetime +import enum import asyncio import logging import math import time -from . import create_async_client, TABLE_COLUMNS +from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS import bb.asyncrpc @@ -106,56 +107,64 @@ class Stats(object): return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} -def insert_task(cursor, data, ignore=False): +@enum.unique +class Resolve(enum.Enum): + FAIL = enum.auto() + IGNORE = enum.auto() + REPLACE = enum.auto() + + +def insert_table(cursor, table, data, on_conflict): + resolve = { + Resolve.FAIL: "", + Resolve.IGNORE: " OR IGNORE", + Resolve.REPLACE: " OR REPLACE", + }[on_conflict] + keys = sorted(data.keys()) - query = '''INSERT%s INTO tasks_v2 (%s) VALUES (%s)''' % ( - " OR IGNORE" if ignore else "", - ', '.join(keys), - ', '.join(':' + k for k in keys)) + query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format( + resolve=resolve, + table=table, + fields=", ".join(keys), + values=", ".join(":" + k for k in keys), + ) + prevrowid = cursor.lastrowid cursor.execute(query, data) - -async def copy_from_upstream(client, db, method, taskhash): - d = await client.get_taskhash(method, taskhash, True) + logging.debug( + "Inserting %r into %s, %s", + data, + table, + on_conflict + ) + return (cursor.lastrowid, cursor.lastrowid != prevrowid) + +def insert_unihash(cursor, data, on_conflict): + return insert_table(cursor, "unihashes_v2", data, on_conflict) + +def insert_outhash(cursor, data, on_conflict): + return insert_table(cursor, "outhashes_v2", data, on_conflict) + +async def copy_unihash_from_upstream(client, db, method, taskhash): + d = await client.get_taskhash(method, taskhash) if d is not None: - # Filter out unknown columns - d = {k: v for k, v in d.items() if k in TABLE_COLUMNS} - with closing(db.cursor()) as cursor: - insert_task(cursor, d) + insert_unihash( + cursor, + {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}, + Resolve.IGNORE, + ) db.commit() - return d -async def copy_outhash_from_upstream(client, db, method, outhash, taskhash): - d = await client.get_outhash(method, outhash, taskhash) - if d is not None: - # Filter out unknown columns - d = {k: v for k, v in d.items() if k in TABLE_COLUMNS} - with closing(db.cursor()) as cursor: - insert_task(cursor, d) - db.commit() +class ServerCursor(object): + def __init__(self, db, cursor, upstream): + self.db = db + self.cursor = cursor + self.upstream = upstream - return d class ServerClient(bb.asyncrpc.AsyncServerConnection): - FAST_QUERY = 'SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' - ALL_QUERY = 'SELECT * FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1' - OUTHASH_QUERY = ''' - -- Find tasks with a matching outhash (that is, tasks that - -- are equivalent) - SELECT * FROM tasks_v2 WHERE method=:method AND outhash=:outhash - - -- If there is an exact match on the taskhash, return it. - -- Otherwise return the oldest matching outhash of any - -- taskhash - ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END, - created ASC - - -- Only return one row - LIMIT 1 - ''' - def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only): super().__init__(reader, writer, 'OEHASHEQUIV', logger) self.db = db @@ -210,36 +219,102 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): async def handle_get(self, request): method = request['method'] taskhash = request['taskhash'] + fetch_all = request.get('all', False) - if request.get('all', False): - row = self.query_equivalent(method, taskhash, self.ALL_QUERY) - else: - row = self.query_equivalent(method, taskhash, self.FAST_QUERY) + with closing(self.db.cursor()) as cursor: + d = await self.get_unihash(cursor, method, taskhash, fetch_all) - if row is not None: - logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) - d = {k: row[k] for k in row.keys()} - elif self.upstream_client is not None: - d = await copy_from_upstream(self.upstream_client, self.db, method, taskhash) + self.write_message(d) + + async def get_unihash(self, cursor, method, taskhash, fetch_all=False): + d = None + + if fetch_all: + cursor.execute( + ''' + SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + ''', + { + 'method': method, + 'taskhash': taskhash, + } + + ) + row = cursor.fetchone() + + if row is not None: + d = {k: row[k] for k in row.keys()} + elif self.upstream_client is not None: + d = await self.upstream_client.get_taskhash(method, taskhash, True) + self.update_unified(cursor, d) + self.db.commit() else: - d = None + row = self.query_equivalent(cursor, method, taskhash) + + if row is not None: + d = {k: row[k] for k in row.keys()} + elif self.upstream_client is not None: + d = await self.upstream_client.get_taskhash(method, taskhash) + d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS} + insert_unihash(cursor, d, Resolve.IGNORE) + self.db.commit() - self.write_message(d) + return d async def handle_get_outhash(self, request): + method = request['method'] + outhash = request['outhash'] + taskhash = request['taskhash'] + with closing(self.db.cursor()) as cursor: - cursor.execute(self.OUTHASH_QUERY, - {k: request[k] for k in ('method', 'outhash', 'taskhash')}) + d = await self.get_outhash(cursor, method, outhash, taskhash) - row = cursor.fetchone() + self.write_message(d) + + async def get_outhash(self, cursor, method, outhash, taskhash): + d = None + cursor.execute( + ''' + SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + ''', + { + 'method': method, + 'outhash': outhash, + } + ) + row = cursor.fetchone() if row is not None: - logger.debug('Found equivalent outhash %s -> %s', (row['outhash'], row['unihash'])) d = {k: row[k] for k in row.keys()} - else: - d = None + elif self.upstream_client is not None: + d = await self.upstream_client.get_outhash(method, outhash, taskhash) + self.update_unified(cursor, d) + self.db.commit() - self.write_message(d) + return d + + def update_unified(self, cursor, data): + if data is None: + return + + insert_unihash( + cursor, + {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS}, + Resolve.IGNORE + ) + insert_outhash( + cursor, + {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS}, + Resolve.IGNORE + ) async def handle_get_stream(self, request): self.write_message('ok') @@ -267,7 +342,12 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): (method, taskhash) = l.split() #logger.debug('Looking up %s %s' % (method, taskhash)) - row = self.query_equivalent(method, taskhash, self.FAST_QUERY) + cursor = self.db.cursor() + try: + row = self.query_equivalent(cursor, method, taskhash) + finally: + cursor.close() + if row is not None: msg = ('%s\n' % row['unihash']).encode('utf-8') #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) @@ -294,55 +374,82 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): async def handle_report(self, data): with closing(self.db.cursor()) as cursor: - cursor.execute(self.OUTHASH_QUERY, - {k: data[k] for k in ('method', 'outhash', 'taskhash')}) + outhash_data = { + 'method': data['method'], + 'outhash': data['outhash'], + 'taskhash': data['taskhash'], + 'created': datetime.now() + } - row = cursor.fetchone() + for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): + if k in data: + outhash_data[k] = data[k] + + # Insert the new entry, unless it already exists + (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE) + + if inserted: + # If this row is new, check if it is equivalent to another + # output hash + cursor.execute( + ''' + SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2 + INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash + -- Select any matching output hash except the one we just inserted + WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash + -- Pick the oldest hash + ORDER BY outhashes_v2.created ASC + LIMIT 1 + ''', + { + 'method': data['method'], + 'outhash': data['outhash'], + 'taskhash': data['taskhash'], + } + ) + row = cursor.fetchone() - if row is None and self.upstream_client: - # Try upstream - row = await copy_outhash_from_upstream(self.upstream_client, - self.db, - data['method'], - data['outhash'], - data['taskhash']) - - # If no matching outhash was found, or one *was* found but it - # wasn't an exact match on the taskhash, a new entry for this - # taskhash should be added - if row is None or row['taskhash'] != data['taskhash']: - # If a row matching the outhash was found, the unihash for - # the new taskhash should be the same as that one. - # Otherwise the caller provided unihash is used. - unihash = data['unihash'] if row is not None: + # A matching output hash was found. Set our taskhash to the + # same unihash since they are equivalent unihash = row['unihash'] + resolve = Resolve.IGNORE + else: + # No matching output hash was found. This is probably the + # first outhash to be added. + unihash = data['unihash'] + resolve = Resolve.IGNORE + + # Query upstream to see if it has a unihash we can use + if self.upstream_client is not None: + upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash']) + if upstream_data is not None: + unihash = upstream_data['unihash'] + + + insert_unihash( + cursor, + { + 'method': data['method'], + 'taskhash': data['taskhash'], + 'unihash': unihash, + }, + resolve + ) + + unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash']) + if unihash_data is not None: + unihash = unihash_data['unihash'] + else: + unihash = data['unihash'] - insert_data = { - 'method': data['method'], - 'outhash': data['outhash'], - 'taskhash': data['taskhash'], - 'unihash': unihash, - 'created': datetime.now() - } - - for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): - if k in data: - insert_data[k] = data[k] - - insert_task(cursor, insert_data) - self.db.commit() - - logger.info('Adding taskhash %s with unihash %s', - data['taskhash'], unihash) + self.db.commit() - d = { - 'taskhash': data['taskhash'], - 'method': data['method'], - 'unihash': unihash - } - else: - d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} + d = { + 'taskhash': data['taskhash'], + 'method': data['method'], + 'unihash': unihash, + } self.write_message(d) @@ -350,23 +457,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): with closing(self.db.cursor()) as cursor: insert_data = { 'method': data['method'], - 'outhash': "", 'taskhash': data['taskhash'], 'unihash': data['unihash'], - 'created': datetime.now() } - - for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): - if k in data: - insert_data[k] = data[k] - - insert_task(cursor, insert_data, ignore=True) + insert_unihash(cursor, insert_data, Resolve.IGNORE) self.db.commit() # Fetch the unihash that will be reported for the taskhash. If the # unihash matches, it means this row was inserted (or the mapping # was already valid) - row = self.query_equivalent(data['method'], data['taskhash'], self.FAST_QUERY) + row = self.query_equivalent(cursor, data['method'], data['taskhash']) if row['unihash'] == data['unihash']: logger.info('Adding taskhash equivalence for %s with unihash %s', @@ -399,14 +499,16 @@ class ServerClient(bb.asyncrpc.AsyncServerConnection): await self.backfill_queue.join() self.write_message(d) - def query_equivalent(self, method, taskhash, query): + def query_equivalent(self, cursor, method, taskhash): # This is part of the inner loop and must be as fast as possible - try: - cursor = self.db.cursor() - cursor.execute(query, {'method': method, 'taskhash': taskhash}) - return cursor.fetchone() - except: - cursor.close() + cursor.execute( + 'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash', + { + 'method': method, + 'taskhash': taskhash, + } + ) + return cursor.fetchone() class Server(bb.asyncrpc.AsyncServer): @@ -435,7 +537,7 @@ class Server(bb.asyncrpc.AsyncServer): self.backfill_queue.task_done() break method, taskhash = item - await copy_from_upstream(client, self.db, method, taskhash) + await copy_unihash_from_upstream(client, self.db, method, taskhash) self.backfill_queue.task_done() finally: await client.close() |