diff options
Diffstat (limited to 'poky/bitbake/lib/hashserv/client.py')
-rw-r--r-- | poky/bitbake/lib/hashserv/client.py | 147 |
1 files changed, 19 insertions, 128 deletions
diff --git a/poky/bitbake/lib/hashserv/client.py b/poky/bitbake/lib/hashserv/client.py index e05c1eb568..5311709677 100644 --- a/poky/bitbake/lib/hashserv/client.py +++ b/poky/bitbake/lib/hashserv/client.py @@ -8,110 +8,26 @@ import json import logging import socket import os -from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client +import bb.asyncrpc +from . import create_async_client logger = logging.getLogger("hashserv.client") -class HashConnectionError(Exception): - pass - - -class AsyncClient(object): +class AsyncClient(bb.asyncrpc.AsyncClient): MODE_NORMAL = 0 MODE_GET_STREAM = 1 def __init__(self): - self.reader = None - self.writer = None + super().__init__('OEHASHEQUIV', '1.1', logger) self.mode = self.MODE_NORMAL - self.max_chunk = DEFAULT_MAX_CHUNK - - async def connect_tcp(self, address, port): - async def connect_sock(): - return await asyncio.open_connection(address, port) - - self._connect_sock = connect_sock - - async def connect_unix(self, path): - async def connect_sock(): - return await asyncio.open_unix_connection(path) - - self._connect_sock = connect_sock - - async def connect(self): - if self.reader is None or self.writer is None: - (self.reader, self.writer) = await self._connect_sock() - - self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8")) - await self.writer.drain() - - cur_mode = self.mode - self.mode = self.MODE_NORMAL - await self._set_mode(cur_mode) - - async def close(self): - self.reader = None - - if self.writer is not None: - self.writer.close() - self.writer = None - - async def _send_wrapper(self, proc): - count = 0 - while True: - try: - await self.connect() - return await proc() - except ( - OSError, - HashConnectionError, - json.JSONDecodeError, - UnicodeDecodeError, - ) as e: - logger.warning("Error talking to server: %s" % e) - if count >= 3: - if not isinstance(e, HashConnectionError): - raise HashConnectionError(str(e)) - raise e - await self.close() - count += 1 - - async def send_message(self, msg): - async def get_line(): - line = await self.reader.readline() - if not line: - raise HashConnectionError("Connection closed") - - line = line.decode("utf-8") - - if not line.endswith("\n"): - raise HashConnectionError("Bad message %r" % message) - - return line - - async def proc(): - for c in chunkify(json.dumps(msg), self.max_chunk): - self.writer.write(c.encode("utf-8")) - await self.writer.drain() - - l = await get_line() - - m = json.loads(l) - if m and "chunk-stream" in m: - lines = [] - while True: - l = (await get_line()).rstrip("\n") - if not l: - break - lines.append(l) - - m = json.loads("".join(lines)) - - return m - return await self._send_wrapper(proc) + async def setup_connection(self): + await super().setup_connection() + cur_mode = self.mode + self.mode = self.MODE_NORMAL + await self._set_mode(cur_mode) async def send_stream(self, msg): async def proc(): @@ -119,7 +35,7 @@ class AsyncClient(object): await self.writer.drain() l = await self.reader.readline() if not l: - raise HashConnectionError("Connection closed") + raise ConnectionError("Connection closed") return l.decode("utf-8").rstrip() return await self._send_wrapper(proc) @@ -128,11 +44,11 @@ class AsyncClient(object): if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: r = await self.send_stream("END") if r != "ok": - raise HashConnectionError("Bad response from server %r" % r) + raise ConnectionError("Bad response from server %r" % r) elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: r = await self.send_message({"get-stream": None}) if r != "ok": - raise HashConnectionError("Bad response from server %r" % r) + raise ConnectionError("Bad response from server %r" % r) elif new_mode != self.mode: raise Exception( "Undefined mode transition %r -> %r" % (self.mode, new_mode) @@ -189,12 +105,10 @@ class AsyncClient(object): return (await self.send_message({"backfill-wait": None}))["tasks"] -class Client(object): +class Client(bb.asyncrpc.Client): def __init__(self): - self.client = AsyncClient() - self.loop = asyncio.new_event_loop() - - for call in ( + super().__init__() + self._add_methods( "connect_tcp", "close", "get_unihash", @@ -204,30 +118,7 @@ class Client(object): "get_stats", "reset_stats", "backfill_wait", - ): - downcall = getattr(self.client, call) - setattr(self, call, self._get_downcall_wrapper(downcall)) - - def _get_downcall_wrapper(self, downcall): - def wrapper(*args, **kwargs): - return self.loop.run_until_complete(downcall(*args, **kwargs)) - - return wrapper - - def connect_unix(self, path): - # AF_UNIX has path length issues so chdir here to workaround - cwd = os.getcwd() - try: - os.chdir(os.path.dirname(path)) - self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path))) - self.loop.run_until_complete(self.client.connect()) - finally: - os.chdir(cwd) - - @property - def max_chunk(self): - return self.client.max_chunk - - @max_chunk.setter - def max_chunk(self, value): - self.client.max_chunk = value + ) + + def _get_async_client(self): + return AsyncClient() |