diff options
Diffstat (limited to 'import-layers/yocto-poky/bitbake/bin/bitbake-worker')
-rwxr-xr-x | import-layers/yocto-poky/bitbake/bin/bitbake-worker | 501 |
1 files changed, 0 insertions, 501 deletions
diff --git a/import-layers/yocto-poky/bitbake/bin/bitbake-worker b/import-layers/yocto-poky/bitbake/bin/bitbake-worker deleted file mode 100755 index e925054b7..000000000 --- a/import-layers/yocto-poky/bitbake/bin/bitbake-worker +++ /dev/null @@ -1,501 +0,0 @@ -#!/usr/bin/env python3 - -import os -import sys -import warnings -sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) -from bb import fetch2 -import logging -import bb -import select -import errno -import signal -import pickle -import traceback -import queue -from multiprocessing import Lock -from threading import Thread - -if sys.getfilesystemencoding() != "utf-8": - sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") - -# Users shouldn't be running this code directly -if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): - print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") - sys.exit(1) - -profiling = False -if sys.argv[1].startswith("decafbadbad"): - profiling = True - try: - import cProfile as profile - except: - import profile - -# Unbuffer stdout to avoid log truncation in the event -# of an unorderly exit as well as to provide timely -# updates to log files for use with tail -try: - if sys.stdout.name == '<stdout>': - import fcntl - fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) - fl |= os.O_SYNC - fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) - #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) -except: - pass - -logger = logging.getLogger("BitBake") - -worker_pipe = sys.stdout.fileno() -bb.utils.nonblockingfd(worker_pipe) -# Need to guard against multiprocessing being used in child processes -# and multiple processes trying to write to the parent at the same time -worker_pipe_lock = None - -handler = bb.event.LogHandler() -logger.addHandler(handler) - -if 0: - # Code to write out a log file of all events passing through the worker - logfilename = "/tmp/workerlogfile" - format_str = "%(levelname)s: %(message)s" - conlogformat = bb.msg.BBLogFormatter(format_str) - consolelog = logging.FileHandler(logfilename) - bb.msg.addDefaultlogFilter(consolelog) - consolelog.setFormatter(conlogformat) - logger.addHandler(consolelog) - -worker_queue = queue.Queue() - -def worker_fire(event, d): - data = b"<event>" + pickle.dumps(event) + b"</event>" - worker_fire_prepickled(data) - -def worker_fire_prepickled(event): - global worker_queue - - worker_queue.put(event) - -# -# We can end up with write contention with the cooker, it can be trying to send commands -# and we can be trying to send event data back. Therefore use a separate thread for writing -# back data to cooker. -# -worker_thread_exit = False - -def worker_flush(worker_queue): - worker_queue_int = b"" - global worker_pipe, worker_thread_exit - - while True: - try: - worker_queue_int = worker_queue_int + worker_queue.get(True, 1) - except queue.Empty: - pass - while (worker_queue_int or not worker_queue.empty()): - try: - (_, ready, _) = select.select([], [worker_pipe], [], 1) - if not worker_queue.empty(): - worker_queue_int = worker_queue_int + worker_queue.get() - written = os.write(worker_pipe, worker_queue_int) - worker_queue_int = worker_queue_int[written:] - except (IOError, OSError) as e: - if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: - raise - if worker_thread_exit and worker_queue.empty() and not worker_queue_int: - return - -worker_thread = Thread(target=worker_flush, args=(worker_queue,)) -worker_thread.start() - -def worker_child_fire(event, d): - global worker_pipe - global worker_pipe_lock - - data = b"<event>" + pickle.dumps(event) + b"</event>" - try: - worker_pipe_lock.acquire() - worker_pipe.write(data) - worker_pipe_lock.release() - except IOError: - sigterm_handler(None, None) - raise - -bb.event.worker_fire = worker_fire - -lf = None -#lf = open("/tmp/workercommandlog", "w+") -def workerlog_write(msg): - if lf: - lf.write(msg) - lf.flush() - -def sigterm_handler(signum, frame): - signal.signal(signal.SIGTERM, signal.SIG_DFL) - os.killpg(0, signal.SIGTERM) - sys.exit() - -def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): - # We need to setup the environment BEFORE the fork, since - # a fork() or exec*() activates PSEUDO... - - envbackup = {} - fakeenv = {} - umask = None - - taskdep = workerdata["taskdeps"][fn] - if 'umask' in taskdep and taskname in taskdep['umask']: - # umask might come in as a number or text string.. - try: - umask = int(taskdep['umask'][taskname],8) - except TypeError: - umask = taskdep['umask'][taskname] - - dry_run = cfg.dry_run or dry_run_exec - - # We can't use the fakeroot environment in a dry run as it possibly hasn't been built - if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: - envvars = (workerdata["fakerootenv"][fn] or "").split() - for key, value in (var.split('=') for var in envvars): - envbackup[key] = os.environ.get(key) - os.environ[key] = value - fakeenv[key] = value - - fakedirs = (workerdata["fakerootdirs"][fn] or "").split() - for p in fakedirs: - bb.utils.mkdirhier(p) - logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % - (fn, taskname, ', '.join(fakedirs))) - else: - envvars = (workerdata["fakerootnoenv"][fn] or "").split() - for key, value in (var.split('=') for var in envvars): - envbackup[key] = os.environ.get(key) - os.environ[key] = value - fakeenv[key] = value - - sys.stdout.flush() - sys.stderr.flush() - - try: - pipein, pipeout = os.pipe() - pipein = os.fdopen(pipein, 'rb', 4096) - pipeout = os.fdopen(pipeout, 'wb', 0) - pid = os.fork() - except OSError as e: - logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) - sys.exit(1) - - if pid == 0: - def child(): - global worker_pipe - global worker_pipe_lock - pipein.close() - - signal.signal(signal.SIGTERM, sigterm_handler) - # Let SIGHUP exit as SIGTERM - signal.signal(signal.SIGHUP, sigterm_handler) - bb.utils.signal_on_parent_exit("SIGTERM") - - # Save out the PID so that the event can include it the - # events - bb.event.worker_pid = os.getpid() - bb.event.worker_fire = worker_child_fire - worker_pipe = pipeout - worker_pipe_lock = Lock() - - # Make the child the process group leader and ensure no - # child process will be controlled by the current terminal - # This ensures signals sent to the controlling terminal like Ctrl+C - # don't stop the child processes. - os.setsid() - # No stdin - newsi = os.open(os.devnull, os.O_RDWR) - os.dup2(newsi, sys.stdin.fileno()) - - if umask: - os.umask(umask) - - try: - bb_cache = bb.cache.NoCache(databuilder) - (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) - the_data = databuilder.mcdata[mc] - the_data.setVar("BB_WORKERCONTEXT", "1") - the_data.setVar("BB_TASKDEPDATA", taskdepdata) - if cfg.limited_deps: - the_data.setVar("BB_LIMITEDDEPS", "1") - the_data.setVar("BUILDNAME", workerdata["buildname"]) - the_data.setVar("DATE", workerdata["date"]) - the_data.setVar("TIME", workerdata["time"]) - for varname, value in extraconfigdata.items(): - the_data.setVar(varname, value) - - bb.parse.siggen.set_taskdata(workerdata["sigdata"]) - ret = 0 - - the_data = bb_cache.loadDataFull(fn, appends) - the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) - - bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) - - # exported_vars() returns a generator which *cannot* be passed to os.environ.update() - # successfully. We also need to unset anything from the environment which shouldn't be there - exports = bb.data.exported_vars(the_data) - - bb.utils.empty_environment() - for e, v in exports: - os.environ[e] = v - - for e in fakeenv: - os.environ[e] = fakeenv[e] - the_data.setVar(e, fakeenv[e]) - the_data.setVarFlag(e, 'export', "1") - - task_exports = the_data.getVarFlag(taskname, 'exports') - if task_exports: - for e in task_exports.split(): - the_data.setVarFlag(e, 'export', '1') - v = the_data.getVar(e) - if v is not None: - os.environ[e] = v - - if quieterrors: - the_data.setVarFlag(taskname, "quieterrors", "1") - - except Exception: - if not quieterrors: - logger.critical(traceback.format_exc()) - os._exit(1) - try: - if dry_run: - return 0 - return bb.build.exec_task(fn, taskname, the_data, cfg.profile) - except: - os._exit(1) - if not profiling: - os._exit(child()) - else: - profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) - prof = profile.Profile() - try: - ret = profile.Profile.runcall(prof, child) - finally: - prof.dump_stats(profname) - bb.utils.process_profilelog(profname) - os._exit(ret) - else: - for key, value in iter(envbackup.items()): - if value is None: - del os.environ[key] - else: - os.environ[key] = value - - return pid, pipein, pipeout - -class runQueueWorkerPipe(): - """ - Abstraction for a pipe between a worker thread and the worker server - """ - def __init__(self, pipein, pipeout): - self.input = pipein - if pipeout: - pipeout.close() - bb.utils.nonblockingfd(self.input) - self.queue = b"" - - def read(self): - start = len(self.queue) - try: - self.queue = self.queue + (self.input.read(102400) or b"") - except (OSError, IOError) as e: - if e.errno != errno.EAGAIN: - raise - - end = len(self.queue) - index = self.queue.find(b"</event>") - while index != -1: - worker_fire_prepickled(self.queue[:index+8]) - self.queue = self.queue[index+8:] - index = self.queue.find(b"</event>") - return (end > start) - - def close(self): - while self.read(): - continue - if len(self.queue) > 0: - print("Warning, worker child left partial message: %s" % self.queue) - self.input.close() - -normalexit = False - -class BitbakeWorker(object): - def __init__(self, din): - self.input = din - bb.utils.nonblockingfd(self.input) - self.queue = b"" - self.cookercfg = None - self.databuilder = None - self.data = None - self.extraconfigdata = None - self.build_pids = {} - self.build_pipes = {} - - signal.signal(signal.SIGTERM, self.sigterm_exception) - # Let SIGHUP exit as SIGTERM - signal.signal(signal.SIGHUP, self.sigterm_exception) - if "beef" in sys.argv[1]: - bb.utils.set_process_name("Worker (Fakeroot)") - else: - bb.utils.set_process_name("Worker") - - def sigterm_exception(self, signum, stackframe): - if signum == signal.SIGTERM: - bb.warn("Worker received SIGTERM, shutting down...") - elif signum == signal.SIGHUP: - bb.warn("Worker received SIGHUP, shutting down...") - self.handle_finishnow(None) - signal.signal(signal.SIGTERM, signal.SIG_DFL) - os.kill(os.getpid(), signal.SIGTERM) - - def serve(self): - while True: - (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) - if self.input in ready: - try: - r = self.input.read() - if len(r) == 0: - # EOF on pipe, server must have terminated - self.sigterm_exception(signal.SIGTERM, None) - self.queue = self.queue + r - except (OSError, IOError): - pass - if len(self.queue): - self.handle_item(b"cookerconfig", self.handle_cookercfg) - self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) - self.handle_item(b"workerdata", self.handle_workerdata) - self.handle_item(b"runtask", self.handle_runtask) - self.handle_item(b"finishnow", self.handle_finishnow) - self.handle_item(b"ping", self.handle_ping) - self.handle_item(b"quit", self.handle_quit) - - for pipe in self.build_pipes: - if self.build_pipes[pipe].input in ready: - self.build_pipes[pipe].read() - if len(self.build_pids): - while self.process_waitpid(): - continue - - - def handle_item(self, item, func): - if self.queue.startswith(b"<" + item + b">"): - index = self.queue.find(b"</" + item + b">") - while index != -1: - func(self.queue[(len(item) + 2):index]) - self.queue = self.queue[(index + len(item) + 3):] - index = self.queue.find(b"</" + item + b">") - - def handle_cookercfg(self, data): - self.cookercfg = pickle.loads(data) - self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) - self.databuilder.parseBaseConfiguration() - self.data = self.databuilder.data - - def handle_extraconfigdata(self, data): - self.extraconfigdata = pickle.loads(data) - - def handle_workerdata(self, data): - self.workerdata = pickle.loads(data) - bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] - bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] - bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] - bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] - for mc in self.databuilder.mcdata: - self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) - - def handle_ping(self, _): - workerlog_write("Handling ping\n") - - logger.warning("Pong from bitbake-worker!") - - def handle_quit(self, data): - workerlog_write("Handling quit\n") - - global normalexit - normalexit = True - sys.exit(0) - - def handle_runtask(self, data): - fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) - workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) - - pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) - - self.build_pids[pid] = task - self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) - - def process_waitpid(self): - """ - Return none is there are no processes awaiting result collection, otherwise - collect the process exit codes and close the information pipe. - """ - try: - pid, status = os.waitpid(-1, os.WNOHANG) - if pid == 0 or os.WIFSTOPPED(status): - return False - except OSError: - return False - - workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) - - if os.WIFEXITED(status): - status = os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - # Per shell conventions for $?, when a process exits due to - # a signal, we return an exit code of 128 + SIGNUM - status = 128 + os.WTERMSIG(status) - - task = self.build_pids[pid] - del self.build_pids[pid] - - self.build_pipes[pid].close() - del self.build_pipes[pid] - - worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") - - return True - - def handle_finishnow(self, _): - if self.build_pids: - logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) - for k, v in iter(self.build_pids.items()): - try: - os.kill(-k, signal.SIGTERM) - os.waitpid(-1, 0) - except: - pass - for pipe in self.build_pipes: - self.build_pipes[pipe].read() - -try: - worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) - if not profiling: - worker.serve() - else: - profname = "profile-worker.log" - prof = profile.Profile() - try: - profile.Profile.runcall(prof, worker.serve) - finally: - prof.dump_stats(profname) - bb.utils.process_profilelog(profname) -except BaseException as e: - if not normalexit: - import traceback - sys.stderr.write(traceback.format_exc()) - sys.stderr.write(str(e)) - -worker_thread_exit = True -worker_thread.join() - -workerlog_write("exitting") -sys.exit(0) |