summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/hashserv/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/hashserv/client.py')
-rw-r--r--poky/bitbake/lib/hashserv/client.py147
1 files changed, 19 insertions, 128 deletions
diff --git a/poky/bitbake/lib/hashserv/client.py b/poky/bitbake/lib/hashserv/client.py
index e05c1eb56..531170967 100644
--- a/poky/bitbake/lib/hashserv/client.py
+++ b/poky/bitbake/lib/hashserv/client.py
@@ -8,110 +8,26 @@ import json
import logging
import socket
import os
-from . import chunkify, DEFAULT_MAX_CHUNK, create_async_client
+import bb.asyncrpc
+from . import create_async_client
logger = logging.getLogger("hashserv.client")
-class HashConnectionError(Exception):
- pass
-
-
-class AsyncClient(object):
+class AsyncClient(bb.asyncrpc.AsyncClient):
MODE_NORMAL = 0
MODE_GET_STREAM = 1
def __init__(self):
- self.reader = None
- self.writer = None
+ super().__init__('OEHASHEQUIV', '1.1', logger)
self.mode = self.MODE_NORMAL
- self.max_chunk = DEFAULT_MAX_CHUNK
-
- async def connect_tcp(self, address, port):
- async def connect_sock():
- return await asyncio.open_connection(address, port)
-
- self._connect_sock = connect_sock
-
- async def connect_unix(self, path):
- async def connect_sock():
- return await asyncio.open_unix_connection(path)
-
- self._connect_sock = connect_sock
-
- async def connect(self):
- if self.reader is None or self.writer is None:
- (self.reader, self.writer) = await self._connect_sock()
-
- self.writer.write("OEHASHEQUIV 1.1\n\n".encode("utf-8"))
- await self.writer.drain()
-
- cur_mode = self.mode
- self.mode = self.MODE_NORMAL
- await self._set_mode(cur_mode)
-
- async def close(self):
- self.reader = None
-
- if self.writer is not None:
- self.writer.close()
- self.writer = None
-
- async def _send_wrapper(self, proc):
- count = 0
- while True:
- try:
- await self.connect()
- return await proc()
- except (
- OSError,
- HashConnectionError,
- json.JSONDecodeError,
- UnicodeDecodeError,
- ) as e:
- logger.warning("Error talking to server: %s" % e)
- if count >= 3:
- if not isinstance(e, HashConnectionError):
- raise HashConnectionError(str(e))
- raise e
- await self.close()
- count += 1
-
- async def send_message(self, msg):
- async def get_line():
- line = await self.reader.readline()
- if not line:
- raise HashConnectionError("Connection closed")
-
- line = line.decode("utf-8")
-
- if not line.endswith("\n"):
- raise HashConnectionError("Bad message %r" % message)
-
- return line
-
- async def proc():
- for c in chunkify(json.dumps(msg), self.max_chunk):
- self.writer.write(c.encode("utf-8"))
- await self.writer.drain()
-
- l = await get_line()
-
- m = json.loads(l)
- if m and "chunk-stream" in m:
- lines = []
- while True:
- l = (await get_line()).rstrip("\n")
- if not l:
- break
- lines.append(l)
-
- m = json.loads("".join(lines))
-
- return m
- return await self._send_wrapper(proc)
+ async def setup_connection(self):
+ await super().setup_connection()
+ cur_mode = self.mode
+ self.mode = self.MODE_NORMAL
+ await self._set_mode(cur_mode)
async def send_stream(self, msg):
async def proc():
@@ -119,7 +35,7 @@ class AsyncClient(object):
await self.writer.drain()
l = await self.reader.readline()
if not l:
- raise HashConnectionError("Connection closed")
+ raise ConnectionError("Connection closed")
return l.decode("utf-8").rstrip()
return await self._send_wrapper(proc)
@@ -128,11 +44,11 @@ class AsyncClient(object):
if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
r = await self.send_stream("END")
if r != "ok":
- raise HashConnectionError("Bad response from server %r" % r)
+ raise ConnectionError("Bad response from server %r" % r)
elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
r = await self.send_message({"get-stream": None})
if r != "ok":
- raise HashConnectionError("Bad response from server %r" % r)
+ raise ConnectionError("Bad response from server %r" % r)
elif new_mode != self.mode:
raise Exception(
"Undefined mode transition %r -> %r" % (self.mode, new_mode)
@@ -189,12 +105,10 @@ class AsyncClient(object):
return (await self.send_message({"backfill-wait": None}))["tasks"]
-class Client(object):
+class Client(bb.asyncrpc.Client):
def __init__(self):
- self.client = AsyncClient()
- self.loop = asyncio.new_event_loop()
-
- for call in (
+ super().__init__()
+ self._add_methods(
"connect_tcp",
"close",
"get_unihash",
@@ -204,30 +118,7 @@ class Client(object):
"get_stats",
"reset_stats",
"backfill_wait",
- ):
- downcall = getattr(self.client, call)
- setattr(self, call, self._get_downcall_wrapper(downcall))
-
- def _get_downcall_wrapper(self, downcall):
- def wrapper(*args, **kwargs):
- return self.loop.run_until_complete(downcall(*args, **kwargs))
-
- return wrapper
-
- def connect_unix(self, path):
- # AF_UNIX has path length issues so chdir here to workaround
- cwd = os.getcwd()
- try:
- os.chdir(os.path.dirname(path))
- self.loop.run_until_complete(self.client.connect_unix(os.path.basename(path)))
- self.loop.run_until_complete(self.client.connect())
- finally:
- os.chdir(cwd)
-
- @property
- def max_chunk(self):
- return self.client.max_chunk
-
- @max_chunk.setter
- def max_chunk(self, value):
- self.client.max_chunk = value
+ )
+
+ def _get_async_client(self):
+ return AsyncClient()