diff options
author | Dave Cobbley <david.j.cobbley@linux.intel.com> | 2018-08-14 20:05:37 +0300 |
---|---|---|
committer | Brad Bishop <bradleyb@fuzziesquirrel.com> | 2018-08-23 04:26:31 +0300 |
commit | eb8dc40360f0cfef56fb6947cc817a547d6d9bc6 (patch) | |
tree | de291a73dc37168da6370e2cf16c347d1eba9df8 /poky/bitbake/bin/bitbake-worker | |
parent | 9c3cf826d853102535ead04cebc2d6023eff3032 (diff) | |
download | openbmc-eb8dc40360f0cfef56fb6947cc817a547d6d9bc6.tar.xz |
[Subtree] Removing import-layers directory
As part of the move to subtrees, need to bring all the import layers
content to the top level.
Change-Id: I4a163d10898cbc6e11c27f776f60e1a470049d8f
Signed-off-by: Dave Cobbley <david.j.cobbley@linux.intel.com>
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
Diffstat (limited to 'poky/bitbake/bin/bitbake-worker')
-rwxr-xr-x | poky/bitbake/bin/bitbake-worker | 501 |
1 files changed, 501 insertions, 0 deletions
diff --git a/poky/bitbake/bin/bitbake-worker b/poky/bitbake/bin/bitbake-worker new file mode 100755 index 000000000..e925054b7 --- /dev/null +++ b/poky/bitbake/bin/bitbake-worker @@ -0,0 +1,501 @@ +#!/usr/bin/env python3 + +import os +import sys +import warnings +sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) +from bb import fetch2 +import logging +import bb +import select +import errno +import signal +import pickle +import traceback +import queue +from multiprocessing import Lock +from threading import Thread + +if sys.getfilesystemencoding() != "utf-8": + sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") + +# Users shouldn't be running this code directly +if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): + print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") + sys.exit(1) + +profiling = False +if sys.argv[1].startswith("decafbadbad"): + profiling = True + try: + import cProfile as profile + except: + import profile + +# Unbuffer stdout to avoid log truncation in the event +# of an unorderly exit as well as to provide timely +# updates to log files for use with tail +try: + if sys.stdout.name == '<stdout>': + import fcntl + fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) + fl |= os.O_SYNC + fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) + #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) +except: + pass + +logger = logging.getLogger("BitBake") + +worker_pipe = sys.stdout.fileno() +bb.utils.nonblockingfd(worker_pipe) +# Need to guard against multiprocessing being used in child processes +# and multiple processes trying to write to the parent at the same time +worker_pipe_lock = None + +handler = bb.event.LogHandler() +logger.addHandler(handler) + +if 0: + # Code to write out a log file of all events passing through the worker + logfilename = "/tmp/workerlogfile" + format_str = "%(levelname)s: %(message)s" + conlogformat = bb.msg.BBLogFormatter(format_str) + consolelog = logging.FileHandler(logfilename) + bb.msg.addDefaultlogFilter(consolelog) + consolelog.setFormatter(conlogformat) + logger.addHandler(consolelog) + +worker_queue = queue.Queue() + +def worker_fire(event, d): + data = b"<event>" + pickle.dumps(event) + b"</event>" + worker_fire_prepickled(data) + +def worker_fire_prepickled(event): + global worker_queue + + worker_queue.put(event) + +# +# We can end up with write contention with the cooker, it can be trying to send commands +# and we can be trying to send event data back. Therefore use a separate thread for writing +# back data to cooker. +# +worker_thread_exit = False + +def worker_flush(worker_queue): + worker_queue_int = b"" + global worker_pipe, worker_thread_exit + + while True: + try: + worker_queue_int = worker_queue_int + worker_queue.get(True, 1) + except queue.Empty: + pass + while (worker_queue_int or not worker_queue.empty()): + try: + (_, ready, _) = select.select([], [worker_pipe], [], 1) + if not worker_queue.empty(): + worker_queue_int = worker_queue_int + worker_queue.get() + written = os.write(worker_pipe, worker_queue_int) + worker_queue_int = worker_queue_int[written:] + except (IOError, OSError) as e: + if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: + raise + if worker_thread_exit and worker_queue.empty() and not worker_queue_int: + return + +worker_thread = Thread(target=worker_flush, args=(worker_queue,)) +worker_thread.start() + +def worker_child_fire(event, d): + global worker_pipe + global worker_pipe_lock + + data = b"<event>" + pickle.dumps(event) + b"</event>" + try: + worker_pipe_lock.acquire() + worker_pipe.write(data) + worker_pipe_lock.release() + except IOError: + sigterm_handler(None, None) + raise + +bb.event.worker_fire = worker_fire + +lf = None +#lf = open("/tmp/workercommandlog", "w+") +def workerlog_write(msg): + if lf: + lf.write(msg) + lf.flush() + +def sigterm_handler(signum, frame): + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.killpg(0, signal.SIGTERM) + sys.exit() + +def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): + # We need to setup the environment BEFORE the fork, since + # a fork() or exec*() activates PSEUDO... + + envbackup = {} + fakeenv = {} + umask = None + + taskdep = workerdata["taskdeps"][fn] + if 'umask' in taskdep and taskname in taskdep['umask']: + # umask might come in as a number or text string.. + try: + umask = int(taskdep['umask'][taskname],8) + except TypeError: + umask = taskdep['umask'][taskname] + + dry_run = cfg.dry_run or dry_run_exec + + # We can't use the fakeroot environment in a dry run as it possibly hasn't been built + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: + envvars = (workerdata["fakerootenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + fakedirs = (workerdata["fakerootdirs"][fn] or "").split() + for p in fakedirs: + bb.utils.mkdirhier(p) + logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % + (fn, taskname, ', '.join(fakedirs))) + else: + envvars = (workerdata["fakerootnoenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + sys.stdout.flush() + sys.stderr.flush() + + try: + pipein, pipeout = os.pipe() + pipein = os.fdopen(pipein, 'rb', 4096) + pipeout = os.fdopen(pipeout, 'wb', 0) + pid = os.fork() + except OSError as e: + logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) + sys.exit(1) + + if pid == 0: + def child(): + global worker_pipe + global worker_pipe_lock + pipein.close() + + signal.signal(signal.SIGTERM, sigterm_handler) + # Let SIGHUP exit as SIGTERM + signal.signal(signal.SIGHUP, sigterm_handler) + bb.utils.signal_on_parent_exit("SIGTERM") + + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_fire = worker_child_fire + worker_pipe = pipeout + worker_pipe_lock = Lock() + + # Make the child the process group leader and ensure no + # child process will be controlled by the current terminal + # This ensures signals sent to the controlling terminal like Ctrl+C + # don't stop the child processes. + os.setsid() + # No stdin + newsi = os.open(os.devnull, os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + if umask: + os.umask(umask) + + try: + bb_cache = bb.cache.NoCache(databuilder) + (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) + the_data = databuilder.mcdata[mc] + the_data.setVar("BB_WORKERCONTEXT", "1") + the_data.setVar("BB_TASKDEPDATA", taskdepdata) + if cfg.limited_deps: + the_data.setVar("BB_LIMITEDDEPS", "1") + the_data.setVar("BUILDNAME", workerdata["buildname"]) + the_data.setVar("DATE", workerdata["date"]) + the_data.setVar("TIME", workerdata["time"]) + for varname, value in extraconfigdata.items(): + the_data.setVar(varname, value) + + bb.parse.siggen.set_taskdata(workerdata["sigdata"]) + ret = 0 + + the_data = bb_cache.loadDataFull(fn, appends) + the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) + + bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) + + # exported_vars() returns a generator which *cannot* be passed to os.environ.update() + # successfully. We also need to unset anything from the environment which shouldn't be there + exports = bb.data.exported_vars(the_data) + + bb.utils.empty_environment() + for e, v in exports: + os.environ[e] = v + + for e in fakeenv: + os.environ[e] = fakeenv[e] + the_data.setVar(e, fakeenv[e]) + the_data.setVarFlag(e, 'export', "1") + + task_exports = the_data.getVarFlag(taskname, 'exports') + if task_exports: + for e in task_exports.split(): + the_data.setVarFlag(e, 'export', '1') + v = the_data.getVar(e) + if v is not None: + os.environ[e] = v + + if quieterrors: + the_data.setVarFlag(taskname, "quieterrors", "1") + + except Exception: + if not quieterrors: + logger.critical(traceback.format_exc()) + os._exit(1) + try: + if dry_run: + return 0 + return bb.build.exec_task(fn, taskname, the_data, cfg.profile) + except: + os._exit(1) + if not profiling: + os._exit(child()) + else: + profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) + prof = profile.Profile() + try: + ret = profile.Profile.runcall(prof, child) + finally: + prof.dump_stats(profname) + bb.utils.process_profilelog(profname) + os._exit(ret) + else: + for key, value in iter(envbackup.items()): + if value is None: + del os.environ[key] + else: + os.environ[key] = value + + return pid, pipein, pipeout + +class runQueueWorkerPipe(): + """ + Abstraction for a pipe between a worker thread and the worker server + """ + def __init__(self, pipein, pipeout): + self.input = pipein + if pipeout: + pipeout.close() + bb.utils.nonblockingfd(self.input) + self.queue = b"" + + def read(self): + start = len(self.queue) + try: + self.queue = self.queue + (self.input.read(102400) or b"") + except (OSError, IOError) as e: + if e.errno != errno.EAGAIN: + raise + + end = len(self.queue) + index = self.queue.find(b"</event>") + while index != -1: + worker_fire_prepickled(self.queue[:index+8]) + self.queue = self.queue[index+8:] + index = self.queue.find(b"</event>") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print("Warning, worker child left partial message: %s" % self.queue) + self.input.close() + +normalexit = False + +class BitbakeWorker(object): + def __init__(self, din): + self.input = din + bb.utils.nonblockingfd(self.input) + self.queue = b"" + self.cookercfg = None + self.databuilder = None + self.data = None + self.extraconfigdata = None + self.build_pids = {} + self.build_pipes = {} + + signal.signal(signal.SIGTERM, self.sigterm_exception) + # Let SIGHUP exit as SIGTERM + signal.signal(signal.SIGHUP, self.sigterm_exception) + if "beef" in sys.argv[1]: + bb.utils.set_process_name("Worker (Fakeroot)") + else: + bb.utils.set_process_name("Worker") + + def sigterm_exception(self, signum, stackframe): + if signum == signal.SIGTERM: + bb.warn("Worker received SIGTERM, shutting down...") + elif signum == signal.SIGHUP: + bb.warn("Worker received SIGHUP, shutting down...") + self.handle_finishnow(None) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGTERM) + + def serve(self): + while True: + (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) + if self.input in ready: + try: + r = self.input.read() + if len(r) == 0: + # EOF on pipe, server must have terminated + self.sigterm_exception(signal.SIGTERM, None) + self.queue = self.queue + r + except (OSError, IOError): + pass + if len(self.queue): + self.handle_item(b"cookerconfig", self.handle_cookercfg) + self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) + self.handle_item(b"workerdata", self.handle_workerdata) + self.handle_item(b"runtask", self.handle_runtask) + self.handle_item(b"finishnow", self.handle_finishnow) + self.handle_item(b"ping", self.handle_ping) + self.handle_item(b"quit", self.handle_quit) + + for pipe in self.build_pipes: + if self.build_pipes[pipe].input in ready: + self.build_pipes[pipe].read() + if len(self.build_pids): + while self.process_waitpid(): + continue + + + def handle_item(self, item, func): + if self.queue.startswith(b"<" + item + b">"): + index = self.queue.find(b"</" + item + b">") + while index != -1: + func(self.queue[(len(item) + 2):index]) + self.queue = self.queue[(index + len(item) + 3):] + index = self.queue.find(b"</" + item + b">") + + def handle_cookercfg(self, data): + self.cookercfg = pickle.loads(data) + self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) + self.databuilder.parseBaseConfiguration() + self.data = self.databuilder.data + + def handle_extraconfigdata(self, data): + self.extraconfigdata = pickle.loads(data) + + def handle_workerdata(self, data): + self.workerdata = pickle.loads(data) + bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] + bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] + bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] + bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] + for mc in self.databuilder.mcdata: + self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) + + def handle_ping(self, _): + workerlog_write("Handling ping\n") + + logger.warning("Pong from bitbake-worker!") + + def handle_quit(self, data): + workerlog_write("Handling quit\n") + + global normalexit + normalexit = True + sys.exit(0) + + def handle_runtask(self, data): + fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) + workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) + + pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) + + self.build_pids[pid] = task + self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) + + def process_waitpid(self): + """ + Return none is there are no processes awaiting result collection, otherwise + collect the process exit codes and close the information pipe. + """ + try: + pid, status = os.waitpid(-1, os.WNOHANG) + if pid == 0 or os.WIFSTOPPED(status): + return False + except OSError: + return False + + workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) + + if os.WIFEXITED(status): + status = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + # Per shell conventions for $?, when a process exits due to + # a signal, we return an exit code of 128 + SIGNUM + status = 128 + os.WTERMSIG(status) + + task = self.build_pids[pid] + del self.build_pids[pid] + + self.build_pipes[pid].close() + del self.build_pipes[pid] + + worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") + + return True + + def handle_finishnow(self, _): + if self.build_pids: + logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) + for k, v in iter(self.build_pids.items()): + try: + os.kill(-k, signal.SIGTERM) + os.waitpid(-1, 0) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + +try: + worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) + if not profiling: + worker.serve() + else: + profname = "profile-worker.log" + prof = profile.Profile() + try: + profile.Profile.runcall(prof, worker.serve) + finally: + prof.dump_stats(profname) + bb.utils.process_profilelog(profname) +except BaseException as e: + if not normalexit: + import traceback + sys.stderr.write(traceback.format_exc()) + sys.stderr.write(str(e)) + +worker_thread_exit = True +worker_thread.join() + +workerlog_write("exitting") +sys.exit(0) |