bitbake-worker 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. #!/usr/bin/env python3
  2. #
  3. # Copyright BitBake Contributors
  4. #
  5. # SPDX-License-Identifier: GPL-2.0-only
  6. #
  7. import os
  8. import sys
  9. import warnings
  10. warnings.simplefilter("default")
  11. warnings.filterwarnings("ignore", category=DeprecationWarning, message=".*use.of.fork.*may.lead.to.deadlocks.in.the.child.*")
  12. sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
  13. from bb import fetch2
  14. import logging
  15. import bb
  16. import select
  17. import errno
  18. import signal
  19. import pickle
  20. import traceback
  21. import queue
  22. import shlex
  23. import subprocess
  24. import fcntl
  25. from multiprocessing import Lock
  26. from threading import Thread
# Remove when we have a minimum of python 3.10
if not hasattr(fcntl, 'F_SETPIPE_SZ'):
    fcntl.F_SETPIPE_SZ = 1031

bb.utils.check_system_locale()

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The longer magic prefix on argv[1] selects profiling mode.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass

logger = logging.getLogger("BitBake")

# stdout doubles as the status/event channel back to the cooker, so it
# must not be written to directly elsewhere in this process.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Try to make the pipe buffers larger as it is much more efficient. If we can't
# e.g. out of buffer space (/proc/sys/fs/pipe-user-pages-soft) then just pass over.
try:
    fcntl.fcntl(worker_pipe, fcntl.F_SETPIPE_SZ, 512 * 1024)
except:
    pass

# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)
  75. worker_queue = queue.Queue()
  76. def worker_fire(event, d):
  77. data = b"<event>" + pickle.dumps(event) + b"</event>"
  78. worker_fire_prepickled(data)
  79. def worker_fire_prepickled(event):
  80. global worker_queue
  81. worker_queue.put(event)
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop: drain worker_queue and push the bytes
    out through the non-blocking worker_pipe until told to exit."""
    worker_queue_int = bytearray()
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block for at most a second so the exit flag is polled regularly.
            worker_queue_int.extend(worker_queue.get(True, 1))
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (with timeout) until the pipe is writable.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int.extend(worker_queue.get())
                # Partial writes are expected on a non-blocking pipe; keep
                # any unwritten tail for the next iteration.
                written = os.write(worker_pipe, worker_queue_int)
                del worker_queue_int[0:written]
            except (IOError, OSError) as e:
                # EAGAIN: pipe full; EPIPE: reader gone — both tolerated here.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return
# Dedicated writer thread: all traffic back to the cooker is funnelled
# through worker_queue and written out by worker_flush().
worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
  110. def worker_child_fire(event, d):
  111. global worker_pipe
  112. global worker_pipe_lock
  113. data = b"<event>" + pickle.dumps(event) + b"</event>"
  114. try:
  115. with bb.utils.lock_timeout(worker_pipe_lock):
  116. while(len(data)):
  117. written = worker_pipe.write(data)
  118. data = data[written:]
  119. except IOError:
  120. sigterm_handler(None, None)
  121. raise
  122. bb.event.worker_fire = worker_fire
  123. lf = None
  124. #lf = open("/tmp/workercommandlog", "w+")
  125. def workerlog_write(msg):
  126. if lf:
  127. lf.write(msg)
  128. lf.flush()
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: terminate the whole process group, then exit."""
    # Reset to the default action first so the killpg() below cannot
    # re-enter this handler in this process.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    # pgid 0 == the caller's own process group, so all task children
    # (and ourselves) receive the signal.
    os.killpg(0, signal.SIGTERM)
    sys.exit()
