bitbake-worker 12 KB


  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import warnings
  5. sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
  6. from bb import fetch2
  7. import logging
  8. import bb
  9. import select
  10. import errno
  11. # Users shouldn't be running this code directly
  12. if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
  13. print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
  14. sys.exit(1)
  15. logger = logging.getLogger("BitBake")
  16. try:
  17. import cPickle as pickle
  18. except ImportError:
  19. import pickle
  20. bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
  21. worker_pipe = sys.stdout.fileno()
  22. bb.utils.nonblockingfd(worker_pipe)
  23. handler = bb.event.LogHandler()
  24. logger.addHandler(handler)
  25. if 0:
  26. # Code to write out a log file of all events passing through the worker
  27. logfilename = "/tmp/workerlogfile"
  28. format_str = "%(levelname)s: %(message)s"
  29. conlogformat = bb.msg.BBLogFormatter(format_str)
  30. consolelog = logging.FileHandler(logfilename)
  31. bb.msg.addDefaultlogFilter(consolelog)
  32. consolelog.setFormatter(conlogformat)
  33. logger.addHandler(consolelog)
  34. worker_queue = ""
  35. def worker_fire(event, d):
  36. data = "<event>" + pickle.dumps(event) + "</event>"
  37. worker_fire_prepickled(data)
  38. def worker_fire_prepickled(event):
  39. global worker_queue
  40. worker_queue = worker_queue + event
  41. worker_flush()
  42. def worker_flush():
  43. global worker_queue, worker_pipe
  44. if not worker_queue:
  45. return
  46. try:
  47. written = os.write(worker_pipe, worker_queue)
  48. worker_queue = worker_queue[written:]
  49. except (IOError, OSError) as e:
  50. if e.errno != errno.EAGAIN:
  51. raise
  52. def worker_child_fire(event, d):
  53. global worker_pipe
  54. data = "<event>" + pickle.dumps(event) + "</event>"
  55. worker_pipe.write(data)
  56. bb.event.worker_fire = worker_fire
  57. lf = None
  58. #lf = open("/tmp/workercommandlog", "w+")
  59. def workerlog_write(msg):
  60. if lf:
  61. lf.write(msg)
  62. lf.flush()
  63. def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, quieterrors=False):
  64. # We need to setup the environment BEFORE the fork, since
  65. # a fork() or exec*() activates PSEUDO...
  66. envbackup = {}
  67. fakeenv = {}
  68. umask = None
  69. taskdep = workerdata["taskdeps"][fn]
  70. if 'umask' in taskdep and taskname in taskdep['umask']:
  71. # umask might come in as a number or text string..
  72. try:
  73. umask = int(taskdep['umask'][taskname],8)
  74. except TypeError:
  75. umask = taskdep['umask'][taskname]
  76. if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
  77. envvars = (workerdata["fakerootenv"][fn] or "").split()
  78. for key, value in (var.split('=') for var in envvars):
  79. envbackup[key] = os.environ.get(key)
  80. os.environ[key] = value
  81. fakeenv[key] = value
  82. fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
  83. for p in fakedirs:
  84. bb.utils.mkdirhier(p)
  85. logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
  86. (fn, taskname, ', '.join(fakedirs)))
  87. else:
  88. envvars = (workerdata["fakerootnoenv"][fn] or "").split()
  89. for key, value in (var.split('=') for var in envvars):
  90. envbackup[key] = os.environ.get(key)
  91. os.environ[key] = value
  92. fakeenv[key] = value
  93. sys.stdout.flush()
  94. sys.stderr.flush()
  95. try:
  96. pipein, pipeout = os.pipe()
  97. pipein = os.fdopen(pipein, 'rb', 4096)
  98. pipeout = os.fdopen(pipeout, 'wb', 0)
  99. pid = os.fork()
  100. except OSError as e:
  101. bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
  102. if pid == 0:
  103. global worker_pipe
  104. pipein.close()
  105. # Save out the PID so that the event can include it the
  106. # events
  107. bb.event.worker_pid = os.getpid()
  108. bb.event.worker_fire = worker_child_fire
  109. worker_pipe = pipeout
  110. # Make the child the process group leader
  111. os.setpgid(0, 0)
  112. # No stdin
  113. newsi = os.open(os.devnull, os.O_RDWR)
  114. os.dup2(newsi, sys.stdin.fileno())
  115. if umask:
  116. os.umask(umask)
  117. data.setVar("BB_WORKERCONTEXT", "1")
  118. bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
  119. ret = 0
  120. try:
  121. the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
  122. the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
  123. for h in workerdata["hashes"]:
  124. the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
  125. for h in workerdata["hash_deps"]:
  126. the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
  127. # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
  128. # successfully. We also need to unset anything from the environment which shouldn't be there
  129. exports = bb.data.exported_vars(the_data)
  130. bb.utils.empty_environment()
  131. for e, v in exports:
  132. os.environ[e] = v
  133. for e in fakeenv:
  134. os.environ[e] = fakeenv[e]
  135. the_data.setVar(e, fakeenv[e])
  136. the_data.setVarFlag(e, 'export', "1")
  137. if quieterrors:
  138. the_data.setVarFlag(taskname, "quieterrors", "1")
  139. except Exception as exc:
  140. if not quieterrors:
  141. logger.critical(str(exc))
  142. os._exit(1)
  143. try:
  144. if not cfg.dry_run:
  145. ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
  146. os._exit(ret)
  147. except:
  148. os._exit(1)
  149. else:
  150. for key, value in envbackup.iteritems():
  151. if value is None:
  152. del os.environ[key]
  153. else:
  154. os.environ[key] = value
  155. return pid, pipein, pipeout
  156. class runQueueWorkerPipe():
  157. """
  158. Abstraction for a pipe between a worker thread and the worker server
  159. """
  160. def __init__(self, pipein, pipeout):
  161. self.input = pipein
  162. if pipeout:
  163. pipeout.close()
  164. bb.utils.nonblockingfd(self.input)
  165. self.queue = ""
  166. def read(self):
  167. start = len(self.queue)
  168. try:
  169. self.queue = self.queue + self.input.read(102400)
  170. except (OSError, IOError) as e:
  171. if e.errno != errno.EAGAIN:
  172. raise
  173. end = len(self.queue)
  174. index = self.queue.find("</event>")
  175. while index != -1:
  176. worker_fire_prepickled(self.queue[:index+8])
  177. self.queue = self.queue[index+8:]
  178. index = self.queue.find("</event>")
  179. return (end > start)
  180. def close(self):
  181. while self.read():
  182. continue
  183. if len(self.queue) > 0:
  184. print("Warning, worker child left partial message: %s" % self.queue)
  185. self.input.close()
  186. normalexit = False
  187. class BitbakeWorker(object):
  188. def __init__(self, din):
  189. self.input = din
  190. bb.utils.nonblockingfd(self.input)
  191. self.queue = ""
  192. self.cookercfg = None
  193. self.databuilder = None
  194. self.data = None
  195. self.build_pids = {}
  196. self.build_pipes = {}
  197. def serve(self):
  198. while True:
  199. (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
  200. if self.input in ready or len(self.queue):
  201. start = len(self.queue)
  202. try:
  203. self.queue = self.queue + self.input.read()
  204. except (OSError, IOError):
  205. pass
  206. end = len(self.queue)
  207. self.handle_item("cookerconfig", self.handle_cookercfg)
  208. self.handle_item("workerdata", self.handle_workerdata)
  209. self.handle_item("runtask", self.handle_runtask)
  210. self.handle_item("finishnow", self.handle_finishnow)
  211. self.handle_item("ping", self.handle_ping)
  212. self.handle_item("quit", self.handle_quit)
  213. for pipe in self.build_pipes:
  214. self.build_pipes[pipe].read()
  215. if len(self.build_pids):
  216. self.process_waitpid()
  217. worker_flush()
  218. def handle_item(self, item, func):
  219. if self.queue.startswith("<" + item + ">"):
  220. index = self.queue.find("</" + item + ">")
  221. while index != -1:
  222. func(self.queue[(len(item) + 2):index])
  223. self.queue = self.queue[(index + len(item) + 3):]
  224. index = self.queue.find("</" + item + ">")
  225. def handle_cookercfg(self, data):
  226. self.cookercfg = pickle.loads(data)
  227. self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
  228. self.databuilder.parseBaseConfiguration()
  229. self.data = self.databuilder.data
  230. def handle_workerdata(self, data):
  231. self.workerdata = pickle.loads(data)
  232. bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
  233. bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
  234. bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
  235. bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
  236. self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])
  237. def handle_ping(self, _):
  238. workerlog_write("Handling ping\n")
  239. logger.warn("Pong from bitbake-worker!")
  240. def handle_quit(self, data):
  241. workerlog_write("Handling quit\n")
  242. global normalexit
  243. normalexit = True
  244. sys.exit(0)
  245. def handle_runtask(self, data):
  246. fn, task, taskname, quieterrors, appends = pickle.loads(data)
  247. workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
  248. pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, quieterrors)
  249. self.build_pids[pid] = task
  250. self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
  251. def process_waitpid(self):
  252. """
  253. Return none is there are no processes awaiting result collection, otherwise
  254. collect the process exit codes and close the information pipe.
  255. """
  256. try:
  257. pid, status = os.waitpid(-1, os.WNOHANG)
  258. if pid == 0 or os.WIFSTOPPED(status):
  259. return None
  260. except OSError:
  261. return None
  262. workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
  263. if os.WIFEXITED(status):
  264. status = os.WEXITSTATUS(status)
  265. elif os.WIFSIGNALED(status):
  266. # Per shell conventions for $?, when a process exits due to
  267. # a signal, we return an exit code of 128 + SIGNUM
  268. status = 128 + os.WTERMSIG(status)
  269. task = self.build_pids[pid]
  270. del self.build_pids[pid]
  271. self.build_pipes[pid].close()
  272. del self.build_pipes[pid]
  273. worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
  274. def handle_finishnow(self, _):
  275. if self.build_pids:
  276. logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
  277. for k, v in self.build_pids.iteritems():
  278. try:
  279. os.kill(-k, signal.SIGTERM)
  280. os.waitpid(-1, 0)
  281. except:
  282. pass
  283. for pipe in self.build_pipes:
  284. self.build_pipes[pipe].read()
  285. try:
  286. worker = BitbakeWorker(sys.stdin)
  287. worker.serve()
  288. except BaseException as e:
  289. if not normalexit:
  290. import traceback
  291. sys.stderr.write(traceback.format_exc())
  292. sys.stderr.write(str(e))
  293. while len(worker_queue):
  294. worker_flush()
  295. workerlog_write("exitting")
  296. sys.exit(0)