From a34c030e5ec7021e7fb452410d38abfb3993ec68 Mon Sep 17 00:00:00 2001 From: Brad Bishop Date: Mon, 23 Sep 2019 22:34:48 -0400 Subject: poky: subtree update:745e38ff0f..81f9e815d3 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adrian Bunk (6): openssl: Upgrade 1.1.1c -> 1.1.1d glib-2.0: Upgrade 2.60.6 -> 2.60.7 lttng-modules: Upgrade 2.10.10 -> 2.10.11 lttng-ust: Upgrade 2.10.4 -> 2.10.5 squashfs-tools: Remove UPSTREAM_CHECK_COMMITS libmpc: Remove dead UPSTREAM_CHECK_URI Alexander Kanavin (2): runqemu: decouple gtk and gl options strace: add a timeout for running ptests Alistair Francis (1): gdb: Mark gdbserver as ALLOW_EMPTY for riscv32 Andre McCurdy (9): busybox: drop unused mount.busybox and umount.busybox wrappers busybox: drop inittab from SRC_URI ( now moved to busybox-inittab ) busybox-inittab: minor formatting tweaks base-files: drop legacy empty file /etc/default/usbd busybox: rcS and rcK should not be writeable by everyone ffmpeg: add PACKAGECONFIG controls for alsa and zlib (enable by default) libwebp: apply ARM specific config options to big endian ARM initscripts: enable alignment.sh init script for big endian ARM libunwind: apply configure over-ride to both big and little endian ARM Andrew F. 
Davis (4): libepoxy: Disable x11 when not building for x11 cogl: Set depends to the virtual needed not explicitly on Mesa gtk+3: Set depends to the virtual needed not explicitly on Mesa weston: Set depends to the virtual needed not explicitly on Mesa Armin Kuster (1): gcc: Security fix for CVE-2019-15847 Changhyeok Bae (1): iw: upgrade to 5.3 Changqing Li (2): classextend.py: don't extend file for file dependency report-error.bbclass: add local.conf/auto.conf into error report Chen Qi (1): python-numpy: fix build for libn32 Daniel Gomez (1): lttng-modules: Add missing SRCREV_FORMAT Diego Rondini (1): initramfs-framework: support PARTLABEL option Dmitry Eremin-Solenikov (7): image-uefi.conf: add config file holding configuration for UEFI images grub-bootconf: switch to image-uefi.conf grub-efi: switch to image-uefi.conf grub-efi.bbclass: switch to image-uefi.conf systemd-boot: switch to image-uefi.conf systemd-boot.bbclass: switch to image-uefi.conf live-vm-common.bbclass: provide efi population functions for live images Hector Palacios (1): udev-extraconf: skip mounting partitions already mounted by systemd Henning Schild (6): oe-git-proxy: allow setting SOCAT from outside oeqa: add case for oe-git-proxy Revert "oe-git-proxy: Avoid resolving NO_PROXY against local files" oe-git-proxy: disable shell pathname expansion for the whole script oe-git-proxy: NO_PROXY suffix matching without wildcard for match_host oe-git-proxy: fix dash "Bad substitution" Hongxu Jia (1): elfutils: 0.176 -> 0.177 Jack Mitchell (1): iptables: add systemd helper unit to load/restore rules Jaewon Lee (1): populate_sdk_ext: Introduce mechanism to keep nativesdk* sstate in esdk Jason Wessel (1): gnupg: Extend -native wrapper to fix gpgme-native's gpgconf problems Jiang Lu (2): glib-networking:enable glib-networking build as native package libsoup:enable libsoup build as native package Joshua Watt (4): sstatesig: Update server URI Remove SSTATE_HASHEQUIV_SERVER bitbake: bitbake: Rework hash 
equivalence classes/archiver: Fix WORKDIR for shared source Kai Kang (1): systemd: provides ${base_sbindir}/udevadm Khem Raj (10): ptrace: Drop ptrace aid for musl/ppc elfutils: Fix build on ppc/musl cogl: Do not depend PN-dev on empty PN musl: Update to latest master glibc: Move DISTRO_FEATURE specific do_install code for target recipe only populate_sdk_base.bbclass: nativesdk-glibc-locale is required on musl too nativesdk.bbclass: Clear out LIBCEXTENSION and ABIEXTENSION openssl: Enable os option for with-rand-seed as well weston-init: Add possibility to run weston as non-root user layer.conf: Remove weston-conf from SIGGEN_EXCLUDE_SAFE_RECIPE_DEPS Li Zhou (1): qemu: Security Advisory - qemu - CVE-2019-15890 Limeng (1): tune-cortexa57-cortexa53: add tunes for ARM Cortex-A53-Cortex-A57 Martin Jansa (2): perf: fix build on kernels which don't have ${S}/tools/include/linux/bits.h bitbake: Revert "bitbake: cooker: Ensure bbappends are found in stable order" Maxime Roussin-Bélanger (1): meta: add missing descriptions and homepage in bsp Mikko Rapeli (2): busybox.inc: handle empty DEBUG_PREFIX_MAP bitbake: svn fetcher: allow "svn propget svn:externals" to fail Nathan Rossi (7): resulttool: Handle multiple series containing ptestresults gcc-cross.inc: Process binaries in build dir to be relocatable oeqa/core/case.py: Add OEPTestResultTestCase for ptestresult helpers oeqa/selftest: Rework toolchain tests to use OEPTestResultTestCase glibc-testsuite: SkipRecipe if libc is not glibc cmake: 3.15.2 -> 3.15.3 meson.bbclass: Handle microblaze* mapping to cpu family Oleksandr Kravchuk (5): python3-pygobject: update to 3.34.0 font-util: update to 1.3.2 expat: update to 2.2.8 curl: update to 7.66.0 python3-dbus: update to 1.2.12 Otavio Salvador (1): mesa: Upgrade 19.1.1 -> 19.1.6 Peter Kjellerstedt (3): glibc: Make it build without ldconfig in DISTRO_FEATURES package_rpm.bbclass: Remove a misleading bb.note() tzdata: Correct the packaging of /etc/localtime and /etc/timezone 
Quentin Schulz (1): externalsrc: stop rebuilds of 2+ externalsrc recipes sharing the same git repo Randy MacLeod (4): valgrind: enable ~500 more ptests valgrind: make a few more ptests pass valgrind: ptest improvements to run-ptest and more valgrind: disable 256 ptests for aarch64 Richard Purdie (8): bitbake: runqueue/siggen: Optimise hash equiv queries runqemu: Mention snapshot in the help output initramfs-framework: support PARTLABEL option systemd: Handle slow to boot mips hwdb update timeouts meta-extsdk: Either an sstate task is a proper task or it isn't oeqa/concurrenttest: Use ionice to delete build directories bitbake: utils: Add ionice option to prunedir build-appliance-image: Update to master head revision Robert Yang (2): conf/multilib.conf: Add ovmf to NON_MULTILIB_RECIPES bitbake: runqueue: validate_hashes(): currentcount should be a number Ross Burton (16): libtasn1: fix build with api-documentation enabled gstreamer1.0-libav: enable gtk-doc again python3: handle STAGING_LIBDIR/INCDIR being unset mesa: no need to depend on target python3 adwaita-icon-theme: fix rare install race oeqa/selftest/wic: improve assert messages in test_fixed_size oeqa/selftest/imagefeatures: dump the JSON if it can't be parsed libical: upgrade to 3.0.6 acpica: upgrade 20190509 -> 20190816 gdk-pixbuf: upgrade 2.38.1 -> 2.38.2 piglit: upgrade to latest revision libinput: upgrade 1.14.0 -> 1.14.1 rootfs-postcommands: check /etc/gconf exists before working on it systemd-systemctl-native: don't care about line endings opkg-utils: respect SOURCE_DATE_EPOCH when building ipkgs bitbake: fetch2/git: add git-lfs toggle option Scott Murray (1): systemd: upgrade to 243 Stefan Ghinea (1): ghostscript: CVE-2019-14811, CVE-2019-14817 Tim Blechmann (1): icecc: blacklist pixman Yeoh Ee Peng (3): bitbake: bitbake-layers: show-recipes: Show recipes only bitbake: bitbake-layers: show-recipes: Select recipes from selected layer bitbake: bitbake-layers: show-recipes: Enable bare output Yi Zhao 
(3): screen: add /etc/screenrc as global config file nfs-utils: fix nfs mount error on 32bit nfs server grub: remove diffutils and freetype runtime dependencies Zang Ruochen (2): btrfs-tools:upgrade 5.2.1 -> 5.2.2 timezone:upgrade 2019b -> 2019c Change-Id: I1ec24480a8964e474cd99d60a0cb0975e49b46b8 Signed-off-by: Brad Bishop --- poky/bitbake/lib/bb/cooker.py | 18 +- poky/bitbake/lib/bb/fetch2/git.py | 18 +- poky/bitbake/lib/bb/fetch2/svn.py | 2 +- poky/bitbake/lib/bb/runqueue.py | 11 +- poky/bitbake/lib/bb/siggen.py | 84 +++---- poky/bitbake/lib/bb/tests/runqueue.py | 19 +- poky/bitbake/lib/bb/utils.py | 11 +- poky/bitbake/lib/bblayers/query.py | 39 ++-- poky/bitbake/lib/hashserv/__init__.py | 261 +++++---------------- poky/bitbake/lib/hashserv/client.py | 156 +++++++++++++ poky/bitbake/lib/hashserv/server.py | 414 ++++++++++++++++++++++++++++++++++ poky/bitbake/lib/hashserv/tests.py | 159 +++++++------ 12 files changed, 824 insertions(+), 368 deletions(-) create mode 100644 poky/bitbake/lib/hashserv/client.py create mode 100644 poky/bitbake/lib/hashserv/server.py (limited to 'poky/bitbake/lib') diff --git a/poky/bitbake/lib/bb/cooker.py b/poky/bitbake/lib/bb/cooker.py index 5840aa75e..0c540028a 100644 --- a/poky/bitbake/lib/bb/cooker.py +++ b/poky/bitbake/lib/bb/cooker.py @@ -194,7 +194,7 @@ class BBCooker: self.ui_cmdline = None self.hashserv = None - self.hashservport = None + self.hashservaddr = None self.initConfigurationData() @@ -392,19 +392,20 @@ class BBCooker: except prserv.serv.PRServiceConfigError as e: bb.fatal("Unable to start PR Server, exitting") - if self.data.getVar("BB_HASHSERVE") == "localhost:0": + if self.data.getVar("BB_HASHSERVE") == "auto": + # Create a new hash server bound to a unix domain socket if not self.hashserv: dbfile = (self.data.getVar("PERSISTENT_DIR") or self.data.getVar("CACHE")) + "/hashserv.db" - self.hashserv = hashserv.create_server(('localhost', 0), dbfile, '') - self.hashservport = "localhost:" + 
str(self.hashserv.server_port) + self.hashservaddr = "unix://%s/hashserve.sock" % self.data.getVar("TOPDIR") + self.hashserv = hashserv.create_server(self.hashservaddr, dbfile, sync=False) self.hashserv.process = multiprocessing.Process(target=self.hashserv.serve_forever) self.hashserv.process.daemon = True self.hashserv.process.start() - self.data.setVar("BB_HASHSERVE", self.hashservport) - self.databuilder.origdata.setVar("BB_HASHSERVE", self.hashservport) - self.databuilder.data.setVar("BB_HASHSERVE", self.hashservport) + self.data.setVar("BB_HASHSERVE", self.hashservaddr) + self.databuilder.origdata.setVar("BB_HASHSERVE", self.hashservaddr) + self.databuilder.data.setVar("BB_HASHSERVE", self.hashservaddr) for mc in self.databuilder.mcdata: - self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.hashservport) + self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.hashservaddr) bb.parse.init_parser(self.data) @@ -1852,7 +1853,6 @@ class CookerCollectFiles(object): (bbappend, filename) = b if (bbappend == f) or ('%' in bbappend and bbappend.startswith(f[:bbappend.index('%')])): filelist.append(filename) - filelist.sort() return filelist def collection_priorities(self, pkgfns, d): diff --git a/poky/bitbake/lib/bb/fetch2/git.py b/poky/bitbake/lib/bb/fetch2/git.py index e171aa7eb..5fd63b4e3 100644 --- a/poky/bitbake/lib/bb/fetch2/git.py +++ b/poky/bitbake/lib/bb/fetch2/git.py @@ -464,6 +464,8 @@ class Git(FetchMethod): if os.path.exists(destdir): bb.utils.prunedir(destdir) + need_lfs = ud.parm.get("lfs", "1") == "1" + source_found = False source_error = [] @@ -493,14 +495,16 @@ class Git(FetchMethod): runfetchcmd("%s remote set-url origin %s" % (ud.basecmd, repourl), d, workdir=destdir) if self._contains_lfs(ud, d, destdir): - path = d.getVar('PATH') - if path: - gitlfstool = bb.utils.which(path, "git-lfs", executable=True) - if not gitlfstool: - raise bb.fetch2.FetchError("Repository %s has lfs content, install git-lfs plugin on host to download" % (repourl)) + 
if need_lfs: + path = d.getVar('PATH') + if path: + gitlfstool = bb.utils.which(path, "git-lfs", executable=True) + if not gitlfstool: + raise bb.fetch2.FetchError("Repository %s has LFS content, install git-lfs on host to download (or set lfs=0 to ignore it)" % (repourl)) + else: + bb.note("Could not find 'PATH'") else: - bb.note("Could not find 'PATH'") - + bb.note("Repository %s has LFS content but it is not being fetched" % (repourl)) if not ud.nocheckout: if subdir != "": diff --git a/poky/bitbake/lib/bb/fetch2/svn.py b/poky/bitbake/lib/bb/fetch2/svn.py index 59ce93160..96d666ba3 100644 --- a/poky/bitbake/lib/bb/fetch2/svn.py +++ b/poky/bitbake/lib/bb/fetch2/svn.py @@ -145,7 +145,7 @@ class Svn(FetchMethod): if not ("externals" in ud.parm and ud.parm["externals"] == "nowarn"): # Warn the user if this had externals (won't catch them all) - output = runfetchcmd("svn propget svn:externals", d, workdir=ud.moddir) + output = runfetchcmd("svn propget svn:externals || true", d, workdir=ud.moddir) if output: if "--ignore-externals" in svnfetchcmd.split(): bb.warn("%s contains svn:externals." 
% ud.url) diff --git a/poky/bitbake/lib/bb/runqueue.py b/poky/bitbake/lib/bb/runqueue.py index addb2bb82..d9a67a316 100644 --- a/poky/bitbake/lib/bb/runqueue.py +++ b/poky/bitbake/lib/bb/runqueue.py @@ -1173,6 +1173,7 @@ class RunQueueData: self.prepare_task_hash(tid) bb.parse.siggen.writeout_file_checksum_cache() + bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids) #self.dump_data() return len(self.runtaskentries) @@ -1259,7 +1260,7 @@ class RunQueue: "buildname" : self.cfgData.getVar("BUILDNAME"), "date" : self.cfgData.getVar("DATE"), "time" : self.cfgData.getVar("TIME"), - "hashservport" : self.cooker.hashservport, + "hashservaddr" : self.cooker.hashservaddr, } worker.stdin.write(b"" + pickle.dumps(self.cooker.configuration) + b"") @@ -1393,7 +1394,7 @@ class RunQueue: cache[tid] = iscurrent return iscurrent - def validate_hashes(self, tocheck, data, currentcount=None, siginfo=False): + def validate_hashes(self, tocheck, data, currentcount=0, siginfo=False): valid = set() if self.hashvalidate: sq_data = {} @@ -1600,7 +1601,7 @@ class RunQueue: tocheck.add(tid) - valid_new = self.validate_hashes(tocheck, self.cooker.data, None, True) + valid_new = self.validate_hashes(tocheck, self.cooker.data, 0, True) # Tasks which are both setscene and noexec never care about dependencies # We therefore find tasks which are setscene and noexec and mark their @@ -1981,7 +1982,7 @@ class RunQueueExecute: continue logger.debug(1, "Task %s no longer deferred" % nexttask) del self.sq_deferred[nexttask] - valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, None, False) + valid = self.rq.validate_hashes(set([nexttask]), self.cooker.data, 0, False) if not valid: logger.debug(1, "%s didn't become valid, skipping setscene" % nexttask) self.sq_task_failoutright(nexttask) @@ -2173,7 +2174,7 @@ class RunQueueExecute: ret.add(dep) return ret - # We filter out multiconfig dependencies from taskdepdata we pass to the tasks + # We filter out multiconfig dependencies 
from taskdepdata we pass to the tasks # as most code can't handle them def build_taskdepdata(self, task): taskdepdata = {} diff --git a/poky/bitbake/lib/bb/siggen.py b/poky/bitbake/lib/bb/siggen.py index b50355930..e047c217e 100644 --- a/poky/bitbake/lib/bb/siggen.py +++ b/poky/bitbake/lib/bb/siggen.py @@ -13,6 +13,7 @@ import difflib import simplediff from bb.checksum import FileChecksumCache from bb import runqueue +import hashserv logger = logging.getLogger('BitBake.SigGen') @@ -91,6 +92,8 @@ class SignatureGenerator(object): def save_unitaskhashes(self): return + def set_setscene_tasks(self, setscene_tasks): + return class SignatureGeneratorBasic(SignatureGenerator): """ @@ -106,6 +109,7 @@ class SignatureGeneratorBasic(SignatureGenerator): self.taints = {} self.gendeps = {} self.lookupcache = {} + self.setscenetasks = {} self.basewhitelist = set((data.getVar("BB_HASHBASE_WHITELIST") or "").split()) self.taskwhitelist = None self.init_rundepcheck(data) @@ -151,6 +155,9 @@ class SignatureGeneratorBasic(SignatureGenerator): return taskdeps + def set_setscene_tasks(self, setscene_tasks): + self.setscenetasks = setscene_tasks + def finalise(self, fn, d, variant): mc = d.getVar("__BBMULTICONFIG", False) or "" @@ -369,6 +376,11 @@ class SignatureGeneratorUniHashMixIn(object): self.server, self.method = data[:2] super().set_taskdata(data[2:]) + def client(self): + if getattr(self, '_client', None) is None: + self._client = hashserv.create_client(self.server) + return self._client + def __get_task_unihash_key(self, tid): # TODO: The key only *needs* to be the taskhash, the tid is just # convenient @@ -389,11 +401,12 @@ class SignatureGeneratorUniHashMixIn(object): self.unitaskhashes[self.__get_task_unihash_key(tid)] = unihash def get_unihash(self, tid): - import urllib - import json - taskhash = self.taskhash[tid] + # If its not a setscene task we can return + if self.setscenetasks and tid not in self.setscenetasks: + return taskhash + key = 
self.__get_task_unihash_key(tid) # TODO: This cache can grow unbounded. It probably only needs to keep @@ -418,36 +431,22 @@ class SignatureGeneratorUniHashMixIn(object): unihash = taskhash try: - url = '%s/v1/equivalent?%s' % (self.server, - urllib.parse.urlencode({'method': self.method, 'taskhash': self.taskhash[tid]})) - - request = urllib.request.Request(url) - response = urllib.request.urlopen(request) - data = response.read().decode('utf-8') - - json_data = json.loads(data) - - if json_data: - unihash = json_data['unihash'] + data = self.client().get_unihash(self.method, self.taskhash[tid]) + if data: + unihash = data # A unique hash equal to the taskhash is not very interesting, # so it is reported it at debug level 2. If they differ, that # is much more interesting, so it is reported at debug level 1 bb.debug((1, 2)[unihash == taskhash], 'Found unihash %s in place of %s for %s from %s' % (unihash, taskhash, tid, self.server)) else: bb.debug(2, 'No reported unihash for %s:%s from %s' % (tid, taskhash, self.server)) - except urllib.error.URLError as e: - bb.warn('Failure contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) - except (KeyError, json.JSONDecodeError) as e: - bb.warn('Poorly formatted response from %s: %s' % (self.server, str(e))) + except hashserv.HashConnectionError as e: + bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) self.unitaskhashes[key] = unihash return unihash def report_unihash(self, path, task, d): - import urllib - import json - import tempfile - import base64 import importlib taskhash = d.getVar('BB_TASKHASH') @@ -482,42 +481,31 @@ class SignatureGeneratorUniHashMixIn(object): outhash = bb.utils.better_eval(self.method + '(path, sigfile, task, d)', locs) try: - url = '%s/v1/equivalent' % self.server - task_data = { - 'taskhash': taskhash, - 'method': self.method, - 'outhash': outhash, - 'unihash': unihash, - 'owner': d.getVar('SSTATE_HASHEQUIV_OWNER') - } + extra_data = {} + + owner 
= d.getVar('SSTATE_HASHEQUIV_OWNER') + if owner: + extra_data['owner'] = owner if report_taskdata: sigfile.seek(0) - task_data['PN'] = d.getVar('PN') - task_data['PV'] = d.getVar('PV') - task_data['PR'] = d.getVar('PR') - task_data['task'] = task - task_data['outhash_siginfo'] = sigfile.read().decode('utf-8') - - headers = {'content-type': 'application/json'} - - request = urllib.request.Request(url, json.dumps(task_data).encode('utf-8'), headers) - response = urllib.request.urlopen(request) - data = response.read().decode('utf-8') + extra_data['PN'] = d.getVar('PN') + extra_data['PV'] = d.getVar('PV') + extra_data['PR'] = d.getVar('PR') + extra_data['task'] = task + extra_data['outhash_siginfo'] = sigfile.read().decode('utf-8') - json_data = json.loads(data) - new_unihash = json_data['unihash'] + data = self.client().report_unihash(taskhash, self.method, outhash, unihash, extra_data) + new_unihash = data['unihash'] if new_unihash != unihash: bb.debug(1, 'Task %s unihash changed %s -> %s by server %s' % (taskhash, unihash, new_unihash, self.server)) bb.event.fire(bb.runqueue.taskUniHashUpdate(fn + ':do_' + task, new_unihash), d) else: bb.debug(1, 'Reported task %s as unihash %s to %s' % (taskhash, unihash, self.server)) - except urllib.error.URLError as e: - bb.warn('Failure contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) - except (KeyError, json.JSONDecodeError) as e: - bb.warn('Poorly formatted response from %s: %s' % (self.server, str(e))) + except hashserv.HashConnectionError as e: + bb.warn('Error contacting Hash Equivalence Server %s: %s' % (self.server, str(e))) finally: if sigfile: sigfile.close() @@ -538,7 +526,7 @@ class SignatureGeneratorTestEquivHash(SignatureGeneratorUniHashMixIn, SignatureG name = "TestEquivHash" def init_rundepcheck(self, data): super().init_rundepcheck(data) - self.server = "http://" + data.getVar('BB_HASHSERVE') + self.server = data.getVar('BB_HASHSERVE') self.method = "sstate_output_hash" diff --git 
a/poky/bitbake/lib/bb/tests/runqueue.py b/poky/bitbake/lib/bb/tests/runqueue.py index c7f5e5572..cb4d526f1 100644 --- a/poky/bitbake/lib/bb/tests/runqueue.py +++ b/poky/bitbake/lib/bb/tests/runqueue.py @@ -11,6 +11,7 @@ import bb import os import tempfile import subprocess +import sys # # TODO: @@ -232,10 +233,11 @@ class RunQueueTests(unittest.TestCase): self.assertEqual(set(tasks), set(expected)) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_single(self): with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1"] @@ -255,10 +257,11 @@ class RunQueueTests(unittest.TestCase): 'a1:package_write_ipk_setscene', 'a1:package_qa_setscene'] self.assertEqual(set(tasks), set(expected)) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_double(self): with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1", "e1"] @@ -278,11 +281,12 @@ class RunQueueTests(unittest.TestCase): self.assertEqual(set(tasks), set(expected)) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_multiple_setscene(self): # Runs e1:do_package_setscene twice with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1", "e1"] @@ -308,11 +312,12 @@ class RunQueueTests(unittest.TestCase): else: self.assertEqual(tasks.count(i), 1, "%s not in task list once" % i) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_partial_match(self): # e1:do_package matches 
initial built but not second hash value with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1"] @@ -336,11 +341,12 @@ class RunQueueTests(unittest.TestCase): expected.remove('e1:package') self.assertEqual(set(tasks), set(expected)) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_partial_match2(self): # e1:do_package + e1:do_populate_sysroot matches initial built but not second hash value with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1"] @@ -363,13 +369,14 @@ class RunQueueTests(unittest.TestCase): 'e1:package_setscene', 'e1:populate_sysroot_setscene', 'e1:build', 'e1:package_qa', 'e1:package_write_rpm', 'e1:package_write_ipk', 'e1:packagedata'] self.assertEqual(set(tasks), set(expected)) + @unittest.skipIf(sys.version_info < (3, 5, 0), 'Python 3.5 or later required') def test_hashserv_partial_match3(self): # e1:do_package is valid for a1 but not after b1 # In former buggy code, this triggered e1:do_fetch, then e1:do_populate_sysroot to run # with none of the intermediate tasks which is a serious bug with tempfile.TemporaryDirectory(prefix="runqueuetest") as tempdir: extraenv = { - "BB_HASHSERVE" : "localhost:0", + "BB_HASHSERVE" : "auto", "BB_SIGNATURE_HANDLER" : "TestEquivHash" } cmd = ["bitbake", "a1", "b1"] diff --git a/poky/bitbake/lib/bb/utils.py b/poky/bitbake/lib/bb/utils.py index 3e90b6a30..d035949b3 100644 --- a/poky/bitbake/lib/bb/utils.py +++ b/poky/bitbake/lib/bb/utils.py @@ -677,7 +677,7 @@ def _check_unsafe_delete_path(path): return True return False -def remove(path, recurse=False): +def remove(path, recurse=False, ionice=False): """Equivalent to rm -f or rm -rf""" if not path: 
return @@ -686,7 +686,10 @@ def remove(path, recurse=False): if _check_unsafe_delete_path(path): raise Exception('bb.utils.remove: called with dangerous path "%s" and recurse=True, refusing to delete!' % path) # shutil.rmtree(name) would be ideal but its too slow - subprocess.check_call(['rm', '-rf'] + glob.glob(path)) + cmd = [] + if ionice: + cmd = ['ionice', '-c', '3'] + subprocess.check_call(cmd + ['rm', '-rf'] + glob.glob(path)) return for name in glob.glob(path): try: @@ -695,12 +698,12 @@ def remove(path, recurse=False): if exc.errno != errno.ENOENT: raise -def prunedir(topdir): +def prunedir(topdir, ionice=False): # Delete everything reachable from the directory named in 'topdir'. # CAUTION: This is dangerous! if _check_unsafe_delete_path(topdir): raise Exception('bb.utils.prunedir: called with dangerous path "%s", refusing to delete!' % topdir) - remove(topdir, recurse=True) + remove(topdir, recurse=True, ionice=ionice) # # Could also use return re.compile("(%s)" % "|".join(map(re.escape, suffixes))).sub(lambda mo: "", var) diff --git a/poky/bitbake/lib/bblayers/query.py b/poky/bitbake/lib/bblayers/query.py index 993589de9..7db49c8e2 100644 --- a/poky/bitbake/lib/bblayers/query.py +++ b/poky/bitbake/lib/bblayers/query.py @@ -46,7 +46,7 @@ layer, with the preferred version first. Note that skipped recipes that are overlayed will also be listed, with a " (skipped)" suffix. """ - items_listed = self.list_recipes('Overlayed recipes', None, True, args.same_version, args.filenames, True, None) + items_listed = self.list_recipes('Overlayed recipes', None, True, args.same_version, args.filenames, False, True, None, False, None) # Check for overlayed .bbclass files classes = collections.defaultdict(list) @@ -112,9 +112,9 @@ skipped recipes will also be listed, with a " (skipped)" suffix. 
title = 'Matching recipes:' else: title = 'Available recipes:' - self.list_recipes(title, args.pnspec, False, False, args.filenames, args.multiple, inheritlist) + self.list_recipes(title, args.pnspec, False, False, args.filenames, args.recipes_only, args.multiple, args.layer, args.bare, inheritlist) - def list_recipes(self, title, pnspec, show_overlayed_only, show_same_ver_only, show_filenames, show_multi_provider_only, inherits): + def list_recipes(self, title, pnspec, show_overlayed_only, show_same_ver_only, show_filenames, show_recipes_only, show_multi_provider_only, selected_layer, bare, inherits): if inherits: bbpath = str(self.tinfoil.config_data.getVar('BBPATH')) for classname in inherits: @@ -144,24 +144,30 @@ skipped recipes will also be listed, with a " (skipped)" suffix. preferred_versions[p] = (ver, fn) def print_item(f, pn, ver, layer, ispref): - if f in skiplist: - skipped = ' (skipped)' - else: - skipped = '' - if show_filenames: - if ispref: - logger.plain("%s%s", f, skipped) + if not selected_layer or layer == selected_layer: + if not bare and f in skiplist: + skipped = ' (skipped)' else: - logger.plain(" %s%s", f, skipped) - else: - if ispref: - logger.plain("%s:", pn) - logger.plain(" %s %s%s", layer.ljust(20), ver, skipped) + skipped = '' + if show_filenames: + if ispref: + logger.plain("%s%s", f, skipped) + else: + logger.plain(" %s%s", f, skipped) + elif show_recipes_only: + if pn not in show_unique_pn: + show_unique_pn.append(pn) + logger.plain("%s%s", pn, skipped) + else: + if ispref: + logger.plain("%s:", pn) + logger.plain(" %s %s%s", layer.ljust(20), ver, skipped) global_inherit = (self.tinfoil.config_data.getVar('INHERIT') or "").split() cls_re = re.compile('classes/') preffiles = [] + show_unique_pn = [] items_listed = False for p in sorted(pkg_pn): if pnspec: @@ -493,8 +499,11 @@ NOTE: .bbappend files can impact the dependencies. 
parser_show_recipes = self.add_command(sp, 'show-recipes', self.do_show_recipes) parser_show_recipes.add_argument('-f', '--filenames', help='instead of the default formatting, list filenames of higher priority recipes with the ones they overlay indented underneath', action='store_true') + parser_show_recipes.add_argument('-r', '--recipes-only', help='instead of the default formatting, list recipes only', action='store_true') parser_show_recipes.add_argument('-m', '--multiple', help='only list where multiple recipes (in the same layer or different layers) exist for the same recipe name', action='store_true') parser_show_recipes.add_argument('-i', '--inherits', help='only list recipes that inherit the named class(es) - separate multiple classes using , (without spaces)', metavar='CLASS', default='') + parser_show_recipes.add_argument('-l', '--layer', help='only list recipes from the selected layer', default='') + parser_show_recipes.add_argument('-b', '--bare', help='output just names without the "(skipped)" marker', action='store_true') parser_show_recipes.add_argument('pnspec', nargs='*', help='optional recipe name specification (wildcards allowed, enclose in quotes to avoid shell expansion)') parser_show_appends = self.add_command(sp, 'show-appends', self.do_show_appends) diff --git a/poky/bitbake/lib/hashserv/__init__.py b/poky/bitbake/lib/hashserv/__init__.py index eb03c3221..c3318620f 100644 --- a/poky/bitbake/lib/hashserv/__init__.py +++ b/poky/bitbake/lib/hashserv/__init__.py @@ -3,203 +3,21 @@ # SPDX-License-Identifier: GPL-2.0-only # -from http.server import BaseHTTPRequestHandler, HTTPServer -import contextlib -import urllib.parse +from contextlib import closing +import re import sqlite3 -import json -import traceback -import logging -import socketserver -import queue -import threading -import signal -import socket -import struct -from datetime import datetime - -logger = logging.getLogger('hashserv') - -class HashEquivalenceServer(BaseHTTPRequestHandler): 
- def log_message(self, f, *args): - logger.debug(f, *args) - - def opendb(self): - self.db = sqlite3.connect(self.dbname) - self.db.row_factory = sqlite3.Row - self.db.execute("PRAGMA synchronous = OFF;") - self.db.execute("PRAGMA journal_mode = MEMORY;") - - def do_GET(self): - try: - if not self.db: - self.opendb() - - p = urllib.parse.urlparse(self.path) - - if p.path != self.prefix + '/v1/equivalent': - self.send_error(404) - return - - query = urllib.parse.parse_qs(p.query, strict_parsing=True) - method = query['method'][0] - taskhash = query['taskhash'][0] - - d = None - with contextlib.closing(self.db.cursor()) as cursor: - cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1', - {'method': method, 'taskhash': taskhash}) - - row = cursor.fetchone() - - if row is not None: - logger.debug('Found equivalent task %s', row['taskhash']) - d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} - - self.send_response(200) - self.send_header('Content-Type', 'application/json; charset=utf-8') - self.end_headers() - self.wfile.write(json.dumps(d).encode('utf-8')) - except: - logger.exception('Error in GET') - self.send_error(400, explain=traceback.format_exc()) - return - - def do_POST(self): - try: - if not self.db: - self.opendb() - - p = urllib.parse.urlparse(self.path) - - if p.path != self.prefix + '/v1/equivalent': - self.send_error(404) - return - - length = int(self.headers['content-length']) - data = json.loads(self.rfile.read(length).decode('utf-8')) - - with contextlib.closing(self.db.cursor()) as cursor: - cursor.execute(''' - -- Find tasks with a matching outhash (that is, tasks that - -- are equivalent) - SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash - - -- If there is an exact match on the taskhash, return it. 
- -- Otherwise return the oldest matching outhash of any - -- taskhash - ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END, - created ASC - - -- Only return one row - LIMIT 1 - ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')}) - - row = cursor.fetchone() - - # If no matching outhash was found, or one *was* found but it - # wasn't an exact match on the taskhash, a new entry for this - # taskhash should be added - if row is None or row['taskhash'] != data['taskhash']: - # If a row matching the outhash was found, the unihash for - # the new taskhash should be the same as that one. - # Otherwise the caller provided unihash is used. - unihash = data['unihash'] - if row is not None: - unihash = row['unihash'] - - insert_data = { - 'method': data['method'], - 'outhash': data['outhash'], - 'taskhash': data['taskhash'], - 'unihash': unihash, - 'created': datetime.now() - } - - for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): - if k in data: - insert_data[k] = data[k] - - cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % ( - ', '.join(sorted(insert_data.keys())), - ', '.join(':' + k for k in sorted(insert_data.keys()))), - insert_data) - - logger.info('Adding taskhash %s with unihash %s', data['taskhash'], unihash) - - self.db.commit() - d = {'taskhash': data['taskhash'], 'method': data['method'], 'unihash': unihash} - else: - d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} - - self.send_response(200) - self.send_header('Content-Type', 'application/json; charset=utf-8') - self.end_headers() - self.wfile.write(json.dumps(d).encode('utf-8')) - except: - logger.exception('Error in POST') - self.send_error(400, explain=traceback.format_exc()) - return - -class ThreadedHTTPServer(HTTPServer): - quit = False - - def serve_forever(self): - self.requestqueue = queue.Queue() - self.handlerthread = threading.Thread(target=self.process_request_thread) - self.handlerthread.daemon = False - - self.handlerthread.start() - - 
signal.signal(signal.SIGTERM, self.sigterm_exception) - super().serve_forever() - os._exit(0) - - def sigterm_exception(self, signum, stackframe): - self.server_close() - os._exit(0) - - def server_bind(self): - HTTPServer.server_bind(self) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0)) - - def process_request_thread(self): - while not self.quit: - try: - (request, client_address) = self.requestqueue.get(True) - except queue.Empty: - continue - if request is None: - continue - try: - self.finish_request(request, client_address) - except Exception: - self.handle_error(request, client_address) - finally: - self.shutdown_request(request) - os._exit(0) - - def process_request(self, request, client_address): - self.requestqueue.put((request, client_address)) - - def server_close(self): - super().server_close() - self.quit = True - self.requestqueue.put((None, None)) - self.handlerthread.join() - -def create_server(addr, dbname, prefix=''): - class Handler(HashEquivalenceServer): - pass - - db = sqlite3.connect(dbname) - db.row_factory = sqlite3.Row - Handler.prefix = prefix - Handler.db = None - Handler.dbname = dbname +UNIX_PREFIX = "unix://" + +ADDR_TYPE_UNIX = 0 +ADDR_TYPE_TCP = 1 + + +def setup_database(database, sync=True): + db = sqlite3.connect(database) + db.row_factory = sqlite3.Row - with contextlib.closing(db.cursor()) as cursor: + with closing(db.cursor()) as cursor: cursor.execute(''' CREATE TABLE IF NOT EXISTS tasks_v2 ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -220,11 +38,56 @@ def create_server(addr, dbname, prefix=''): UNIQUE(method, outhash, taskhash) ) ''') - cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup ON tasks_v2 (method, taskhash)') - cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup ON tasks_v2 (method, outhash)') + cursor.execute('PRAGMA journal_mode = WAL') + cursor.execute('PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF')) + + # Drop old indexes + cursor.execute('DROP INDEX 
IF EXISTS taskhash_lookup') + cursor.execute('DROP INDEX IF EXISTS outhash_lookup') + + # Create new indexes + cursor.execute('CREATE INDEX IF NOT EXISTS taskhash_lookup_v2 ON tasks_v2 (method, taskhash, created)') + cursor.execute('CREATE INDEX IF NOT EXISTS outhash_lookup_v2 ON tasks_v2 (method, outhash)') + + return db + + +def parse_address(addr): + if addr.startswith(UNIX_PREFIX): + return (ADDR_TYPE_UNIX, (addr[len(UNIX_PREFIX):],)) + else: + m = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr) + if m is not None: + host = m.group('host') + port = m.group('port') + else: + host, port = addr.split(':') + + return (ADDR_TYPE_TCP, (host, int(port))) + + +def create_server(addr, dbname, *, sync=True): + from . import server + db = setup_database(dbname, sync=sync) + s = server.Server(db) + + (typ, a) = parse_address(addr) + if typ == ADDR_TYPE_UNIX: + s.start_unix_server(*a) + else: + s.start_tcp_server(*a) + + return s + - ret = ThreadedHTTPServer(addr, Handler) +def create_client(addr): + from . import client + c = client.Client() - logger.info('Starting server on %s\n', ret.server_port) + (typ, a) = parse_address(addr) + if typ == ADDR_TYPE_UNIX: + c.connect_unix(*a) + else: + c.connect_tcp(*a) - return ret + return c diff --git a/poky/bitbake/lib/hashserv/client.py b/poky/bitbake/lib/hashserv/client.py new file mode 100644 index 000000000..2559bbb3f --- /dev/null +++ b/poky/bitbake/lib/hashserv/client.py @@ -0,0 +1,156 @@ +# Copyright (C) 2019 Garmin Ltd.
+# +# SPDX-License-Identifier: GPL-2.0-only +# + +from contextlib import closing +import json +import logging +import socket + + +logger = logging.getLogger('hashserv.client') + + +class HashConnectionError(Exception): + pass + + +class Client(object): + MODE_NORMAL = 0 + MODE_GET_STREAM = 1 + + def __init__(self): + self._socket = None + self.reader = None + self.writer = None + self.mode = self.MODE_NORMAL + + def connect_tcp(self, address, port): + def connect_sock(): + s = socket.create_connection((address, port)) + + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + return s + + self._connect_sock = connect_sock + + def connect_unix(self, path): + def connect_sock(): + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + # AF_UNIX has path length issues so chdir here to workaround + cwd = os.getcwd() + try: + os.chdir(os.path.dirname(path)) + s.connect(os.path.basename(path)) + finally: + os.chdir(cwd) + return s + + self._connect_sock = connect_sock + + def connect(self): + if self._socket is None: + self._socket = self._connect_sock() + + self.reader = self._socket.makefile('r', encoding='utf-8') + self.writer = self._socket.makefile('w', encoding='utf-8') + + self.writer.write('OEHASHEQUIV 1.0\n\n') + self.writer.flush() + + # Restore mode if the socket is being re-created + cur_mode = self.mode + self.mode = self.MODE_NORMAL + self._set_mode(cur_mode) + + return self._socket + + def close(self): + if self._socket is not None: + self._socket.close() + self._socket = None + self.reader = None + self.writer = None + + def _send_wrapper(self, proc): + count = 0 + while True: + try: + self.connect() + return proc() + except (OSError, HashConnectionError, json.JSONDecodeError, UnicodeDecodeError) as e: + logger.warning('Error talking to server: %s' % e) + if count >= 3: + if not isinstance(e, HashConnectionError): + raise 
HashConnectionError(str(e)) + raise e + self.close() + count += 1 + + def send_message(self, msg): + def proc(): + self.writer.write('%s\n' % json.dumps(msg)) + self.writer.flush() + + l = self.reader.readline() + if not l: + raise HashConnectionError('Connection closed') + + if not l.endswith('\n'): + raise HashConnectionError('Bad message %r' % message) + + return json.loads(l) + + return self._send_wrapper(proc) + + def send_stream(self, msg): + def proc(): + self.writer.write("%s\n" % msg) + self.writer.flush() + l = self.reader.readline() + if not l: + raise HashConnectionError('Connection closed') + return l.rstrip() + + return self._send_wrapper(proc) + + def _set_mode(self, new_mode): + if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM: + r = self.send_stream('END') + if r != 'ok': + raise HashConnectionError('Bad response from server %r' % r) + elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL: + r = self.send_message({'get-stream': None}) + if r != 'ok': + raise HashConnectionError('Bad response from server %r' % r) + elif new_mode != self.mode: + raise Exception('Undefined mode transition %r -> %r' % (self.mode, new_mode)) + + self.mode = new_mode + + def get_unihash(self, method, taskhash): + self._set_mode(self.MODE_GET_STREAM) + r = self.send_stream('%s %s' % (method, taskhash)) + if not r: + return None + return r + + def report_unihash(self, taskhash, method, outhash, unihash, extra={}): + self._set_mode(self.MODE_NORMAL) + m = extra.copy() + m['taskhash'] = taskhash + m['method'] = method + m['outhash'] = outhash + m['unihash'] = unihash + return self.send_message({'report': m}) + + def get_stats(self): + self._set_mode(self.MODE_NORMAL) + return self.send_message({'get-stats': None}) + + def reset_stats(self): + self._set_mode(self.MODE_NORMAL) + return self.send_message({'reset-stats': None}) diff --git a/poky/bitbake/lib/hashserv/server.py b/poky/bitbake/lib/hashserv/server.py new file mode 100644 index 
000000000..0aff77688 --- /dev/null +++ b/poky/bitbake/lib/hashserv/server.py @@ -0,0 +1,414 @@ +# Copyright (C) 2019 Garmin Ltd. +# +# SPDX-License-Identifier: GPL-2.0-only +# + +from contextlib import closing +from datetime import datetime +import asyncio +import json +import logging +import math +import os +import signal +import socket +import time + +logger = logging.getLogger('hashserv.server') + + +class Measurement(object): + def __init__(self, sample): + self.sample = sample + + def start(self): + self.start_time = time.perf_counter() + + def end(self): + self.sample.add(time.perf_counter() - self.start_time) + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args, **kwargs): + self.end() + + +class Sample(object): + def __init__(self, stats): + self.stats = stats + self.num_samples = 0 + self.elapsed = 0 + + def measure(self): + return Measurement(self) + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.end() + + def add(self, elapsed): + self.num_samples += 1 + self.elapsed += elapsed + + def end(self): + if self.num_samples: + self.stats.add(self.elapsed) + self.num_samples = 0 + self.elapsed = 0 + + +class Stats(object): + def __init__(self): + self.reset() + + def reset(self): + self.num = 0 + self.total_time = 0 + self.max_time = 0 + self.m = 0 + self.s = 0 + self.current_elapsed = None + + def add(self, elapsed): + self.num += 1 + if self.num == 1: + self.m = elapsed + self.s = 0 + else: + last_m = self.m + self.m = last_m + (elapsed - last_m) / self.num + self.s = self.s + (elapsed - last_m) * (elapsed - self.m) + + self.total_time += elapsed + + if self.max_time < elapsed: + self.max_time = elapsed + + def start_sample(self): + return Sample(self) + + @property + def average(self): + if self.num == 0: + return 0 + return self.total_time / self.num + + @property + def stdev(self): + if self.num <= 1: + return 0 + return math.sqrt(self.s / (self.num - 1)) + + def todict(self): + return {k: 
getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')} + + +class ServerClient(object): + def __init__(self, reader, writer, db, request_stats): + self.reader = reader + self.writer = writer + self.db = db + self.request_stats = request_stats + + async def process_requests(self): + try: + self.addr = self.writer.get_extra_info('peername') + logger.debug('Client %r connected' % (self.addr,)) + + # Read protocol and version + protocol = await self.reader.readline() + if protocol is None: + return + + (proto_name, proto_version) = protocol.decode('utf-8').rstrip().split() + if proto_name != 'OEHASHEQUIV' or proto_version != '1.0': + return + + # Read headers. Currently, no headers are implemented, so look for + # an empty line to signal the end of the headers + while True: + line = await self.reader.readline() + if line is None: + return + + line = line.decode('utf-8').rstrip() + if not line: + break + + # Handle messages + handlers = { + 'get': self.handle_get, + 'report': self.handle_report, + 'get-stream': self.handle_get_stream, + 'get-stats': self.handle_get_stats, + 'reset-stats': self.handle_reset_stats, + } + + while True: + d = await self.read_message() + if d is None: + break + + for k in handlers.keys(): + if k in d: + logger.debug('Handling %s' % k) + if 'stream' in k: + await handlers[k](d[k]) + else: + with self.request_stats.start_sample() as self.request_sample, \ + self.request_sample.measure(): + await handlers[k](d[k]) + break + else: + logger.warning("Unrecognized command %r" % d) + break + + await self.writer.drain() + finally: + self.writer.close() + + def write_message(self, msg): + self.writer.write(('%s\n' % json.dumps(msg)).encode('utf-8')) + + async def read_message(self): + l = await self.reader.readline() + if not l: + return None + + try: + message = l.decode('utf-8') + + if not message.endswith('\n'): + return None + + return json.loads(message) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + 
logger.error('Bad message from client: %r' % message) + raise e + + async def handle_get(self, request): + method = request['method'] + taskhash = request['taskhash'] + + row = self.query_equivalent(method, taskhash) + if row is not None: + logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} + + self.write_message(d) + else: + self.write_message(None) + + async def handle_get_stream(self, request): + self.write_message('ok') + + while True: + l = await self.reader.readline() + if not l: + return + + try: + # This inner loop is very sensitive and must be as fast as + # possible (which is why the request sample is handled manually + # instead of using 'with', and also why logging statements are + # commented out. + self.request_sample = self.request_stats.start_sample() + request_measure = self.request_sample.measure() + request_measure.start() + + l = l.decode('utf-8').rstrip() + if l == 'END': + self.writer.write('ok\n'.encode('utf-8')) + return + + (method, taskhash) = l.split() + #logger.debug('Looking up %s %s' % (method, taskhash)) + row = self.query_equivalent(method, taskhash) + if row is not None: + msg = ('%s\n' % row['unihash']).encode('utf-8') + #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash'])) + else: + msg = '\n'.encode('utf-8') + + self.writer.write(msg) + finally: + request_measure.end() + self.request_sample.end() + + await self.writer.drain() + + async def handle_report(self, data): + with closing(self.db.cursor()) as cursor: + cursor.execute(''' + -- Find tasks with a matching outhash (that is, tasks that + -- are equivalent) + SELECT taskhash, method, unihash FROM tasks_v2 WHERE method=:method AND outhash=:outhash + + -- If there is an exact match on the taskhash, return it. 
+ -- Otherwise return the oldest matching outhash of any + -- taskhash + ORDER BY CASE WHEN taskhash=:taskhash THEN 1 ELSE 2 END, + created ASC + + -- Only return one row + LIMIT 1 + ''', {k: data[k] for k in ('method', 'outhash', 'taskhash')}) + + row = cursor.fetchone() + + # If no matching outhash was found, or one *was* found but it + # wasn't an exact match on the taskhash, a new entry for this + # taskhash should be added + if row is None or row['taskhash'] != data['taskhash']: + # If a row matching the outhash was found, the unihash for + # the new taskhash should be the same as that one. + # Otherwise the caller provided unihash is used. + unihash = data['unihash'] + if row is not None: + unihash = row['unihash'] + + insert_data = { + 'method': data['method'], + 'outhash': data['outhash'], + 'taskhash': data['taskhash'], + 'unihash': unihash, + 'created': datetime.now() + } + + for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'): + if k in data: + insert_data[k] = data[k] + + cursor.execute('''INSERT INTO tasks_v2 (%s) VALUES (%s)''' % ( + ', '.join(sorted(insert_data.keys())), + ', '.join(':' + k for k in sorted(insert_data.keys()))), + insert_data) + + self.db.commit() + + logger.info('Adding taskhash %s with unihash %s', + data['taskhash'], unihash) + + d = { + 'taskhash': data['taskhash'], + 'method': data['method'], + 'unihash': unihash + } + else: + d = {k: row[k] for k in ('taskhash', 'method', 'unihash')} + + self.write_message(d) + + async def handle_get_stats(self, request): + d = { + 'requests': self.request_stats.todict(), + } + + self.write_message(d) + + async def handle_reset_stats(self, request): + d = { + 'requests': self.request_stats.todict(), + } + + self.request_stats.reset() + self.write_message(d) + + def query_equivalent(self, method, taskhash): + # This is part of the inner loop and must be as fast as possible + try: + cursor = self.db.cursor() + cursor.execute('SELECT taskhash, method, unihash FROM tasks_v2 WHERE 
method=:method AND taskhash=:taskhash ORDER BY created ASC LIMIT 1', + {'method': method, 'taskhash': taskhash}) + return cursor.fetchone() + except: + cursor.close() + + +class Server(object): + def __init__(self, db, loop=None): + self.request_stats = Stats() + self.db = db + + if loop is None: + self.loop = asyncio.new_event_loop() + self.close_loop = True + else: + self.loop = loop + self.close_loop = False + + self._cleanup_socket = None + + def start_tcp_server(self, host, port): + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, host, port, loop=self.loop) + ) + + for s in self.server.sockets: + logger.info('Listening on %r' % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + def start_unix_server(self, path): + def cleanup(): + os.unlink(path) + + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(path)) + self.server = self.loop.run_until_complete( + asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) + ) + finally: + os.chdir(cwd) + + logger.info('Listening on %r' % path) + + self._cleanup_socket = cleanup + self.address = "unix://%s" % os.path.abspath(path) + + async def handle_client(self, reader, writer): + # writer.transport.set_write_buffer_limits(0) + try: + client = ServerClient(reader, writer, self.db, self.request_stats) + await client.process_requests() + except Exception as e: + import traceback + logger.error('Error from client: %s' % str(e), exc_info=True) + traceback.print_exc() + writer.close() + logger.info('Client disconnected') + + def 
serve_forever(self): + def signal_handler(): + self.loop.stop() + + self.loop.add_signal_handler(signal.SIGTERM, signal_handler) + + try: + self.loop.run_forever() + except KeyboardInterrupt: + pass + + self.server.close() + self.loop.run_until_complete(self.server.wait_closed()) + logger.info('Server shutting down') + + if self.close_loop: + self.loop.close() + + if self._cleanup_socket is not None: + self._cleanup_socket() diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py index 6845b5388..6584ff57b 100644 --- a/poky/bitbake/lib/hashserv/tests.py +++ b/poky/bitbake/lib/hashserv/tests.py @@ -1,29 +1,40 @@ #! /usr/bin/env python3 # -# Copyright (C) 2018 Garmin Ltd. +# Copyright (C) 2018-2019 Garmin Ltd. # # SPDX-License-Identifier: GPL-2.0-only # -import unittest -import multiprocessing -import sqlite3 +from . import create_server, create_client import hashlib -import urllib.request -import json +import logging +import multiprocessing +import sys import tempfile -from . 
import create_server +import threading +import unittest + + +class TestHashEquivalenceServer(object): + METHOD = 'TestMethod' + + def _run_server(self): + # logging.basicConfig(level=logging.DEBUG, filename='bbhashserv.log', filemode='w', + # format='%(levelname)s %(filename)s:%(lineno)d %(message)s') + self.server.serve_forever() -class TestHashEquivalenceServer(unittest.TestCase): def setUp(self): - # Start a hash equivalence server in the background bound to - # an ephemeral port - self.dbfile = tempfile.NamedTemporaryFile(prefix="bb-hashserv-db-") - self.server = create_server(('localhost', 0), self.dbfile.name) - self.server_addr = 'http://localhost:%d' % self.server.socket.getsockname()[1] - self.server_thread = multiprocessing.Process(target=self.server.serve_forever) + if sys.version_info < (3, 5, 0): + self.skipTest('Python 3.5 or later required') + + self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv') + self.dbfile = os.path.join(self.temp_dir.name, 'db.sqlite') + + self.server = create_server(self.get_server_addr(), self.dbfile) + self.server_thread = multiprocessing.Process(target=self._run_server) self.server_thread.daemon = True self.server_thread.start() + self.client = create_client(self.server.address) def tearDown(self): # Shutdown server @@ -31,19 +42,8 @@ class TestHashEquivalenceServer(unittest.TestCase): if s is not None: self.server_thread.terminate() self.server_thread.join() - - def send_get(self, path): - url = '%s/%s' % (self.server_addr, path) - request = urllib.request.Request(url) - response = urllib.request.urlopen(request) - return json.loads(response.read().decode('utf-8')) - - def send_post(self, path, data): - headers = {'content-type': 'application/json'} - url = '%s/%s' % (self.server_addr, path) - request = urllib.request.Request(url, json.dumps(data).encode('utf-8'), headers) - response = urllib.request.urlopen(request) - return json.loads(response.read().decode('utf-8')) + self.client.close() + 
self.temp_dir.cleanup() def test_create_hash(self): # Simple test that hashes can be created @@ -51,16 +51,11 @@ class TestHashEquivalenceServer(unittest.TestCase): outhash = '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f' unihash = 'f46d3fbb439bd9b921095da657a4de906510d2cd' - d = self.send_get('v1/equivalent?method=TestMethod&taskhash=%s' % taskhash) - self.assertIsNone(d, msg='Found unexpected task, %r' % d) + result = self.client.get_unihash(self.METHOD, taskhash) + self.assertIsNone(result, msg='Found unexpected task, %r' % result) - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash, - 'method': 'TestMethod', - 'outhash': outhash, - 'unihash': unihash, - }) - self.assertEqual(d['unihash'], unihash, 'Server returned bad unihash') + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') def test_create_equivalent(self): # Tests that a second reported task with the same outhash will be @@ -68,25 +63,16 @@ class TestHashEquivalenceServer(unittest.TestCase): taskhash = '53b8dce672cb6d0c73170be43f540460bfc347b4' outhash = '5a9cb1649625f0bf41fc7791b635cd9c2d7118c7f021ba87dcd03f72b67ce7a8' unihash = 'f37918cc02eb5a520b1aff86faacbc0a38124646' - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash, - 'method': 'TestMethod', - 'outhash': outhash, - 'unihash': unihash, - }) - self.assertEqual(d['unihash'], unihash, 'Server returned bad unihash') + + result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') # Report a different task with the same outhash. 
The returned unihash # should match the first task taskhash2 = '3bf6f1e89d26205aec90da04854fbdbf73afe6b4' unihash2 = 'af36b199320e611fbb16f1f277d3ee1d619ca58b' - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash2, - 'method': 'TestMethod', - 'outhash': outhash, - 'unihash': unihash2, - }) - self.assertEqual(d['unihash'], unihash, 'Server returned bad unihash') + result = self.client.report_unihash(taskhash2, self.METHOD, outhash, unihash2) + self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash') def test_duplicate_taskhash(self): # Tests that duplicate reports of the same taskhash with different @@ -95,38 +81,63 @@ class TestHashEquivalenceServer(unittest.TestCase): taskhash = '8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a' outhash = 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e' unihash = '218e57509998197d570e2c98512d0105985dffc9' - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash, - 'method': 'TestMethod', - 'outhash': outhash, - 'unihash': unihash, - }) + self.client.report_unihash(taskhash, self.METHOD, outhash, unihash) - d = self.send_get('v1/equivalent?method=TestMethod&taskhash=%s' % taskhash) - self.assertEqual(d['unihash'], unihash) + result = self.client.get_unihash(self.METHOD, taskhash) + self.assertEqual(result, unihash) outhash2 = '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d' unihash2 = 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c' - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash, - 'method': 'TestMethod', - 'outhash': outhash2, - 'unihash': unihash2 - }) + self.client.report_unihash(taskhash, self.METHOD, outhash2, unihash2) - d = self.send_get('v1/equivalent?method=TestMethod&taskhash=%s' % taskhash) - self.assertEqual(d['unihash'], unihash) + result = self.client.get_unihash(self.METHOD, taskhash) + self.assertEqual(result, unihash) outhash3 = '77623a549b5b1a31e3732dfa8fe61d7ce5d44b3370f253c5360e136b852967b4' unihash3 = 
'9217a7d6398518e5dc002ed58f2cbbbc78696603' - d = self.send_post('v1/equivalent', { - 'taskhash': taskhash, - 'method': 'TestMethod', - 'outhash': outhash3, - 'unihash': unihash3 - }) + self.client.report_unihash(taskhash, self.METHOD, outhash3, unihash3) + + result = self.client.get_unihash(self.METHOD, taskhash) + self.assertEqual(result, unihash) + + def test_stress(self): + def query_server(failures): + client = Client(self.server.address) + try: + for i in range(1000): + taskhash = hashlib.sha256() + taskhash.update(str(i).encode('utf-8')) + taskhash = taskhash.hexdigest() + result = client.get_unihash(self.METHOD, taskhash) + if result != taskhash: + failures.append("taskhash mismatch: %s != %s" % (result, taskhash)) + finally: + client.close() + + # Report hashes + for i in range(1000): + taskhash = hashlib.sha256() + taskhash.update(str(i).encode('utf-8')) + taskhash = taskhash.hexdigest() + self.client.report_unihash(taskhash, self.METHOD, taskhash, taskhash) + + failures = [] + threads = [threading.Thread(target=query_server, args=(failures,)) for t in range(100)] + + for t in threads: + t.start() + + for t in threads: + t.join() + + self.assertFalse(failures) + - d = self.send_get('v1/equivalent?method=TestMethod&taskhash=%s' % taskhash) - self.assertEqual(d['unihash'], unihash) +class TestHashEquivalenceUnixServer(TestHashEquivalenceServer, unittest.TestCase): + def get_server_addr(self): + return "unix://" + os.path.join(self.temp_dir.name, 'sock') +class TestHashEquivalenceTCPServer(TestHashEquivalenceServer, unittest.TestCase): + def get_server_addr(self): + return "localhost:0" -- cgit v1.2.3