def fork_off_task(cfg, data, databuilder, workerdata, extraconfigdata, runtask):
    """Fork a child process to execute a single task.

    In the parent, returns (pid, pipein, pipeout) where pipein carries
    events back from the child. The child never returns from this
    function: it runs the task and terminates via os._exit().
    """
    fn = runtask['fn']
    task = runtask['task']
    taskname = runtask['taskname']
    taskhash = runtask['taskhash']
    unihash = runtask['unihash']
    appends = runtask['appends']
    layername = runtask['layername']
    taskdepdata = runtask['taskdepdata']
    quieterrors = runtask['quieterrors']
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...
    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = runtask['taskdep']
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
            umask = int(umask, 8)
        except TypeError:
            pass

    dry_run = cfg.dry_run or runtask['dry_run']

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (runtask['fakerootenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            # Remember the previous value (None == unset) so the parent
            # can restore its environment after the fork.
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (runtask['fakerootdirs'] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        envvars = (runtask['fakerootnoenv'] or "").split()
        for key, value in (var.split('=',1) for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Flush before fork so buffered output isn't duplicated in the child.
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin & stdout
            # stdout is used as a status report channel and must not be used by child processes.
            dumbio = os.open(os.devnull, os.O_RDWR)
            os.dup2(dumbio, sys.stdin.fileno())
            os.dup2(dumbio, sys.stdout.fileno())

            if umask is not None:
                os.umask(umask)

            try:
                # Configure the datastore for this task before parsing the recipe.
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)
                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = databuilder.parseRecipe(fn, appends, layername)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)
                bb.parse.siggen.setup_datacache_from_datastore(fn, the_data)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                if not bb.utils.to_boolean(the_data.getVarFlag(taskname, 'network')):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)

            sys.stdout.flush()
            sys.stderr.flush()

            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        # Ask the fakeroot daemon to shut down for this task.
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)

        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
            os._exit(ret)
    else:
        # Parent: undo the pre-fork environment changes and hand back the
        # child's pid plus the event pipe ends.
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

        return pid, pipein, pipeout
  302. class runQueueWorkerPipe():
  303. """
  304. Abstraction for a pipe between a worker thread and the worker server
  305. """
  306. def __init__(self, pipein, pipeout):
  307. self.input = pipein
  308. if pipeout:
  309. pipeout.close()
  310. bb.utils.nonblockingfd(self.input)
  311. self.queue = bytearray()
  312. def read(self):
  313. start = len(self.queue)
  314. try:
  315. self.queue.extend(self.input.read(512*1024) or b"")
  316. except (OSError, IOError) as e:
  317. if e.errno != errno.EAGAIN:
  318. raise
  319. end = len(self.queue)
  320. index = self.queue.find(b"</event>")
  321. while index != -1:
  322. msg = self.queue[:index+8]
  323. assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
  324. worker_fire_prepickled(msg)
  325. self.queue = self.queue[index+8:]
  326. index = self.queue.find(b"</event>")
  327. return (end > start)
  328. def close(self):
  329. while self.read():
  330. continue
  331. if len(self.queue) > 0:
  332. print("Warning, worker child left partial message: %s" % self.queue)
  333. self.input.close()
  334. normalexit = False
class BitbakeWorker(object):
    """
    Main worker loop: reads framed commands from the cooker on din,
    dispatches them to the handle_* methods, forks task children via
    fork_off_task() and reports their exit codes back as events.
    """
    def __init__(self, din):
        # din: readable stream from the cooker (made non-blocking below)
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = bytearray()
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # pid -> task name, and pid -> runQueueWorkerPipe, for running children
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # argv magic containing "beef" marks the fakeroot worker variant
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running tasks, then re-deliver SIGTERM to ourselves."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        # Restore the default action so the re-sent SIGTERM actually kills us.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex the cooker command stream and all child
        event pipes via select(), then reap finished children."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue.extend(r)
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Try each known command frame against the head of the queue.
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()

            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """If the queue head holds a complete <item>HEADER+payload</item>
        frame (HEADER = 4-byte big-endian payload length), pass the
        payload to func and consume the frame from the queue."""
        opening_tag = b"<" + item + b">"
        if not self.queue.startswith(opening_tag):
            return

        tag_len = len(opening_tag)
        if len(self.queue) < tag_len + 4:
            # we need to receive more data
            return
        header = self.queue[tag_len:tag_len + 4]
        payload_len = int.from_bytes(header, 'big')
        # closing tag has length (tag_len + 1)
        # NOTE(review): this threshold looks 4 bytes short of the full frame
        # (the length header isn't counted); harmless, since the find() below
        # only fires once the closing tag has fully arrived — confirm.
        if len(self.queue) < tag_len * 2 + 1 + payload_len:
            # we need to receive more data
            return

        index = self.queue.find(b"</" + item + b">")
        if index != -1:
            try:
                func(self.queue[(tag_len + 4):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(b"</") + len(item) + len(b">")):]

    def handle_cookercfg(self, data):
        """Unpickle the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Apply worker-wide settings (logging, PR/hash server) to every
        multiconfig datastore."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
            self.databuilder.mcdata[mc].setVar("__bbclasstype", "recipe")

    def handle_newtaskhashes(self, data):
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork a child for the requested task and track its pid/pipe."""
        runtask = pickle.loads(data)

        fn = runtask['fn']
        task = runtask['task']
        taskname = runtask['taskname']

        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, self.extraconfigdata, runtask)
        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection,
        otherwise collect one process's exit code, close its information
        pipe, report the result upstream and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """SIGTERM every remaining task's process group and drain their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Negative pid: signal the child's whole process group.
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
# Entry point: run the worker (optionally under the profiler), then make
# sure the writer thread drains all outstanding events before exiting.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # handle_quit() exits via sys.exit(0); only report genuine failures.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)