123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- #!/usr/bin/env python3
- import os
- import sys
- import warnings
- sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
- from bb import fetch2
- import logging
- import bb
- import select
- import errno
- import signal
- import pickle
- import traceback
- import queue
- from multiprocessing import Lock
- from threading import Thread
- if sys.getfilesystemencoding() != "utf-8":
- sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
- # Users shouldn't be running this code directly
- if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
- print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
- sys.exit(1)
- profiling = False
- if sys.argv[1].startswith("decafbadbad"):
- profiling = True
- try:
- import cProfile as profile
- except:
- import profile
- # Unbuffer stdout to avoid log truncation in the event
- # of an unorderly exit as well as to provide timely
- # updates to log files for use with tail
- try:
- if sys.stdout.name == '<stdout>':
- import fcntl
- fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
- fl |= os.O_SYNC
- fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
- #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
- except:
- pass
- logger = logging.getLogger("BitBake")
- worker_pipe = sys.stdout.fileno()
- bb.utils.nonblockingfd(worker_pipe)
- # Need to guard against multiprocessing being used in child processes
- # and multiple processes trying to write to the parent at the same time
- worker_pipe_lock = None
- handler = bb.event.LogHandler()
- logger.addHandler(handler)
- if 0:
- # Code to write out a log file of all events passing through the worker
- logfilename = "/tmp/workerlogfile"
- format_str = "%(levelname)s: %(message)s"
- conlogformat = bb.msg.BBLogFormatter(format_str)
- consolelog = logging.FileHandler(logfilename)
- bb.msg.addDefaultlogFilter(consolelog)
- consolelog.setFormatter(conlogformat)
- logger.addHandler(consolelog)
- worker_queue = queue.Queue()
- def worker_fire(event, d):
- data = b"<event>" + pickle.dumps(event) + b"</event>"
- worker_fire_prepickled(data)
- def worker_fire_prepickled(event):
- global worker_queue
- worker_queue.put(event)
- #
- # We can end up with write contention with the cooker, it can be trying to send commands
- # and we can be trying to send event data back. Therefore use a separate thread for writing
- # back data to cooker.
- #
- worker_thread_exit = False
- def worker_flush(worker_queue):
- worker_queue_int = b""
- global worker_pipe, worker_thread_exit
- while True:
- try:
- worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
- except queue.Empty:
- pass
- while (worker_queue_int or not worker_queue.empty()):
- try:
- (_, ready, _) = select.select([], [worker_pipe], [], 1)
- if not worker_queue.empty():
- worker_queue_int = worker_queue_int + worker_queue.get()
- written = os.write(worker_pipe, worker_queue_int)
- worker_queue_int = worker_queue_int[written:]
- except (IOError, OSError) as e:
- if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
- raise
- if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
- return
- worker_thread = Thread(target=worker_flush, args=(worker_queue,))
- worker_thread.start()
- def worker_child_fire(event, d):
- global worker_pipe
- global worker_pipe_lock
- data = b"<event>" + pickle.dumps(event) + b"</event>"
- try:
- worker_pipe_lock.acquire()
- worker_pipe.write(data)
- worker_pipe_lock.release()
- except IOError:
- sigterm_handler(None, None)
- raise
- bb.event.worker_fire = worker_fire
- lf = None
- #lf = open("/tmp/workercommandlog", "w+")
- def workerlog_write(msg):
- if lf:
- lf.write(msg)
- lf.flush()
- def sigterm_handler(signum, frame):
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- os.killpg(0, signal.SIGTERM)
- sys.exit()
- def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
- # We need to setup the environment BEFORE the fork, since
- # a fork() or exec*() activates PSEUDO...
- envbackup = {}
- fakeenv = {}
- umask = None
- taskdep = workerdata["taskdeps"][fn]
- if 'umask' in taskdep and taskname in taskdep['umask']:
- # umask might come in as a number or text string..
- try:
- umask = int(taskdep['umask'][taskname],8)
- except TypeError:
- umask = taskdep['umask'][taskname]
- dry_run = cfg.dry_run or dry_run_exec
- # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
- if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
- envvars = (workerdata["fakerootenv"][fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
- fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
- for p in fakedirs:
- bb.utils.mkdirhier(p)
- logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
- (fn, taskname, ', '.join(fakedirs)))
- else:
- envvars = (workerdata["fakerootnoenv"][fn] or "").split()
- for key, value in (var.split('=') for var in envvars):
- envbackup[key] = os.environ.get(key)
- os.environ[key] = value
- fakeenv[key] = value
- sys.stdout.flush()
- sys.stderr.flush()
- try:
- pipein, pipeout = os.pipe()
- pipein = os.fdopen(pipein, 'rb', 4096)
- pipeout = os.fdopen(pipeout, 'wb', 0)
- pid = os.fork()
- except OSError as e:
- logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
- sys.exit(1)
- if pid == 0:
- def child():
- global worker_pipe
- global worker_pipe_lock
- pipein.close()
- bb.utils.signal_on_parent_exit("SIGTERM")
- # Save out the PID so that the event can include it the
- # events
- bb.event.worker_pid = os.getpid()
- bb.event.worker_fire = worker_child_fire
- worker_pipe = pipeout
- worker_pipe_lock = Lock()
- # Make the child the process group leader and ensure no
- # child process will be controlled by the current terminal
- # This ensures signals sent to the controlling terminal like Ctrl+C
- # don't stop the child processes.
- os.setsid()
- signal.signal(signal.SIGTERM, sigterm_handler)
- # Let SIGHUP exit as SIGTERM
- signal.signal(signal.SIGHUP, sigterm_handler)
- # No stdin
- newsi = os.open(os.devnull, os.O_RDWR)
- os.dup2(newsi, sys.stdin.fileno())
- if umask:
- os.umask(umask)
- try:
- bb_cache = bb.cache.NoCache(databuilder)
- (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
- the_data = databuilder.mcdata[mc]
- the_data.setVar("BB_WORKERCONTEXT", "1")
- the_data.setVar("BB_TASKDEPDATA", taskdepdata)
- if cfg.limited_deps:
- the_data.setVar("BB_LIMITEDDEPS", "1")
- the_data.setVar("BUILDNAME", workerdata["buildname"])
- the_data.setVar("DATE", workerdata["date"])
- the_data.setVar("TIME", workerdata["time"])
- for varname, value in extraconfigdata.items():
- the_data.setVar(varname, value)
- bb.parse.siggen.set_taskdata(workerdata["sigdata"])
- ret = 0
- the_data = bb_cache.loadDataFull(fn, appends)
- the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
- bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
- # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
- # successfully. We also need to unset anything from the environment which shouldn't be there
- exports = bb.data.exported_vars(the_data)
- bb.utils.empty_environment()
- for e, v in exports:
- os.environ[e] = v
- for e in fakeenv:
- os.environ[e] = fakeenv[e]
- the_data.setVar(e, fakeenv[e])
- the_data.setVarFlag(e, 'export', "1")
- task_exports = the_data.getVarFlag(taskname, 'exports')
- if task_exports:
- for e in task_exports.split():
- the_data.setVarFlag(e, 'export', '1')
- v = the_data.getVar(e)
- if v is not None:
- os.environ[e] = v
- if quieterrors:
- the_data.setVarFlag(taskname, "quieterrors", "1")
- except Exception:
- if not quieterrors:
- logger.critical(traceback.format_exc())
- os._exit(1)
- try:
- if dry_run:
- return 0
- return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
- except:
- os._exit(1)
- if not profiling:
- os._exit(child())
- else:
- profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
- prof = profile.Profile()
- try:
- ret = profile.Profile.runcall(prof, child)
- finally:
- prof.dump_stats(profname)
- bb.utils.process_profilelog(profname)
- os._exit(ret)
- else:
- for key, value in iter(envbackup.items()):
- if value is None:
- del os.environ[key]
- else:
- os.environ[key] = value
- return pid, pipein, pipeout
- class runQueueWorkerPipe():
- """
- Abstraction for a pipe between a worker thread and the worker server
- """
- def __init__(self, pipein, pipeout):
- self.input = pipein
- if pipeout:
- pipeout.close()
- bb.utils.nonblockingfd(self.input)
- self.queue = b""
- def read(self):
- start = len(self.queue)
- try:
- self.queue = self.queue + (self.input.read(102400) or b"")
- except (OSError, IOError) as e:
- if e.errno != errno.EAGAIN:
- raise
- end = len(self.queue)
- index = self.queue.find(b"</event>")
- while index != -1:
- worker_fire_prepickled(self.queue[:index+8])
- self.queue = self.queue[index+8:]
- index = self.queue.find(b"</event>")
- return (end > start)
- def close(self):
- while self.read():
- continue
- if len(self.queue) > 0:
- print("Warning, worker child left partial message: %s" % self.queue)
- self.input.close()
- normalexit = False
- class BitbakeWorker(object):
- def __init__(self, din):
- self.input = din
- bb.utils.nonblockingfd(self.input)
- self.queue = b""
- self.cookercfg = None
- self.databuilder = None
- self.data = None
- self.extraconfigdata = None
- self.build_pids = {}
- self.build_pipes = {}
-
- signal.signal(signal.SIGTERM, self.sigterm_exception)
- # Let SIGHUP exit as SIGTERM
- signal.signal(signal.SIGHUP, self.sigterm_exception)
- if "beef" in sys.argv[1]:
- bb.utils.set_process_name("Worker (Fakeroot)")
- else:
- bb.utils.set_process_name("Worker")
- def sigterm_exception(self, signum, stackframe):
- if signum == signal.SIGTERM:
- bb.warn("Worker received SIGTERM, shutting down...")
- elif signum == signal.SIGHUP:
- bb.warn("Worker received SIGHUP, shutting down...")
- self.handle_finishnow(None)
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- os.kill(os.getpid(), signal.SIGTERM)
- def serve(self):
- while True:
- (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
- if self.input in ready:
- try:
- r = self.input.read()
- if len(r) == 0:
- # EOF on pipe, server must have terminated
- self.sigterm_exception(signal.SIGTERM, None)
- self.queue = self.queue + r
- except (OSError, IOError):
- pass
- if len(self.queue):
- self.handle_item(b"cookerconfig", self.handle_cookercfg)
- self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
- self.handle_item(b"workerdata", self.handle_workerdata)
- self.handle_item(b"runtask", self.handle_runtask)
- self.handle_item(b"finishnow", self.handle_finishnow)
- self.handle_item(b"ping", self.handle_ping)
- self.handle_item(b"quit", self.handle_quit)
- for pipe in self.build_pipes:
- if self.build_pipes[pipe].input in ready:
- self.build_pipes[pipe].read()
- if len(self.build_pids):
- while self.process_waitpid():
- continue
- def handle_item(self, item, func):
- if self.queue.startswith(b"<" + item + b">"):
- index = self.queue.find(b"</" + item + b">")
- while index != -1:
- func(self.queue[(len(item) + 2):index])
- self.queue = self.queue[(index + len(item) + 3):]
- index = self.queue.find(b"</" + item + b">")
- def handle_cookercfg(self, data):
- self.cookercfg = pickle.loads(data)
- self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
- self.databuilder.parseBaseConfiguration()
- self.data = self.databuilder.data
- def handle_extraconfigdata(self, data):
- self.extraconfigdata = pickle.loads(data)
- def handle_workerdata(self, data):
- self.workerdata = pickle.loads(data)
- bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
- bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
- bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
- bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
- for mc in self.databuilder.mcdata:
- self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
- def handle_ping(self, _):
- workerlog_write("Handling ping\n")
- logger.warning("Pong from bitbake-worker!")
- def handle_quit(self, data):
- workerlog_write("Handling quit\n")
- global normalexit
- normalexit = True
- sys.exit(0)
- def handle_runtask(self, data):
- fn, task, taskname, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
- workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
- pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
- self.build_pids[pid] = task
- self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
- def process_waitpid(self):
- """
- Return none is there are no processes awaiting result collection, otherwise
- collect the process exit codes and close the information pipe.
- """
- try:
- pid, status = os.waitpid(-1, os.WNOHANG)
- if pid == 0 or os.WIFSTOPPED(status):
- return False
- except OSError:
- return False
- workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
- if os.WIFEXITED(status):
- status = os.WEXITSTATUS(status)
- elif os.WIFSIGNALED(status):
- # Per shell conventions for $?, when a process exits due to
- # a signal, we return an exit code of 128 + SIGNUM
- status = 128 + os.WTERMSIG(status)
- task = self.build_pids[pid]
- del self.build_pids[pid]
- self.build_pipes[pid].close()
- del self.build_pipes[pid]
- worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
- return True
- def handle_finishnow(self, _):
- if self.build_pids:
- logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
- for k, v in iter(self.build_pids.items()):
- try:
- os.kill(-k, signal.SIGTERM)
- os.waitpid(-1, 0)
- except:
- pass
- for pipe in self.build_pipes:
- self.build_pipes[pipe].read()
- try:
- worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
- if not profiling:
- worker.serve()
- else:
- profname = "profile-worker.log"
- prof = profile.Profile()
- try:
- profile.Profile.runcall(prof, worker.serve)
- finally:
- prof.dump_stats(profname)
- bb.utils.process_profilelog(profname)
- except BaseException as e:
- if not normalexit:
- import traceback
- sys.stderr.write(traceback.format_exc())
- sys.stderr.write(str(e))
- worker_thread_exit = True
- worker_thread.join()
- workerlog_write("exitting")
- sys.exit(0)
|