bitbake-worker 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. #!/usr/bin/env python3
  2. import os
  3. import sys
  4. import warnings
  5. sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
  6. from bb import fetch2
  7. import logging
  8. import bb
  9. import select
  10. import errno
  11. import signal
  12. import pickle
  13. import traceback
  14. import queue
  15. from multiprocessing import Lock
  16. from threading import Thread
  17. if sys.getfilesystemencoding() != "utf-8":
  18. sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
  19. # Users shouldn't be running this code directly
  20. if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
  21. print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
  22. sys.exit(1)
  23. profiling = False
  24. if sys.argv[1].startswith("decafbadbad"):
  25. profiling = True
  26. try:
  27. import cProfile as profile
  28. except:
  29. import profile
  30. # Unbuffer stdout to avoid log truncation in the event
  31. # of an unorderly exit as well as to provide timely
  32. # updates to log files for use with tail
  33. try:
  34. if sys.stdout.name == '<stdout>':
  35. import fcntl
  36. fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
  37. fl |= os.O_SYNC
  38. fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
  39. #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
  40. except:
  41. pass
  42. logger = logging.getLogger("BitBake")
  43. worker_pipe = sys.stdout.fileno()
  44. bb.utils.nonblockingfd(worker_pipe)
  45. # Need to guard against multiprocessing being used in child processes
  46. # and multiple processes trying to write to the parent at the same time
  47. worker_pipe_lock = None
  48. handler = bb.event.LogHandler()
  49. logger.addHandler(handler)
  50. if 0:
  51. # Code to write out a log file of all events passing through the worker
  52. logfilename = "/tmp/workerlogfile"
  53. format_str = "%(levelname)s: %(message)s"
  54. conlogformat = bb.msg.BBLogFormatter(format_str)
  55. consolelog = logging.FileHandler(logfilename)
  56. bb.msg.addDefaultlogFilter(consolelog)
  57. consolelog.setFormatter(conlogformat)
  58. logger.addHandler(consolelog)
  59. worker_queue = queue.Queue()
  60. def worker_fire(event, d):
  61. data = b"<event>" + pickle.dumps(event) + b"</event>"
  62. worker_fire_prepickled(data)
  63. def worker_fire_prepickled(event):
  64. global worker_queue
  65. worker_queue.put(event)
  66. #
  67. # We can end up with write contention with the cooker, it can be trying to send commands
  68. # and we can be trying to send event data back. Therefore use a separate thread for writing
  69. # back data to cooker.
  70. #
  71. worker_thread_exit = False
  72. def worker_flush(worker_queue):
  73. worker_queue_int = b""
  74. global worker_pipe, worker_thread_exit
  75. while True:
  76. try:
  77. worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
  78. except queue.Empty:
  79. pass
  80. while (worker_queue_int or not worker_queue.empty()):
  81. try:
  82. (_, ready, _) = select.select([], [worker_pipe], [], 1)
  83. if not worker_queue.empty():
  84. worker_queue_int = worker_queue_int + worker_queue.get()
  85. written = os.write(worker_pipe, worker_queue_int)
  86. worker_queue_int = worker_queue_int[written:]
  87. except (IOError, OSError) as e:
  88. if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
  89. raise
  90. if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
  91. return
  92. worker_thread = Thread(target=worker_flush, args=(worker_queue,))
  93. worker_thread.start()
  94. def worker_child_fire(event, d):
  95. global worker_pipe
  96. global worker_pipe_lock
  97. data = b"<event>" + pickle.dumps(event) + b"</event>"
  98. try:
  99. worker_pipe_lock.acquire()
  100. worker_pipe.write(data)
  101. worker_pipe_lock.release()
  102. except IOError:
  103. sigterm_handler(None, None)
  104. raise
  105. bb.event.worker_fire = worker_fire
  106. lf = None
  107. #lf = open("/tmp/workercommandlog", "w+")
  108. def workerlog_write(msg):
  109. if lf:
  110. lf.write(msg)
  111. lf.flush()
  112. def sigterm_handler(signum, frame):
  113. signal.signal(signal.SIGTERM, signal.SIG_DFL)
  114. os.killpg(0, signal.SIGTERM)
  115. sys.exit()
  116. def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
  117. # We need to setup the environment BEFORE the fork, since
  118. # a fork() or exec*() activates PSEUDO...
  119. envbackup = {}
  120. fakeenv = {}
  121. umask = None
  122. taskdep = workerdata["taskdeps"][fn]
  123. if 'umask' in taskdep and taskname in taskdep['umask']:
  124. # umask might come in as a number or text string..
  125. try:
  126. umask = int(taskdep['umask'][taskname],8)
  127. except TypeError:
  128. umask = taskdep['umask'][taskname]
  129. # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
  130. if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
  131. envvars = (workerdata["fakerootenv"][fn] or "").split()
  132. for key, value in (var.split('=') for var in envvars):
  133. envbackup[key] = os.environ.get(key)
  134. os.environ[key] = value
  135. fakeenv[key] = value
  136. fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
  137. for p in fakedirs:
  138. bb.utils.mkdirhier(p)
  139. logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
  140. (fn, taskname, ', '.join(fakedirs)))
  141. else:
  142. envvars = (workerdata["fakerootnoenv"][fn] or "").split()
  143. for key, value in (var.split('=') for var in envvars):
  144. envbackup[key] = os.environ.get(key)
  145. os.environ[key] = value
  146. fakeenv[key] = value
  147. sys.stdout.flush()
  148. sys.stderr.flush()
  149. try:
  150. pipein, pipeout = os.pipe()
  151. pipein = os.fdopen(pipein, 'rb', 4096)
  152. pipeout = os.fdopen(pipeout, 'wb', 0)
  153. pid = os.fork()
  154. except OSError as e:
  155. logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
  156. sys.exit(1)
  157. if pid == 0:
  158. def child():
  159. global worker_pipe
  160. global worker_pipe_lock
  161. pipein.close()
  162. signal.signal(signal.SIGTERM, sigterm_handler)
  163. # Let SIGHUP exit as SIGTERM
  164. signal.signal(signal.SIGHUP, sigterm_handler)
  165. bb.utils.signal_on_parent_exit("SIGTERM")
  166. # Save out the PID so that the event can include it the
  167. # events
  168. bb.event.worker_pid = os.getpid()
  169. bb.event.worker_fire = worker_child_fire
  170. worker_pipe = pipeout
  171. worker_pipe_lock = Lock()
  172. # Make the child the process group leader and ensure no
  173. # child process will be controlled by the current terminal
  174. # This ensures signals sent to the controlling terminal like Ctrl+C
  175. # don't stop the child processes.
  176. os.setsid()
  177. # No stdin
  178. newsi = os.open(os.devnull, os.O_RDWR)
  179. os.dup2(newsi, sys.stdin.fileno())
  180. if umask:
  181. os.umask(umask)
  182. try:
  183. bb_cache = bb.cache.NoCache(databuilder)
  184. (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
  185. the_data = databuilder.mcdata[mc]
  186. the_data.setVar("BB_WORKERCONTEXT", "1")
  187. the_data.setVar("BB_TASKDEPDATA", taskdepdata)
  188. the_data.setVar("BUILDNAME", workerdata["buildname"])
  189. the_data.setVar("DATE", workerdata["date"])
  190. the_data.setVar("TIME", workerdata["time"])
  191. bb.parse.siggen.set_taskdata(workerdata["sigdata"])
  192. ret = 0
  193. the_data = bb_cache.loadDataFull(fn, appends)
  194. the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
  195. bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
  196. # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
  197. # successfully. We also need to unset anything from the environment which shouldn't be there
  198. exports = bb.data.exported_vars(the_data)
  199. bb.utils.empty_environment()
  200. for e, v in exports:
  201. os.environ[e] = v
  202. for e in fakeenv:
  203. os.environ[e] = fakeenv[e]
  204. the_data.setVar(e, fakeenv[e])
  205. the_data.setVarFlag(e, 'export', "1")
  206. task_exports = the_data.getVarFlag(taskname, 'exports')
  207. if task_exports:
  208. for e in task_exports.split():
  209. the_data.setVarFlag(e, 'export', '1')
  210. v = the_data.getVar(e)
  211. if v is not None:
  212. os.environ[e] = v
  213. if quieterrors:
  214. the_data.setVarFlag(taskname, "quieterrors", "1")
  215. except Exception:
  216. if not quieterrors:
  217. logger.critical(traceback.format_exc())
  218. os._exit(1)
  219. try:
  220. if cfg.dry_run:
  221. return 0
  222. return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
  223. except:
  224. os._exit(1)
  225. if not profiling:
  226. os._exit(child())
  227. else:
  228. profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
  229. prof = profile.Profile()
  230. try:
  231. ret = profile.Profile.runcall(prof, child)
  232. finally:
  233. prof.dump_stats(profname)
  234. bb.utils.process_profilelog(profname)
  235. os._exit(ret)
  236. else:
  237. for key, value in iter(envbackup.items()):
  238. if value is None:
  239. del os.environ[key]
  240. else:
  241. os.environ[key] = value
  242. return pid, pipein, pipeout
  243. class runQueueWorkerPipe():
  244. """
  245. Abstraction for a pipe between a worker thread and the worker server
  246. """
  247. def __init__(self, pipein, pipeout):
  248. self.input = pipein
  249. if pipeout:
  250. pipeout.close()
  251. bb.utils.nonblockingfd(self.input)
  252. self.queue = b""
  253. def read(self):
  254. start = len(self.queue)
  255. try:
  256. self.queue = self.queue + (self.input.read(102400) or b"")
  257. except (OSError, IOError) as e:
  258. if e.errno != errno.EAGAIN:
  259. raise
  260. end = len(self.queue)
  261. index = self.queue.find(b"</event>")
  262. while index != -1:
  263. worker_fire_prepickled(self.queue[:index+8])
  264. self.queue = self.queue[index+8:]
  265. index = self.queue.find(b"</event>")
  266. return (end > start)
  267. def close(self):
  268. while self.read():
  269. continue
  270. if len(self.queue) > 0:
  271. print("Warning, worker child left partial message: %s" % self.queue)
  272. self.input.close()
  273. normalexit = False
  274. class BitbakeWorker(object):
  275. def __init__(self, din):
  276. self.input = din
  277. bb.utils.nonblockingfd(self.input)
  278. self.queue = b""
  279. self.cookercfg = None
  280. self.databuilder = None
  281. self.data = None
  282. self.build_pids = {}
  283. self.build_pipes = {}
  284. signal.signal(signal.SIGTERM, self.sigterm_exception)
  285. # Let SIGHUP exit as SIGTERM
  286. signal.signal(signal.SIGHUP, self.sigterm_exception)
  287. if "beef" in sys.argv[1]:
  288. bb.utils.set_process_name("Worker (Fakeroot)")
  289. else:
  290. bb.utils.set_process_name("Worker")
  291. def sigterm_exception(self, signum, stackframe):
  292. if signum == signal.SIGTERM:
  293. bb.warn("Worker received SIGTERM, shutting down...")
  294. elif signum == signal.SIGHUP:
  295. bb.warn("Worker received SIGHUP, shutting down...")
  296. self.handle_finishnow(None)
  297. signal.signal(signal.SIGTERM, signal.SIG_DFL)
  298. os.kill(os.getpid(), signal.SIGTERM)
  299. def serve(self):
  300. while True:
  301. (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
  302. if self.input in ready:
  303. try:
  304. r = self.input.read()
  305. if len(r) == 0:
  306. # EOF on pipe, server must have terminated
  307. self.sigterm_exception(signal.SIGTERM, None)
  308. self.queue = self.queue + r
  309. except (OSError, IOError):
  310. pass
  311. if len(self.queue):
  312. self.handle_item(b"cookerconfig", self.handle_cookercfg)
  313. self.handle_item(b"workerdata", self.handle_workerdata)
  314. self.handle_item(b"runtask", self.handle_runtask)
  315. self.handle_item(b"finishnow", self.handle_finishnow)
  316. self.handle_item(b"ping", self.handle_ping)
  317. self.handle_item(b"quit", self.handle_quit)
  318. for pipe in self.build_pipes:
  319. if self.build_pipes[pipe].input in ready:
  320. self.build_pipes[pipe].read()
  321. if len(self.build_pids):
  322. while self.process_waitpid():
  323. continue
  324. def handle_item(self, item, func):
  325. if self.queue.startswith(b"<" + item + b">"):
  326. index = self.queue.find(b"</" + item + b">")
  327. while index != -1:
  328. func(self.queue[(len(item) + 2):index])
  329. self.queue = self.queue[(index + len(item) + 3):]
  330. index = self.queue.find(b"</" + item + b">")
  331. def handle_cookercfg(self, data):
  332. self.cookercfg = pickle.loads(data)
  333. self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
  334. self.databuilder.parseBaseConfiguration()
  335. self.data = self.databuilder.data
  336. def handle_workerdata(self, data):
  337. self.workerdata = pickle.loads(data)
  338. bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
  339. bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
  340. bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
  341. bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
  342. for mc in self.databuilder.mcdata:
  343. self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
  344. def handle_ping(self, _):
  345. workerlog_write("Handling ping\n")
  346. logger.warning("Pong from bitbake-worker!")
  347. def handle_quit(self, data):
  348. workerlog_write("Handling quit\n")
  349. global normalexit
  350. normalexit = True
  351. sys.exit(0)
  352. def handle_runtask(self, data):
  353. fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
  354. workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
  355. pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
  356. self.build_pids[pid] = task
  357. self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
  358. def process_waitpid(self):
  359. """
  360. Return none is there are no processes awaiting result collection, otherwise
  361. collect the process exit codes and close the information pipe.
  362. """
  363. try:
  364. pid, status = os.waitpid(-1, os.WNOHANG)
  365. if pid == 0 or os.WIFSTOPPED(status):
  366. return False
  367. except OSError:
  368. return False
  369. workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
  370. if os.WIFEXITED(status):
  371. status = os.WEXITSTATUS(status)
  372. elif os.WIFSIGNALED(status):
  373. # Per shell conventions for $?, when a process exits due to
  374. # a signal, we return an exit code of 128 + SIGNUM
  375. status = 128 + os.WTERMSIG(status)
  376. task = self.build_pids[pid]
  377. del self.build_pids[pid]
  378. self.build_pipes[pid].close()
  379. del self.build_pipes[pid]
  380. worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
  381. return True
  382. def handle_finishnow(self, _):
  383. if self.build_pids:
  384. logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
  385. for k, v in iter(self.build_pids.items()):
  386. try:
  387. os.kill(-k, signal.SIGTERM)
  388. os.waitpid(-1, 0)
  389. except:
  390. pass
  391. for pipe in self.build_pipes:
  392. self.build_pipes[pipe].read()
  393. try:
  394. worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
  395. if not profiling:
  396. worker.serve()
  397. else:
  398. profname = "profile-worker.log"
  399. prof = profile.Profile()
  400. try:
  401. profile.Profile.runcall(prof, worker.serve)
  402. finally:
  403. prof.dump_stats(profname)
  404. bb.utils.process_profilelog(profname)
  405. except BaseException as e:
  406. if not normalexit:
  407. import traceback
  408. sys.stderr.write(traceback.format_exc())
  409. sys.stderr.write(str(e))
  410. worker_thread_exit = True
  411. worker_thread.join()
  412. workerlog_write("exitting")
  413. sys.exit(0)