bitbake-worker
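BitBake's task-execution worker: bitbake itself spawns this script, sends it pickled commands over stdin, and reads pickled events and task exit codes back from its stdout.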

#!/usr/bin/env python3

import os
import sys
import warnings
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
from multiprocessing import Lock

if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile
# Unbuffer stdout to avoid log truncation in the event
# of a disorderly exit, as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    bb.msg.addDefaultlogFilter(consolelog)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = b""
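
# Events destined for the server are pickled, framed as b"<event>...</event>"
# and buffered in worker_queue until worker_flush() can write them to the
# non-blocking stdout pipe.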
def worker_fire(event, d):
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    global worker_queue

    worker_queue = worker_queue + event
    worker_flush()

def worker_flush():
    global worker_queue, worker_pipe

    if not worker_queue:
        return

    try:
        written = os.write(worker_pipe, worker_queue)
        worker_queue = worker_queue[written:]
    except (IOError, OSError) as e:
        if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
            raise
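
# In forked task children the event pipe is a file object connected back to the
# worker parent; writes are serialised with worker_pipe_lock.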
def worker_child_fire(event, d):
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        worker_pipe_lock.acquire()
        worker_pipe.write(data)
        worker_pipe_lock.release()
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    if lf:
        lf.write(msg)
        lf.flush()
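
# Hand SIGTERM on to the whole process group so anything a task spawned is
# cleaned up along with the worker child.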
def sigterm_handler(signum, frame):
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
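
# Fork a child process to run a single task. The parent returns the child's pid
# and the read end of the pipe carrying the child's events; the child sets up
# its (optionally fakeroot) environment and runs bb.build.exec_task().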
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
    # We need to set up the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeenv = {}
    umask = None

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        # umask might come in as a number or text string..
        try:
            umask = int(taskdep['umask'][taskname], 8)
        except TypeError:
            umask = taskdep['umask'][taskname]

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                     (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    sys.stdout.flush()
    sys.stderr.flush()
    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)
    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)
            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the events fired by this child
            # can include it
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()
            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)
            try:
                bb_cache = bb.cache.NoCache(databuilder)
                the_data = databuilder.data
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN", True), taskname.replace("do_", "")))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports', True)
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e, True)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception as exc:
                if not quieterrors:
                    logger.critical(str(exc))
                os._exit(1)

            try:
                if cfg.dry_run:
                    return 0
                return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

        return pid, pipein, pipeout
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            worker_fire_prepickled(self.queue[:index+8])
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False
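
# Main worker object: reads framed commands from the BitBake server on stdin
# and manages the pool of forked task children.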
class BitbakeWorker(object):
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)
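
    # Main loop: multiplex the command pipe from the server and the event pipes
    # of running task children, dispatch framed commands to their handlers and
    # reap finished tasks.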
    def serve(self):
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [], [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                self.build_pipes[pipe].read()
            if len(self.build_pids):
                self.process_waitpid()
            worker_flush()
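
    # Commands arrive framed as b"<item>payload</item>"; strip the framing and
    # pass each payload to the matching handler.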
    def handle_item(self, item, func):
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                func(self.queue[(len(item) + 2):index])
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")
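
    # Unpickle the cooker configuration and rebuild the base datastore in this
    # worker.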
    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration()
        self.data = self.databuilder.data

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
        bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
        bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)
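
    # Fork off a task child and track its pid and event pipe until it exits.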
    def handle_runtask(self, data):
        fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return None if there are no processes awaiting result collection, otherwise
        collect the process exit codes and close the information pipe.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return None
        except OSError:
            return None

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
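
    # Terminate any remaining task process groups and drain their event pipes.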
    def handle_finishnow(self, _):
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
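
# Entry point: serve commands from the server over stdin. On an abnormal exit,
# write the traceback to stderr, then flush any queued events before exiting.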
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))

while len(worker_queue):
    worker_flush()

workerlog_write("exiting")
sys.exit(0)