diff options
Diffstat (limited to 'yocto-poky/bitbake/bin/bitbake-worker')
-rwxr-xr-x | yocto-poky/bitbake/bin/bitbake-worker | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/yocto-poky/bitbake/bin/bitbake-worker b/yocto-poky/bitbake/bin/bitbake-worker new file mode 100755 index 0000000000..af17b874aa --- /dev/null +++ b/yocto-poky/bitbake/bin/bitbake-worker @@ -0,0 +1,432 @@ +#!/usr/bin/env python + +import os +import sys +import warnings +sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) +from bb import fetch2 +import logging +import bb +import select +import errno +import signal + +# Users shouldn't be running this code directly +if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): + print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") + sys.exit(1) + +profiling = False +if sys.argv[1] == "decafbadbad": + profiling = True + try: + import cProfile as profile + except: + import profile + +# Unbuffer stdout to avoid log truncation in the event +# of an unorderly exit as well as to provide timely +# updates to log files for use with tail +try: + if sys.stdout.name == '<stdout>': + sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) +except: + pass + +logger = logging.getLogger("BitBake") + +try: + import cPickle as pickle +except ImportError: + import pickle + bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.") + + +worker_pipe = sys.stdout.fileno() +bb.utils.nonblockingfd(worker_pipe) + +handler = bb.event.LogHandler() +logger.addHandler(handler) + +if 0: + # Code to write out a log file of all events passing through the worker + logfilename = "/tmp/workerlogfile" + format_str = "%(levelname)s: %(message)s" + conlogformat = bb.msg.BBLogFormatter(format_str) + consolelog = logging.FileHandler(logfilename) + bb.msg.addDefaultlogFilter(consolelog) + consolelog.setFormatter(conlogformat) + logger.addHandler(consolelog) + +worker_queue = "" + +def worker_fire(event, d): + data = "<event>" + pickle.dumps(event) + "</event>" + worker_fire_prepickled(data) + +def worker_fire_prepickled(event): + global worker_queue + + worker_queue = worker_queue + event + worker_flush() + +def worker_flush(): + global worker_queue, worker_pipe + + if not worker_queue: + return + + try: + written = os.write(worker_pipe, worker_queue) + worker_queue = worker_queue[written:] + except (IOError, OSError) as e: + if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: + raise + +def worker_child_fire(event, d): + global worker_pipe + + data = "<event>" + pickle.dumps(event) + "</event>" + try: + worker_pipe.write(data) + except IOError: + sigterm_handler(None, None) + raise + +bb.event.worker_fire = worker_fire + +lf = None +#lf = open("/tmp/workercommandlog", "w+") +def workerlog_write(msg): + if lf: + lf.write(msg) + lf.flush() + +def sigterm_handler(signum, frame): + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.killpg(0, signal.SIGTERM) + sys.exit() + +def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False): + # We need to setup the environment BEFORE the fork, since + # a fork() or exec*() activates PSEUDO... + + envbackup = {} + fakeenv = {} + umask = None + + taskdep = workerdata["taskdeps"][fn] + if 'umask' in taskdep and taskname in taskdep['umask']: + # umask might come in as a number or text string.. + try: + umask = int(taskdep['umask'][taskname],8) + except TypeError: + umask = taskdep['umask'][taskname] + + # We can't use the fakeroot environment in a dry run as it possibly hasn't been built + if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run: + envvars = (workerdata["fakerootenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + fakedirs = (workerdata["fakerootdirs"][fn] or "").split() + for p in fakedirs: + bb.utils.mkdirhier(p) + logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' % + (fn, taskname, ', '.join(fakedirs))) + else: + envvars = (workerdata["fakerootnoenv"][fn] or "").split() + for key, value in (var.split('=') for var in envvars): + envbackup[key] = os.environ.get(key) + os.environ[key] = value + fakeenv[key] = value + + sys.stdout.flush() + sys.stderr.flush() + + try: + pipein, pipeout = os.pipe() + pipein = os.fdopen(pipein, 'rb', 4096) + pipeout = os.fdopen(pipeout, 'wb', 0) + pid = os.fork() + except OSError as e: + bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror)) + + if pid == 0: + def child(): + global worker_pipe + pipein.close() + + signal.signal(signal.SIGTERM, sigterm_handler) + # Let SIGHUP exit as SIGTERM + signal.signal(signal.SIGHUP, sigterm_handler) + bb.utils.signal_on_parent_exit("SIGTERM") + + # Save out the PID so that the event can include it the + # events + bb.event.worker_pid = os.getpid() + bb.event.worker_fire = worker_child_fire + worker_pipe = pipeout + + # Make the child the process group leader and ensure no + # child process will be controlled by the current terminal + # This ensures signals sent to the controlling terminal like Ctrl+C + # don't stop the child processes. + os.setsid() + # No stdin + newsi = os.open(os.devnull, os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) + + if umask: + os.umask(umask) + + data.setVar("BB_WORKERCONTEXT", "1") + data.setVar("BB_TASKDEPDATA", taskdepdata) + data.setVar("BUILDNAME", workerdata["buildname"]) + data.setVar("DATE", workerdata["date"]) + data.setVar("TIME", workerdata["time"]) + bb.parse.siggen.set_taskdata(workerdata["sigdata"]) + ret = 0 + try: + the_data = bb.cache.Cache.loadDataFull(fn, appends, data) + the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task]) + + # exported_vars() returns a generator which *cannot* be passed to os.environ.update() + # successfully. We also need to unset anything from the environment which shouldn't be there + exports = bb.data.exported_vars(the_data) + bb.utils.empty_environment() + for e, v in exports: + os.environ[e] = v + for e in fakeenv: + os.environ[e] = fakeenv[e] + the_data.setVar(e, fakeenv[e]) + the_data.setVarFlag(e, 'export', "1") + + if quieterrors: + the_data.setVarFlag(taskname, "quieterrors", "1") + + except Exception as exc: + if not quieterrors: + logger.critical(str(exc)) + os._exit(1) + try: + if cfg.dry_run: + return 0 + return bb.build.exec_task(fn, taskname, the_data, cfg.profile) + except: + os._exit(1) + if not profiling: + os._exit(child()) + else: + profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) + prof = profile.Profile() + try: + ret = profile.Profile.runcall(prof, child) + finally: + prof.dump_stats(profname) + bb.utils.process_profilelog(profname) + os._exit(ret) + else: + for key, value in envbackup.iteritems(): + if value is None: + del os.environ[key] + else: + os.environ[key] = value + + return pid, pipein, pipeout + +class runQueueWorkerPipe(): + """ + Abstraction for a pipe between a worker thread and the worker server + """ + def __init__(self, pipein, pipeout): + self.input = pipein + if pipeout: + pipeout.close() + bb.utils.nonblockingfd(self.input) + self.queue = "" + + def read(self): + start = len(self.queue) + try: + self.queue = self.queue + self.input.read(102400) + except (OSError, IOError) as e: + if e.errno != errno.EAGAIN: + raise + + end = len(self.queue) + index = self.queue.find("</event>") + while index != -1: + worker_fire_prepickled(self.queue[:index+8]) + self.queue = self.queue[index+8:] + index = self.queue.find("</event>") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print("Warning, worker child left partial message: %s" % self.queue) + self.input.close() + +normalexit = False + +class BitbakeWorker(object): + def __init__(self, din): + self.input = din + bb.utils.nonblockingfd(self.input) + self.queue = "" + self.cookercfg = None + self.databuilder = None + self.data = None + self.build_pids = {} + self.build_pipes = {} + + signal.signal(signal.SIGTERM, self.sigterm_exception) + # Let SIGHUP exit as SIGTERM + signal.signal(signal.SIGHUP, self.sigterm_exception) + + def sigterm_exception(self, signum, stackframe): + if signum == signal.SIGTERM: + bb.warn("Worker recieved SIGTERM, shutting down...") + elif signum == signal.SIGHUP: + bb.warn("Worker recieved SIGHUP, shutting down...") + self.handle_finishnow(None) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + os.kill(os.getpid(), signal.SIGTERM) + + def serve(self): + while True: + (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) + if self.input in ready: + try: + r = self.input.read() + if len(r) == 0: + # EOF on pipe, server must have terminated + self.sigterm_exception(signal.SIGTERM, None) + self.queue = self.queue + r + except (OSError, IOError): + pass + if len(self.queue): + self.handle_item("cookerconfig", self.handle_cookercfg) + self.handle_item("workerdata", self.handle_workerdata) + self.handle_item("runtask", self.handle_runtask) + self.handle_item("finishnow", self.handle_finishnow) + self.handle_item("ping", self.handle_ping) + self.handle_item("quit", self.handle_quit) + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + if len(self.build_pids): + self.process_waitpid() + worker_flush() + + + def handle_item(self, item, func): + if self.queue.startswith("<" + item + ">"): + index = self.queue.find("</" + item + ">") + while index != -1: + func(self.queue[(len(item) + 2):index]) + self.queue = self.queue[(index + len(item) + 3):] + index = self.queue.find("</" + item + ">") + + def handle_cookercfg(self, data): + self.cookercfg = pickle.loads(data) + self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) + self.databuilder.parseBaseConfiguration() + self.data = self.databuilder.data + + def handle_workerdata(self, data): + self.workerdata = pickle.loads(data) + bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"] + bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"] + bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"] + bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] + self.data.setVar("PRSERV_HOST", self.workerdata["prhost"]) + + def handle_ping(self, _): + workerlog_write("Handling ping\n") + + logger.warn("Pong from bitbake-worker!") + + def handle_quit(self, data): + workerlog_write("Handling quit\n") + + global normalexit + normalexit = True + sys.exit(0) + + def handle_runtask(self, data): + fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data) + workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) + + pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors) + + self.build_pids[pid] = task + self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) + + def process_waitpid(self): + """ + Return none is there are no processes awaiting result collection, otherwise + collect the process exit codes and close the information pipe. + """ + try: + pid, status = os.waitpid(-1, os.WNOHANG) + if pid == 0 or os.WIFSTOPPED(status): + return None + except OSError: + return None + + workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) + + if os.WIFEXITED(status): + status = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + # Per shell conventions for $?, when a process exits due to + # a signal, we return an exit code of 128 + SIGNUM + status = 128 + os.WTERMSIG(status) + + task = self.build_pids[pid] + del self.build_pids[pid] + + self.build_pipes[pid].close() + del self.build_pipes[pid] + + worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>") + + def handle_finishnow(self, _): + if self.build_pids: + logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) + for k, v in self.build_pids.iteritems(): + try: + os.kill(-k, signal.SIGTERM) + os.waitpid(-1, 0) + except: + pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + +try: + worker = BitbakeWorker(sys.stdin) + if not profiling: + worker.serve() + else: + profname = "profile-worker.log" + prof = profile.Profile() + try: + profile.Profile.runcall(prof, worker.serve) + finally: + prof.dump_stats(profname) + bb.utils.process_profilelog(profname) +except BaseException as e: + if not normalexit: + import traceback + sys.stderr.write(traceback.format_exc()) + sys.stderr.write(str(e)) +while len(worker_queue): + worker_flush() +workerlog_write("exitting") +sys.exit(0) + |