bitbake-worker 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import warnings
  5. sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
  6. from bb import fetch2
  7. import logging
  8. import bb
  9. import select
  10. import errno
  11. import signal
  12. # Users shouldn't be running this code directly
  13. if len(sys.argv) != 2 or sys.argv[1] != "decafbad":
  14. print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
  15. sys.exit(1)
  16. logger = logging.getLogger("BitBake")
  17. try:
  18. import cPickle as pickle
  19. except ImportError:
  20. import pickle
  21. bb.msg.note(1, bb.msg.domain.Cache, "Importing cPickle failed. Falling back to a very slow implementation.")
  22. worker_pipe = sys.stdout.fileno()
  23. bb.utils.nonblockingfd(worker_pipe)
  24. handler = bb.event.LogHandler()
  25. logger.addHandler(handler)
  26. if 0:
  27. # Code to write out a log file of all events passing through the worker
  28. logfilename = "/tmp/workerlogfile"
  29. format_str = "%(levelname)s: %(message)s"
  30. conlogformat = bb.msg.BBLogFormatter(format_str)
  31. consolelog = logging.FileHandler(logfilename)
  32. bb.msg.addDefaultlogFilter(consolelog)
  33. consolelog.setFormatter(conlogformat)
  34. logger.addHandler(consolelog)
  35. worker_queue = ""
  36. def worker_fire(event, d):
  37. data = "<event>" + pickle.dumps(event) + "</event>"
  38. worker_fire_prepickled(data)
  39. def worker_fire_prepickled(event):
  40. global worker_queue
  41. worker_queue = worker_queue + event
  42. worker_flush()
  43. def worker_flush():
  44. global worker_queue, worker_pipe
  45. if not worker_queue:
  46. return
  47. try:
  48. written = os.write(worker_pipe, worker_queue)
  49. worker_queue = worker_queue[written:]
  50. except (IOError, OSError) as e:
  51. if e.errno != errno.EAGAIN:
  52. raise
  53. def worker_child_fire(event, d):
  54. global worker_pipe
  55. data = "<event>" + pickle.dumps(event) + "</event>"
  56. worker_pipe.write(data)
  57. bb.event.worker_fire = worker_fire
  58. lf = None
  59. #lf = open("/tmp/workercommandlog", "w+")
  60. def workerlog_write(msg):
  61. if lf:
  62. lf.write(msg)
  63. lf.flush()
  64. def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdata, quieterrors=False):
  65. # We need to setup the environment BEFORE the fork, since
  66. # a fork() or exec*() activates PSEUDO...
  67. envbackup = {}
  68. fakeenv = {}
  69. umask = None
  70. taskdep = workerdata["taskdeps"][fn]
  71. if 'umask' in taskdep and taskname in taskdep['umask']:
  72. # umask might come in as a number or text string..
  73. try:
  74. umask = int(taskdep['umask'][taskname],8)
  75. except TypeError:
  76. umask = taskdep['umask'][taskname]
  77. # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
  78. if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not cfg.dry_run:
  79. envvars = (workerdata["fakerootenv"][fn] or "").split()
  80. for key, value in (var.split('=') for var in envvars):
  81. envbackup[key] = os.environ.get(key)
  82. os.environ[key] = value
  83. fakeenv[key] = value
  84. fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
  85. for p in fakedirs:
  86. bb.utils.mkdirhier(p)
  87. logger.debug(2, 'Running %s:%s under fakeroot, fakedirs: %s' %
  88. (fn, taskname, ', '.join(fakedirs)))
  89. else:
  90. envvars = (workerdata["fakerootnoenv"][fn] or "").split()
  91. for key, value in (var.split('=') for var in envvars):
  92. envbackup[key] = os.environ.get(key)
  93. os.environ[key] = value
  94. fakeenv[key] = value
  95. sys.stdout.flush()
  96. sys.stderr.flush()
  97. try:
  98. pipein, pipeout = os.pipe()
  99. pipein = os.fdopen(pipein, 'rb', 4096)
  100. pipeout = os.fdopen(pipeout, 'wb', 0)
  101. pid = os.fork()
  102. except OSError as e:
  103. bb.msg.fatal("RunQueue", "fork failed: %d (%s)" % (e.errno, e.strerror))
  104. if pid == 0:
  105. global worker_pipe
  106. pipein.close()
  107. # Save out the PID so that the event can include it the
  108. # events
  109. bb.event.worker_pid = os.getpid()
  110. bb.event.worker_fire = worker_child_fire
  111. worker_pipe = pipeout
  112. # Make the child the process group leader
  113. os.setpgid(0, 0)
  114. # No stdin
  115. newsi = os.open(os.devnull, os.O_RDWR)
  116. os.dup2(newsi, sys.stdin.fileno())
  117. if umask:
  118. os.umask(umask)
  119. data.setVar("BB_WORKERCONTEXT", "1")
  120. data.setVar("BB_TASKDEPDATA", taskdepdata)
  121. data.setVar("BUILDNAME", workerdata["buildname"])
  122. data.setVar("DATE", workerdata["date"])
  123. data.setVar("TIME", workerdata["time"])
  124. bb.parse.siggen.set_taskdata(workerdata["hashes"], workerdata["hash_deps"], workerdata["sigchecksums"])
  125. ret = 0
  126. try:
  127. the_data = bb.cache.Cache.loadDataFull(fn, appends, data)
  128. the_data.setVar('BB_TASKHASH', workerdata["runq_hash"][task])
  129. for h in workerdata["hashes"]:
  130. the_data.setVar("BBHASH_%s" % h, workerdata["hashes"][h])
  131. for h in workerdata["hash_deps"]:
  132. the_data.setVar("BBHASHDEPS_%s" % h, workerdata["hash_deps"][h])
  133. # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
  134. # successfully. We also need to unset anything from the environment which shouldn't be there
  135. exports = bb.data.exported_vars(the_data)
  136. bb.utils.empty_environment()
  137. for e, v in exports:
  138. os.environ[e] = v
  139. for e in fakeenv:
  140. os.environ[e] = fakeenv[e]
  141. the_data.setVar(e, fakeenv[e])
  142. the_data.setVarFlag(e, 'export', "1")
  143. if quieterrors:
  144. the_data.setVarFlag(taskname, "quieterrors", "1")
  145. except Exception as exc:
  146. if not quieterrors:
  147. logger.critical(str(exc))
  148. os._exit(1)
  149. try:
  150. if not cfg.dry_run:
  151. ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
  152. os._exit(ret)
  153. except:
  154. os._exit(1)
  155. else:
  156. for key, value in envbackup.iteritems():
  157. if value is None:
  158. del os.environ[key]
  159. else:
  160. os.environ[key] = value
  161. return pid, pipein, pipeout
  162. class runQueueWorkerPipe():
  163. """
  164. Abstraction for a pipe between a worker thread and the worker server
  165. """
  166. def __init__(self, pipein, pipeout):
  167. self.input = pipein
  168. if pipeout:
  169. pipeout.close()
  170. bb.utils.nonblockingfd(self.input)
  171. self.queue = ""
  172. def read(self):
  173. start = len(self.queue)
  174. try:
  175. self.queue = self.queue + self.input.read(102400)
  176. except (OSError, IOError) as e:
  177. if e.errno != errno.EAGAIN:
  178. raise
  179. end = len(self.queue)
  180. index = self.queue.find("</event>")
  181. while index != -1:
  182. worker_fire_prepickled(self.queue[:index+8])
  183. self.queue = self.queue[index+8:]
  184. index = self.queue.find("</event>")
  185. return (end > start)
  186. def close(self):
  187. while self.read():
  188. continue
  189. if len(self.queue) > 0:
  190. print("Warning, worker child left partial message: %s" % self.queue)
  191. self.input.close()
  192. normalexit = False
  193. class BitbakeWorker(object):
  194. def __init__(self, din):
  195. self.input = din
  196. bb.utils.nonblockingfd(self.input)
  197. self.queue = ""
  198. self.cookercfg = None
  199. self.databuilder = None
  200. self.data = None
  201. self.build_pids = {}
  202. self.build_pipes = {}
  203. def serve(self):
  204. while True:
  205. (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
  206. if self.input in ready or len(self.queue):
  207. start = len(self.queue)
  208. try:
  209. self.queue = self.queue + self.input.read()
  210. except (OSError, IOError):
  211. pass
  212. end = len(self.queue)
  213. self.handle_item("cookerconfig", self.handle_cookercfg)
  214. self.handle_item("workerdata", self.handle_workerdata)
  215. self.handle_item("runtask", self.handle_runtask)
  216. self.handle_item("finishnow", self.handle_finishnow)
  217. self.handle_item("ping", self.handle_ping)
  218. self.handle_item("quit", self.handle_quit)
  219. for pipe in self.build_pipes:
  220. self.build_pipes[pipe].read()
  221. if len(self.build_pids):
  222. self.process_waitpid()
  223. worker_flush()
  224. def handle_item(self, item, func):
  225. if self.queue.startswith("<" + item + ">"):
  226. index = self.queue.find("</" + item + ">")
  227. while index != -1:
  228. func(self.queue[(len(item) + 2):index])
  229. self.queue = self.queue[(index + len(item) + 3):]
  230. index = self.queue.find("</" + item + ">")
  231. def handle_cookercfg(self, data):
  232. self.cookercfg = pickle.loads(data)
  233. self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
  234. self.databuilder.parseBaseConfiguration()
  235. self.data = self.databuilder.data
  236. def handle_workerdata(self, data):
  237. self.workerdata = pickle.loads(data)
  238. bb.msg.loggerDefaultDebugLevel = self.workerdata["logdefaultdebug"]
  239. bb.msg.loggerDefaultVerbose = self.workerdata["logdefaultverbose"]
  240. bb.msg.loggerVerboseLogs = self.workerdata["logdefaultverboselogs"]
  241. bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
  242. self.data.setVar("PRSERV_HOST", self.workerdata["prhost"])
  243. def handle_ping(self, _):
  244. workerlog_write("Handling ping\n")
  245. logger.warn("Pong from bitbake-worker!")
  246. def handle_quit(self, data):
  247. workerlog_write("Handling quit\n")
  248. global normalexit
  249. normalexit = True
  250. sys.exit(0)
  251. def handle_runtask(self, data):
  252. fn, task, taskname, quieterrors, appends, taskdepdata = pickle.loads(data)
  253. workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
  254. pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.workerdata, fn, task, taskname, appends, taskdepdata, quieterrors)
  255. self.build_pids[pid] = task
  256. self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
  257. def process_waitpid(self):
  258. """
  259. Return none is there are no processes awaiting result collection, otherwise
  260. collect the process exit codes and close the information pipe.
  261. """
  262. try:
  263. pid, status = os.waitpid(-1, os.WNOHANG)
  264. if pid == 0 or os.WIFSTOPPED(status):
  265. return None
  266. except OSError:
  267. return None
  268. workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
  269. if os.WIFEXITED(status):
  270. status = os.WEXITSTATUS(status)
  271. elif os.WIFSIGNALED(status):
  272. # Per shell conventions for $?, when a process exits due to
  273. # a signal, we return an exit code of 128 + SIGNUM
  274. status = 128 + os.WTERMSIG(status)
  275. task = self.build_pids[pid]
  276. del self.build_pids[pid]
  277. self.build_pipes[pid].close()
  278. del self.build_pipes[pid]
  279. worker_fire_prepickled("<exitcode>" + pickle.dumps((task, status)) + "</exitcode>")
  280. def handle_finishnow(self, _):
  281. if self.build_pids:
  282. logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
  283. for k, v in self.build_pids.iteritems():
  284. try:
  285. os.kill(-k, signal.SIGTERM)
  286. os.waitpid(-1, 0)
  287. except:
  288. pass
  289. for pipe in self.build_pipes:
  290. self.build_pipes[pipe].read()
  291. try:
  292. worker = BitbakeWorker(sys.stdin)
  293. worker.serve()
  294. except BaseException as e:
  295. if not normalexit:
  296. import traceback
  297. sys.stderr.write(traceback.format_exc())
  298. sys.stderr.write(str(e))
  299. while len(worker_queue):
  300. worker_flush()
  301. workerlog_write("exitting")
  302. sys.exit(0)