bitbake-worker

#!/usr/bin/env python3

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
from multiprocessing import Lock

if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile
# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")
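# stdout is connected back to the bitbake server process; worker_flush()
# writes queued event data to this file descriptor.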
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
worker_queue = b""

def worker_fire(event, d):
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()

def worker_flush():
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
            raise
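# Events are framed as b"<event>" + pickled payload + b"</event>" and buffered
# in worker_queue; worker_flush() writes whatever the non-blocking pipe will
# accept and keeps the remainder for the next flush.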
def worker_child_fire(event, d):
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        worker_pipe_lock.acquire()
        worker_pipe.write(data)
        worker_pipe_lock.release()
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
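# fork_off_task() runs in the worker's main process: it sets up the (fakeroot)
# environment, forks, lets the child execute the task, and in the parent
# restores the environment and returns (pid, pipein, pipeout) for monitoring.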
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)
    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the events fired from here can include it
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports', True)
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e, True)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)

        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

        return pid, pipein, pipeout
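# Read end of the pipe created in fork_off_task(); events written by the task
# child are collected here and forwarded unchanged to the server via
# worker_fire_prepickled().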
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
normalexit = False

class BitbakeWorker(object):
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()
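    # Commands from the server arrive on stdin framed as <item>...</item>;
    # handle_item() dispatches each complete message of the given type to the
    # supplied handler and removes it from the queue.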
    def handle_item(self, item, func):
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")

    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

    def handle_finishnow(self, _):
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

while len(worker_queue):
    worker_flush()

workerlog_write("exiting")
sys.exit(0)