bitbake-worker 14 KB


  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import warnings
  5. sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
  6. from bb import fetch2
  7. import logging
  8. import bb
  9. import select
  10. import errno
  11. import signal
  # Users shouldn't be running this code directly: exactly one argument is
  # required and it must start with the magic "decafbad" token.
  if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
      print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
      sys.exit(1)

  # The longer "decafbadbad" token additionally switches profiling on.
  profiling = False
  if sys.argv[1] == "decafbadbad":
      profiling = True
      try:
          import cProfile as profile
      except ImportError:
          # Fall back to the pure-Python profiler only when the accelerated
          # module is genuinely missing; any other failure should surface.
          import profile
  # Unbuffer stdout to avoid log truncation in the event
  # of a disorderly exit as well as to provide timely
  # updates to log files for use with tail
  try:
      if sys.stdout.name == '<stdout>':
          sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
  except Exception:
      # Best effort only: stdout may have been replaced by an object without
      # a name or file descriptor, or unbuffered mode may be refused. Keep
      # the existing stream in that case, but do not swallow
      # KeyboardInterrupt or SystemExit as a bare except would.
      pass
  logger = logging.getLogger("BitBake")

  # Prefer the C pickle implementation; note the slow fallback when it is
  # not available.
  try:
      import cPickle as pickle
  except ImportError:
      import pickle
      bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")

  # Outgoing data is written to the stdout file descriptor, which is
  # switched to non-blocking mode.
  worker_pipe = sys.stdout.fileno()
  bb.utils.nonblockingfd(worker_pipe)

  handler = bb.event.LogHandler()
  logger.addHandler(handler)

  if 0:
      # Code to write out a log file of all events passing through the worker
      logfilename = "/tmp/workerlogfile"
      format_str = "%(levelname)s: %(message)s"
      consolelog = logging.FileHandler(logfilename)
      conlogformat = bb.msg.BBLogFormatter(format_str)
      bb.msg.addDefaultlogFilter(consolelog)
      consolelog.setFormatter(conlogformat)
      logger.addHandler(consolelog)

  # Data accepted for sending but not yet written to worker_pipe.
  worker_queue = ""
  def worker_fire(event, d):
      """Pickle *event*, wrap it in <event> tags and queue it for sending.

      The *d* argument is accepted but not used here.
      """
      payload = pickle.dumps(event)
      worker_fire_prepickled("<event>" + payload + "</event>")
  def worker_fire_prepickled(event):
      """Append an already-serialised message to the outgoing queue and try to send it."""
      global worker_queue
      worker_queue += event
      worker_flush()
  def worker_flush():
      """Write as much of the outgoing queue to worker_pipe as it will take.

      Whatever os.write() accepted is dropped from the front of the queue.
      EAGAIN and EPIPE are ignored, leaving the queue untouched; any other
      I/O error propagates to the caller.
      """
      global worker_queue
      if worker_queue:
          try:
              sent = os.write(worker_pipe, worker_queue)
          except (IOError, OSError) as err:
              if err.errno not in (errno.EAGAIN, errno.EPIPE):
                  raise
          else:
              worker_queue = worker_queue[sent:]
  def worker_child_fire(event, d):
      """Pickle *event* and write it, wrapped in <event> tags, to worker_pipe.

      This variant calls worker_pipe.write(), so worker_pipe must be a file
      object here. If the write fails with IOError, sigterm_handler is
      invoked and the error is re-raised. The *d* argument is unused.
      """
      message = "<event>" + pickle.dumps(event) + "</event>"
      try:
          worker_pipe.write(message)
      except IOError:
          sigterm_handler(None, None)
          raise
  # Route events fired through bb.event into the outgoing queue.
  bb.event.worker_fire = worker_fire

  # Optional command log; leave as None to disable logging.
  lf = None
  #lf = open("/tmp/workercommandlog", "w+")

  def workerlog_write(msg):
      """Write *msg* to the command log and flush it; do nothing when lf is unset."""
      if not lf:
          return
      lf.write(msg)
      lf.flush()
  def sigterm_handler(signum, frame):
      """Signal handler: SIGTERM the calling process group, then exit.

      Both arguments are ignored, so the function may also be called
      directly with (None, None).
      """
      # Restore the default disposition first so the group-wide SIGTERM
      # sent below does not re-enter this handler.
      signal.signal(signal.SIGTERM, signal.SIG_DFL)
      os.killpg(0, signal.SIGTERM)
      sys.exit()
  def _parse_env_assignments(envstring):
      """Split a whitespace-separated ``KEY=VALUE`` list into [key, value] pairs.

      A None or empty *envstring* yields an empty list.
      """
      # Split on the first '=' only, so a value that itself contains '='
      # stays intact instead of breaking the two-way unpacking.
      return [var.split('=', 1) for var in (envstring or "").split()]


  def _override_environment(assignments, envbackup, fakeenv):
      """Apply *assignments* to os.environ, recording what was replaced.

      envbackup receives the previous value of each key (None if it was
      unset); fakeenv receives the value that was applied.
      """
      for key, value in assignments:
          # Keep only the first backup for a key: if the same key is listed
          # twice, a later backup would capture our own override instead of
          # the real original value.
          if key not in envbackup:
              envbackup[key] = os.environ.get(key)
          os.environ[key] = value
          fakeenv[key] = value


  def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
      """Fork a child process that executes one task and return handles to it.

      Parameters:
          cfg          -- configuration object; its dry_run and profile
                          attributes are read.
          data         -- datastore that the child updates with setVar() and
                          passes to bb.cache.Cache.loadDataFull().
          workerdata   -- dict; the keys "taskdeps", "fakerootenv",
                          "fakerootdirs", "fakerootnoenv", "buildname",
                          "date", "time", "sigdata" and "runq_hash" are read.
          fn           -- key into the per-file entries of workerdata.
          task         -- key into workerdata["runq_hash"].
          taskname     -- name of the task to execute.
          appends      -- passed through to loadDataFull().
          taskdepdata  -- stored in BB_TASKDEPDATA in the child.
          quieterrors  -- when true, suppress the critical log on setup
                          failure and set the task's "quieterrors" flag.

      Returns (pid, pipein, pipeout) in the parent. The child never returns
      from this function; it always leaves through os._exit().
      """
      # We need to setup the environment BEFORE the fork, since
      # a fork() or exec*() activates PSEUDO...

      envbackup = {}
      fakeenv = {}
      umask = None

      taskdep = workerdata["taskdeps"][fn]
      if 'umask' in taskdep and taskname in taskdep['umask']:
          # umask might come in as a number or text string..
          try:
              umask = int(taskdep['umask'][taskname], 8)
          except TypeError:
              umask = taskdep['umask'][taskname]

      # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
      if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
          _override_environment(_parse_env_assignments(workerdata["fakerootenv"][fn]), envbackup, fakeenv)

          fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
          for p in fakedirs:
              bb.utils.mkdirhier(p)
          logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
                          (fn, taskname, ', '.join(fakedirs)))
      else:
          _override_environment(_parse_env_assignments(workerdata["fakerootnoenv"][fn]), envbackup, fakeenv)

      # Flush now so buffered output is not duplicated into the child.
      sys.stdout.flush()
      sys.stderr.flush()

      try:
          pipein, pipeout = os.pipe()
          pipein = os.fdopen(pipein, 'rb', 4096)
          pipeout = os.fdopen(pipeout, 'wb', 0)
          pid = os.fork()
      except OSError as e:
          bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))

      if pid == 0:
          def child():
              global worker_pipe
              pipein.close()

              signal.signal(signal.SIGTERM, sigterm_handler)
              # Let SIGHUP exit as SIGTERM
              signal.signal(signal.SIGHUP, sigterm_handler)
              bb.utils.signal_on_parent_exit("SIGTERM")

              # Save out the PID so that events can include it
              bb.event.worker_pid = os.getpid()
              bb.event.worker_fire = worker_child_fire
              worker_pipe = pipeout

              # Make the child the process group leader and ensure no
              # child process will be controlled by the current terminal
              # This ensures signals sent to the controlling terminal like Ctrl+C
              # don't stop the child processes.
              os.setsid()
              # No stdin
              newsi = os.open(os.devnull, os.O_RDWR)
              os.dup2(newsi, sys.stdin.fileno())

              # Compare against None: a umask of 0 is a legitimate request
              # and must not be skipped just because it is falsy.
              if umask is not None:
                  os.umask(umask)

              data.setVar("BB_WORKERCONTEXT", "1")
              data.setVar("BB_TASKDEPDATA", taskdepdata)
              data.setVar("BUILDNAME", workerdata["buildname"])
              data.setVar("DATE", workerdata["date"])
              data.setVar("TIME", workerdata["time"])
              bb.parse.siggen.set_taskdata(workerdata["sigdata"])
              try:
                  the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
                  the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])

                  # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                  # successfully. We also need to unset anything from the environment which shouldn't be there
                  exports = bb.data.exported_vars(the_data)
                  bb.utils.empty_environment()
                  for e, v in exports:
                      os.environ[e] = v
                  for e in fakeenv:
                      os.environ[e] = fakeenv[e]
                      the_data.setVar(e, fakeenv[e])
                      the_data.setVarFlag(e, 'export', "1")

                  if quieterrors:
                      the_data.setVarFlag(taskname, "quieterrors", "1")

              except Exception as exc:
                  if not quieterrors:
                      logger.critical(str(exc))
                  os._exit(1)
              try:
                  if cfg.dry_run:
                      return 0
                  return bb.build.exec_task(fn, taskname, the_data, cfg.profile)
              except:
                  # Deliberately catches everything: the child must never
                  # unwind back into the code the parent is running.
                  os._exit(1)

          if not profiling:
              os._exit(child())
          else:
              profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
              prof = profile.Profile()
              # Pre-set the exit status so the child still exits (non-zero)
              # if the profiled call raises before assigning it.
              ret = 1
              try:
                  ret = profile.Profile.runcall(prof, child)
              finally:
                  prof.dump_stats(profname)
                  bb.utils.process_profilelog(profname)
                  os._exit(ret)
      else:
          # Parent: undo the environment overrides applied before the fork.
          for key, value in envbackup.items():
              if value is None:
                  del os.environ[key]
              else:
                  os.environ[key] = value

      return pid, pipein, pipeout
  class runQueueWorkerPipe():
      """
      Reading side of a pipe whose complete <event>...</event> messages are
      forwarded to the worker's outgoing queue.
      """
      def __init__(self, pipein, pipeout):
          """Keep *pipein* for reading, close *pipeout* if one is given."""
          self.input = pipein
          if pipeout:
              pipeout.close()
          bb.utils.nonblockingfd(self.input)
          # Bytes received so far that do not yet form a complete message
          self.queue = ""

      def read(self):
          """Pull pending data from the pipe and forward every complete message.

          Returns True when this call received new data, False otherwise.
          EAGAIN from the read is ignored; other I/O errors propagate.
          """
          before = len(self.queue)
          try:
              self.queue += self.input.read(102400)
          except (OSError, IOError) as err:
              if err.errno != errno.EAGAIN:
                  raise
          grew = len(self.queue) > before

          terminator = "</event>"
          while True:
              pos = self.queue.find(terminator)
              if pos == -1:
                  break
              cut = pos + len(terminator)
              worker_fire_prepickled(self.queue[:cut])
              self.queue = self.queue[cut:]
          return grew

      def close(self):
          """Drain the pipe, warn about any incomplete trailing message, close it."""
          while self.read():
              pass
          if self.queue:
              print("Warning, worker child left partial message: %s" % self.queue)
          self.input.close()
  # Set by BitbakeWorker.handle_quit so the top-level exception handler can
  # tell a requested shutdown from a failure.
  normalexit = False

  class BitbakeWorker(object):
      """Command loop of the worker process.

      Reads tagged messages (<cookerconfig>, <workerdata>, <runtask>,
      <finishnow>, <ping>, <quit>) from the input stream, forks tasks via
      fork_off_task(), relays the events they produce and reports their exit
      codes through the outgoing worker queue.
      """

      def __init__(self, din):
          """Use *din* as the non-blocking command stream and install signal handlers."""
          self.input = din
          bb.utils.nonblockingfd(self.input)
          self.queue = ""
          self.cookercfg = None
          self.databuilder = None
          self.data = None
          # Populated by handle_workerdata(); defined up front so the
          # attribute always exists.
          self.workerdata = None
          self.build_pids = {}    # pid -> task
          self.build_pipes = {}   # pid -> runQueueWorkerPipe

          signal.signal(signal.SIGTERM, self.sigterm_exception)
          # Let SIGHUP exit as SIGTERM
          signal.signal(signal.SIGHUP, self.sigterm_exception)

      def sigterm_exception(self, signum, stackframe):
          """Terminate running tasks, then kill this process with SIGTERM."""
          if signum == signal.SIGTERM:
              bb.warn("Worker recieved SIGTERM, shutting down...")
          elif signum == signal.SIGHUP:
              bb.warn("Worker recieved SIGHUP, shutting down...")
          self.handle_finishnow(None)
          # Restore the default action so the signal sent below really
          # terminates the process instead of re-entering this handler.
          signal.signal(signal.SIGTERM, signal.SIG_DFL)
          os.kill(os.getpid(), signal.SIGTERM)

      def serve(self):
          """Run the main loop: read commands, relay task events, reap children."""
          while True:
              (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
              if self.input in ready:
                  try:
                      r = self.input.read()
                      if len(r) == 0:
                          # EOF on pipe, server must have terminated
                          self.sigterm_exception(signal.SIGTERM, None)
                      self.queue = self.queue + r
                  except (OSError, IOError):
                      pass
              if len(self.queue):
                  self.handle_item("cookerconfig", self.handle_cookercfg)
                  self.handle_item("workerdata", self.handle_workerdata)
                  self.handle_item("runtask", self.handle_runtask)
                  self.handle_item("finishnow", self.handle_finishnow)
                  self.handle_item("ping", self.handle_ping)
                  self.handle_item("quit", self.handle_quit)

              for pipe in self.build_pipes:
                  self.build_pipes[pipe].read()
              if len(self.build_pids):
                  self.process_waitpid()
              worker_flush()

      def handle_item(self, item, func):
          """Dispatch leading ``<item>...</item>`` messages in the queue to *func*.

          Consecutive complete messages of type *item* at the front of the
          queue are consumed, each payload being passed to *func*. Processing
          stops at the first message of another type or at an incomplete
          message, both of which stay in the queue.
          """
          opening = "<" + item + ">"
          closing = "</" + item + ">"
          # Re-check the opening tag on every pass: after one message is
          # consumed the next one may be of a different type, and slicing up
          # to a later closing tag would hand garbage to func.
          while self.queue.startswith(opening):
              index = self.queue.find(closing)
              if index == -1:
                  break
              func(self.queue[len(opening):index])
              self.queue = self.queue[index + len(closing):]

      def handle_cookercfg(self, data):
          """Unpickle the cooker configuration and parse the base configuration."""
          # NOTE(review): pickle.loads() runs on data read from the command
          # stream; this is only safe if that stream is trusted - confirm.
          self.cookercfg = pickle.loads(data)
          self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
          self.databuilder.parseBaseConfiguration()
          self.data = self.databuilder.data

      def handle_workerdata(self, data):
          """Unpickle the worker data and apply its logging and PR server settings."""
          self.workerdata = pickle.loads(data)
          bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
          bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
          bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
          bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
          self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])

      def handle_ping(self, _):
          """Answer a ping by logging a warning-level pong message."""
          workerlog_write("Handling ping\n")

          logger.warn("Pong from bitbake-worker!")

      def handle_quit(self, data):
          """Mark the exit as intentional and leave via sys.exit(0)."""
          workerlog_write("Handling quit\n")
          global normalexit
          normalexit = True
          sys.exit(0)

      def handle_runtask(self, data):
          """Unpickle a task request, fork the task and track its pid and pipe."""
          fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
          workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

          pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)

          self.build_pids[pid] = task
          self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

      def process_waitpid(self):
          """
          Return None if there are no processes awaiting result collection, otherwise
          collect the process exit code, close the information pipe and queue an
          <exitcode> message for the task.
          """
          try:
              pid, status = os.waitpid(-1, os.WNOHANG)
              if pid == 0 or os.WIFSTOPPED(status):
                  return None
          except OSError:
              return None

          workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

          if os.WIFEXITED(status):
              status = os.WEXITSTATUS(status)
          elif os.WIFSIGNALED(status):
              # Per shell conventions for $?, when a process exits due to
              # a signal, we return an exit code of 128 + SIGNUM
              status = 128 + os.WTERMSIG(status)

          task = self.build_pids[pid]
          del self.build_pids[pid]

          self.build_pipes[pid].close()
          del self.build_pipes[pid]

          worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")

      def handle_finishnow(self, _):
          """Send SIGTERM to every tracked task and read their pipes once more."""
          if self.build_pids:
              logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
              for pid in self.build_pids:
                  try:
                      # A negative pid addresses the whole process group.
                      os.kill(-pid, signal.SIGTERM)
                      os.waitpid(-1, 0)
                  except OSError:
                      # The group or child may already be gone; shutting
                      # down is best effort, so carry on with the rest.
                      pass
          for pipe in self.build_pipes:
              self.build_pipes[pipe].read()
  # Run the worker loop (optionally under the profiler); on any exit other
  # than a requested quit, dump the traceback and exception to stderr.
  try:
      worker = BitbakeWorker(sys.stdin)
      if profiling:
          profname = "profile-worker.log"
          prof = profile.Profile()
          try:
              profile.Profile.runcall(prof, worker.serve)
          finally:
              prof.dump_stats(profname)
              bb.utils.process_profilelog(profname)
      else:
          worker.serve()
  except BaseException as e:
      if not normalexit:
          import traceback
          sys.stderr.write(traceback.format_exc())
          sys.stderr.write(str(e))

  # Push out whatever is still queued before exiting.
  # NOTE(review): worker_flush() ignores EPIPE without shrinking the queue,
  # so this loop looks like it could spin forever once the reader has gone
  # away - confirm.
  while worker_queue:
      worker_flush()
  workerlog_write("exitting")
  sys.exit(0)