#!/usr/bin/env python
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007 Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import copy
import os
import sys
import signal
import stat
import fcntl
import errno
import logging
import bb
from bb import msg, data, event
from bb import monitordisk
import subprocess

try:
    import cPickle as pickle
except ImportError:
    import pickle

bblogger = logging.getLogger("BitBake")
logger = logging.getLogger("BitBake.RunQueue")

class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self, total):
        self.completed = 0
        self.skipped = 0
        self.failed = 0
        self.active = 0
        self.total = total

    def copy(self):
        obj = self.__class__(self.total)
        obj.__dict__.update(self.__dict__)
        return obj

    def taskFailed(self):
        self.active = self.active - 1
        self.failed = self.failed + 1

    def taskCompleted(self, number = 1):
        self.active = self.active - number
        self.completed = self.completed + number

    def taskSkipped(self, number = 1):
        self.active = self.active + number
        self.skipped = self.skipped + number

    def taskActive(self):
        self.active = self.active + 1

# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
runQueueSceneInit = 3
runQueueSceneRun = 4
runQueueRunInit = 5
runQueueRunning = 6
runQueueFailed = 7
runQueueCleanUp = 8
runQueueComplete = 9
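
# A RunQueue instance starts in runQueuePrepare and, as _execute_runqueue()
# below is called repeatedly, normally moves through runQueueSceneInit,
# runQueueSceneRun, runQueueRunInit and runQueueRunning before ending in
# runQueueComplete (or runQueueFailed, with runQueueCleanUp in between on
# failure).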

class RunQueueScheduler(object):
    """
    Control the order tasks are scheduled in.
    """
    name = "basic"

    def __init__(self, runqueue, rqdata):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        self.rqdata = rqdata
        numTasks = len(self.rqdata.runq_fnid)

        self.prio_map = []
        self.prio_map.extend(range(numTasks))

    def next_buildable_task(self):
        """
        Return the id of the first task we find that is buildable
        """
        for tasknum in xrange(len(self.rqdata.runq_fnid)):
            taskid = self.prio_map[tasknum]
            if self.rq.runq_running[taskid] == 1:
                continue
            if self.rq.runq_buildable[taskid] == 1:
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[taskid]]
                taskname = self.rqdata.runq_task[taskid]
                stamp = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
                if stamp in self.rq.build_stamps.values():
                    continue
                return taskid

    def next(self):
        """
        Return the id of the task we should build next
        """
        if self.rq.stats.active < self.rq.number_tasks:
            return self.next_buildable_task()

class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight,
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    name = "speed"

    def __init__(self, runqueue, rqdata):
        """
        The priority map is sorted by task weight.
        """
        self.rq = runqueue
        self.rqdata = rqdata

        sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
        copyweight = copy.deepcopy(self.rqdata.runq_weight)
        self.prio_map = []

        for weight in sortweight:
            idx = copyweight.index(weight)
            self.prio_map.append(idx)
            copyweight[idx] = -1

        self.prio_map.reverse()
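
        # Illustrative example (not from the original source): with
        # runq_weight = [3, 1, 2] the loop above builds the map in ascending
        # weight order, [1, 2, 0], and the final reverse() yields
        # prio_map = [0, 2, 1], i.e. the heaviest task (index 0) is offered
        # to next_buildable_task() first.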

class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it's completed as quickly as possible. This
    works well where disk space is at a premium and classes like OE's rm_work
    are in force.
    """
    name = "completion"

    def __init__(self, runqueue, rqdata):
        RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)

        # FIXME - whilst this groups all fnids together it does not reorder the
        # fnid groups optimally.

        basemap = copy.deepcopy(self.prio_map)
        self.prio_map = []
        while (len(basemap) > 0):
            entry = basemap.pop(0)
            self.prio_map.append(entry)
            fnid = self.rqdata.runq_fnid[entry]
            todel = []
            for entry in basemap:
                entry_fnid = self.rqdata.runq_fnid[entry]
                if entry_fnid == fnid:
                    todel.append(basemap.index(entry))
                    self.prio_map.append(entry)
            todel.reverse()
            for idx in todel:
                del basemap[idx]
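
        # Illustrative example (not from the original source): if the
        # speed-sorted map is [t0, t3, t1, t2] and t0 and t1 come from the
        # same .bb file (same fnid), the grouping above produces
        # [t0, t1, t3, t2], so a recipe that has started building is finished
        # before the scheduler moves on to another one.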

class RunQueueData:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, rq, cooker, cfgData, dataCache, taskData, targets):
        self.cooker = cooker
        self.dataCache = dataCache
        self.taskData = taskData
        self.targets = targets
        self.rq = rq
        self.warn_multi_bb = False

        self.stampwhitelist = cfgData.getVar("BB_STAMP_WHITELIST", True) or ""
        self.multi_provider_whitelist = (cfgData.getVar("MULTI_PROVIDER_WHITELIST", True) or "").split()

        self.reset()

    def reset(self):
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []
        self.runq_hash = []

    def runq_depends_names(self, ids):
        import re
        ret = []
        for id in self.runq_depends[ids]:
            nam = os.path.basename(self.get_user_idstring(id))
            nam = re.sub("_[^,]*,", ",", nam)
            ret.extend([nam])
        return ret

    def get_user_idstring(self, task, task_name_suffix = ""):
        fn = self.taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task] + task_name_suffix
        return "%s, %s" % (fn, taskname)

    def get_task_id(self, fnid, taskname):
        for listid in xrange(len(self.runq_fnid)):
            if self.runq_fnid[listid] == fnid and self.runq_task[listid] == taskname:
                return listid
        return None

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in xrange(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain
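
        # Illustrative example (not from the original source): chain_reorder()
        # rotates rather than sorts, so [5, 2, 7] becomes [2, 7, 5]. This keeps
        # the cycle order intact while giving every rotation of the same loop a
        # single canonical form for the duplicate check below.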

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in xrange(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append(" Task %s (%s) (dependent Tasks %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends_names(dep)))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, copy.deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs

    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list, finding tasks that are not
        possible to execute due to circular dependencies.
        """

        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []
        task_done = []

        for listid in xrange(numTasks):
            task_done.append(False)
            weight.append(0)
            deps_left.append(len(self.runq_revdeps[listid]))

        for listid in endpoints:
            weight[listid] = 1
            task_done[listid] = True

        while True:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        problem_tasks = []
        for task in xrange(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                logger.debug(2, "Task %s (%s) is not buildable", task, self.get_user_idstring(task))
                logger.debug(2, "(Complete marker was %s and the remaining dependency count was %s)\n", task_done[task], deps_left[task])

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            logger.error(message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal("RunQueue", message)

        return weight

    def prepare(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """

        runq_build = []
        recursivetasks = {}
        recursivetasksselfref = set()

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return 0

        logger.info("Preparing runqueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every build and run
        # target ordered by priority. It also gives information on each of those
        # providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        def add_build_dependencies(depids, tasknames, depends):
            for depid in depids:
                # Won't be in build_targets if ASSUME_PROVIDED
                if depid not in taskData.build_targets:
                    continue
                depdata = taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_runtime_dependencies(depids, tasknames, depends):
            for depid in depids:
                if depid not in taskData.run_targets:
                    continue
                depdata = taskData.run_targets[depid][0]
                if depdata is None:
                    continue
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depdata, taskname)
                    if taskid is not None:
                        depends.add(taskid)

        def add_resolved_dependencies(depids, tasknames, depends):
            for depid in depids:
                for taskname in tasknames:
                    taskid = taskData.gettask_id_fromfnid(depid, taskname)
                    if taskid is not None:
                        depends.add(taskid)
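
        # These three helpers differ only in how a dependency id is mapped to
        # a recipe: build_targets for DEPENDS-style targets, run_targets for
        # RDEPENDS-style targets, and no mapping at all when the caller
        # already holds file ids (the recrdeptask expansion further below).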

        for task in xrange(len(taskData.tasks_name)):
            depends = set()
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            logger.debug(2, "Processing %s:%s", fn, taskData.tasks_name[task])

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = set(taskData.tasks_tdepends[task])

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    tasknames = task_deps['rdeptask'][taskData.tasks_name[task]].split()
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData.tasks_idepends[task]
                for (depid, idependtask) in idepends:
                    if depid in taskData.build_targets and not depid in taskData.failed_deps:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s depends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)
                irdepends = taskData.tasks_irdepends[task]
                for (depid, idependtask) in irdepends:
                    if depid in taskData.run_targets:
                        # Won't be in run_targets if ASSUME_PROVIDED
                        depdata = taskData.run_targets[depid][0]
                        if depdata is not None:
                            taskid = taskData.gettask_id_fromfnid(depdata, idependtask)
                            if taskid is None:
                                bb.msg.fatal("RunQueue", "Task %s in %s rdepends upon non-existent task %s in %s" % (taskData.tasks_name[task], fn, idependtask, taskData.fn_index[depdata]))
                            depends.add(taskid)

                # Resolve recursive 'recrdeptask' dependencies (Part A)
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                # We cover the recursive part of the dependencies below
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    tasknames = task_deps['recrdeptask'][taskData.tasks_name[task]].split()
                    recursivetasks[task] = tasknames
                    add_build_dependencies(taskData.depids[fnid], tasknames, depends)
                    add_runtime_dependencies(taskData.rdepids[fnid], tasknames, depends)
                    if taskData.tasks_name[task] in tasknames:
                        recursivetasksselfref.add(task)

            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(depends)
            self.runq_revdeps.append(set())
            self.runq_hash.append("")

            runq_build.append(0)

        # Resolve recursive 'recrdeptask' dependencies (Part B)
        #
        # e.g. do_sometask[recrdeptask] = "do_someothertask"
        # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
        # We need to do this separately since we need all of self.runq_depends to be complete before this is processed
        extradeps = {}
        for task in recursivetasks:
            extradeps[task] = set(self.runq_depends[task])
            tasknames = recursivetasks[task]
            seendeps = set()
            seenfnid = []

            def generate_recdeps(t):
                newdeps = set()
                add_resolved_dependencies([taskData.tasks_fnid[t]], tasknames, newdeps)
                extradeps[task].update(newdeps)
                seendeps.add(t)
                newdeps.add(t)
                for i in newdeps:
                    for n in self.runq_depends[i]:
                        if n not in seendeps:
                            generate_recdeps(n)
            generate_recdeps(task)

        # Remove circular references so that do_a[recrdeptask] = "do_a do_b" can work
        for task in recursivetasks:
            extradeps[task].difference_update(recursivetasksselfref)

        for task in xrange(len(taskData.tasks_name)):
            # Add in extra dependencies
            if task in extradeps:
                self.runq_depends[task] = extradeps[task]
            # Remove all self references
            if task in self.runq_depends[task]:
                logger.debug(2, "Task %s (%s %s) contains self reference! %s", task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], self.runq_depends[task])
                self.runq_depends[task].remove(task)

        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        logger.verbose("Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """
            if runq_build[listid] == 1:
                return

            runq_build[listid] = 1

            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        self.target_pairs = []
        for target in self.targets:
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]
            fn = taskData.fn_index[fnid]
            self.target_pairs.append((fn, target[1]))

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                bb.msg.fatal("RunQueue", "Task %s does not exist for target %s" % (target[1], target[0]))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        maps = []
        delcount = 0
        for listid in xrange(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                del self.runq_hash[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)
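
        # Illustrative example (not from the original source): with four tasks
        # of which only task 1 is inactive, maps ends up as [0, -1, 1, 2],
        # i.e. maps[old_index] gives the new index of a kept task and -1 marks
        # a pruned one; the remapping loop below relies on this.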

        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal("RunQueue", "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal("RunQueue", "No active tasks and not in --continue mode?! Please report this bug.")

        logger.verbose("Pruned %s inactive tasks, %s left", delcount, len(self.runq_fnid))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in xrange(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal("RunQueue", "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = set(newdeps)

        logger.verbose("Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in xrange(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in xrange(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal("RunQueue", "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        logger.verbose("Compute totals (have %s endpoint(s))", len(endpoints))

        # Calculate task weights
        # Check for higher length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in xrange(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                msg = "Multiple .bb files are due to be built which each provide %s (%s)." % (prov, " ".join(prov_list[prov]))
                if self.warn_multi_bb:
                    logger.warn(msg)
                else:
                    msg += "\n This usually means one provides something the other doesn't and should."
                    logger.error(msg)

        # Create a whitelist usable by the stamp checks
        stampfnwhitelist = []
        for entry in self.stampwhitelist.split():
            entryid = self.taskData.getbuild_id(entry)
            if entryid not in self.taskData.build_targets:
                continue
            fnid = self.taskData.build_targets[entryid][0]
            fn = self.taskData.fn_index[fnid]
            stampfnwhitelist.append(fn)
        self.stampfnwhitelist = stampfnwhitelist

        # Iterate over the task list looking for tasks with a 'setscene' function
        self.runq_setscene = []
        if not self.cooker.configuration.nosetscene:
            for task in range(len(self.runq_fnid)):
                setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
                if not setscene:
                    continue
                self.runq_setscene.append(task)

        def invalidate_task(fn, taskname, error_nostamp):
            taskdep = self.dataCache.task_deps[fn]
            if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
                if error_nostamp:
                    bb.fatal("Task %s is marked nostamp, cannot invalidate this task" % taskname)
                else:
                    bb.debug(1, "Task %s is marked nostamp, cannot invalidate this task" % taskname)
            else:
                logger.verbose("Invalidate task %s, %s", taskname, fn)
                bb.parse.siggen.invalidate_task(taskname, self.dataCache, fn)

        # Invalidate task if force mode active
        if self.cooker.configuration.force:
            for (fn, target) in self.target_pairs:
                invalidate_task(fn, target, False)

        # Invalidate task if invalidate mode active
        if self.cooker.configuration.invalidate_stamp:
            for (fn, target) in self.target_pairs:
                for st in self.cooker.configuration.invalidate_stamp.split(','):
                    invalidate_task(fn, "do_%s" % st, True)

        # Iterate over the task list and call into the siggen code
        dealtwith = set()
        todeal = set(range(len(self.runq_fnid)))
        while len(todeal) > 0:
            for task in todeal.copy():
                if len(self.runq_depends[task] - dealtwith) == 0:
                    dealtwith.add(task)
                    todeal.remove(task)
                    procdep = []
                    for dep in self.runq_depends[task]:
                        procdep.append(self.taskData.fn_index[self.runq_fnid[dep]] + "." + self.runq_task[dep])
                    self.runq_hash[task] = bb.parse.siggen.get_taskhash(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task], procdep, self.dataCache)

        self.hashes = {}
        self.hash_deps = {}
        for task in xrange(len(self.runq_fnid)):
            identifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[task]],
                                    self.runq_task[task])
            self.hashes[identifier] = self.runq_hash[task]
            deps = []
            for dep in self.runq_depends[task]:
                depidentifier = '%s.%s' % (self.taskData.fn_index[self.runq_fnid[dep]],
                                           self.runq_task[dep])
                deps.append(depidentifier)
            self.hash_deps[identifier] = deps

        return len(self.runq_fnid)

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures
        """
        logger.debug(3, "run_tasks:")
        for task in xrange(len(self.rqdata.runq_task)):
            logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
                         taskQueue.fn_index[self.rqdata.runq_fnid[task]],
                         self.rqdata.runq_task[task],
                         self.rqdata.runq_weight[task],
                         self.rqdata.runq_depends[task],
                         self.rqdata.runq_revdeps[task])

        logger.debug(3, "sorted_tasks:")
        for task1 in xrange(len(self.rqdata.runq_task)):
            if task1 in self.prio_map:
                task = self.prio_map[task1]
                logger.debug(3, " (%s)%s - %s: %s Deps %s RevDeps %s", task,
                             taskQueue.fn_index[self.rqdata.runq_fnid[task]],
                             self.rqdata.runq_task[task],
                             self.rqdata.runq_weight[task],
                             self.rqdata.runq_depends[task],
                             self.rqdata.runq_revdeps[task])

class RunQueue:
    def __init__(self, cooker, cfgData, dataCache, taskData, targets):
        self.cooker = cooker
        self.cfgData = cfgData
        self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)

        self.stamppolicy = cfgData.getVar("BB_STAMP_POLICY", True) or "perfile"
        self.hashvalidate = cfgData.getVar("BB_HASHCHECK_FUNCTION", True) or None
        self.setsceneverify = cfgData.getVar("BB_SETSCENE_VERIFY_FUNCTION", True) or None
        self.depvalidate = cfgData.getVar("BB_SETSCENE_DEPVALID", True) or None

        self.state = runQueuePrepare

        # For disk space monitor
        self.dm = monitordisk.diskMonitor(cfgData)

        self.rqexe = None
        self.worker = None
        self.workerpipe = None
        self.fakeworker = None
        self.fakeworkerpipe = None

    def _start_worker(self, fakeroot = False, rqexec = None):
        logger.debug(1, "Starting bitbake-worker")
        if fakeroot:
            fakerootcmd = self.cfgData.getVar("FAKEROOTCMD", True)
            fakerootenv = (self.cfgData.getVar("FAKEROOTBASEENV", True) or "").split()
            env = os.environ.copy()
            for key, value in (var.split('=') for var in fakerootenv):
                env[key] = value
            worker = subprocess.Popen([fakerootcmd, "bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE, env=env)
        else:
            worker = subprocess.Popen(["bitbake-worker", "decafbad"], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        bb.utils.nonblockingfd(worker.stdout)
        workerpipe = runQueuePipe(worker.stdout, None, self.cfgData, rqexec)

        workerdata = {
            "taskdeps" : self.rqdata.dataCache.task_deps,
            "fakerootenv" : self.rqdata.dataCache.fakerootenv,
            "fakerootdirs" : self.rqdata.dataCache.fakerootdirs,
            "fakerootnoenv" : self.rqdata.dataCache.fakerootnoenv,
            "hashes" : self.rqdata.hashes,
            "hash_deps" : self.rqdata.hash_deps,
            "sigchecksums" : bb.parse.siggen.file_checksum_values,
            "runq_hash" : self.rqdata.runq_hash,
            "logdefaultdebug" : bb.msg.loggerDefaultDebugLevel,
            "logdefaultverbose" : bb.msg.loggerDefaultVerbose,
            "logdefaultverboselogs" : bb.msg.loggerVerboseLogs,
            "logdefaultdomain" : bb.msg.loggerDefaultDomains,
            "prhost" : self.cooker.prhost,
        }

        worker.stdin.write("<cookerconfig>" + pickle.dumps(self.cooker.configuration) + "</cookerconfig>")
        worker.stdin.write("<workerdata>" + pickle.dumps(workerdata) + "</workerdata>")
        worker.stdin.flush()

        return worker, workerpipe
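
    # Communication with bitbake-worker happens over its stdin/stdout as a
    # stream of pickled payloads wrapped in simple tags: <cookerconfig> and
    # <workerdata> above, plus <runtask>, <finishnow> and <quit> elsewhere in
    # this file. Events and results come back through runQueuePipe attached
    # to the worker's stdout.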

    def _teardown_worker(self, worker, workerpipe):
        if not worker:
            return
        logger.debug(1, "Teardown for bitbake-worker")
        worker.stdin.write("<quit></quit>")
        worker.stdin.flush()
        while worker.returncode is None:
            workerpipe.read()
            worker.poll()
        while workerpipe.read():
            continue
        workerpipe.close()

    def start_worker(self):
        if self.worker:
            self.teardown_workers()
        self.worker, self.workerpipe = self._start_worker()

    def start_fakeworker(self, rqexec):
        if not self.fakeworker:
            self.fakeworker, self.fakeworkerpipe = self._start_worker(True, rqexec)

    def teardown_workers(self):
        self._teardown_worker(self.worker, self.workerpipe)
        self.worker = None
        self.workerpipe = None
        self._teardown_worker(self.fakeworker, self.fakeworkerpipe)
        self.fakeworker = None
        self.fakeworkerpipe = None

    def read_workers(self):
        self.workerpipe.read()
        if self.fakeworkerpipe:
            self.fakeworkerpipe.read()

    def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
        def get_timestamp(f):
            try:
                if not os.access(f, os.F_OK):
                    return None
                return os.stat(f)[stat.ST_MTIME]
            except:
                return None

        if self.stamppolicy == "perfile":
            fulldeptree = False
        else:
            fulldeptree = True
            stampwhitelist = []
            if self.stamppolicy == "whitelist":
                stampwhitelist = self.rqdata.stampfnwhitelist

        fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
        if taskname is None:
            taskname = self.rqdata.runq_task[task]

        stampfile = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)

        # If the stamp is missing, it's not current
        if not os.access(stampfile, os.F_OK):
            logger.debug(2, "Stampfile %s not available", stampfile)
            return False
        # If it's a 'nostamp' task, it's not current
        taskdep = self.rqdata.dataCache.task_deps[fn]
        if 'nostamp' in taskdep and taskname in taskdep['nostamp']:
            logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
            return False

        if taskname != "do_setscene" and taskname.endswith("_setscene"):
            return True

        if cache is None:
            cache = {}

        iscurrent = True
        t1 = get_timestamp(stampfile)
        for dep in self.rqdata.runq_depends[task]:
            if iscurrent:
                fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                taskname2 = self.rqdata.runq_task[dep]
                stampfile2 = bb.build.stampfile(taskname2, self.rqdata.dataCache, fn2)
                stampfile3 = bb.build.stampfile(taskname2 + "_setscene", self.rqdata.dataCache, fn2)
                t2 = get_timestamp(stampfile2)
                t3 = get_timestamp(stampfile3)
                if t3 and t3 > t2:
                    continue
                if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
                    if not t2:
                        logger.debug(2, 'Stampfile %s does not exist', stampfile2)
                        iscurrent = False
                    if t1 < t2:
                        logger.debug(2, 'Stampfile %s < %s', stampfile, stampfile2)
                        iscurrent = False
                    if recurse and iscurrent:
                        if dep in cache:
                            iscurrent = cache[dep]
                            if not iscurrent:
                                logger.debug(2, 'Stampfile for dependency %s:%s invalid (cached)' % (fn2, taskname2))
                        else:
                            iscurrent = self.check_stamp_task(dep, recurse=True, cache=cache)
                            cache[dep] = iscurrent
        if recurse:
            cache[task] = iscurrent
        return iscurrent

    def _execute_runqueue(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)
        """

        retval = 0.5

        if self.state is runQueuePrepare:
            self.rqexe = RunQueueExecuteDummy(self)
            if self.rqdata.prepare() == 0:
                self.state = runQueueComplete
            else:
                self.state = runQueueSceneInit

        if self.state is runQueueSceneInit:
            if self.cooker.configuration.dump_signatures:
                self.dump_signatures()
            else:
                self.start_worker()
                self.rqexe = RunQueueExecuteScenequeue(self)

        if self.state in [runQueueSceneRun, runQueueRunning, runQueueCleanUp]:
            self.dm.check(self)

        if self.state is runQueueSceneRun:
            retval = self.rqexe.execute()

        if self.state is runQueueRunInit:
            logger.info("Executing RunQueue Tasks")
            self.rqexe = RunQueueExecuteTasks(self)
            self.state = runQueueRunning

        if self.state is runQueueRunning:
            retval = self.rqexe.execute()

        if self.state is runQueueCleanUp:
            self.rqexe.finish()

        if self.state is runQueueComplete or self.state is runQueueFailed:
            self.teardown_workers()
            if self.rqexe.stats.failed:
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed + self.rqexe.stats.failed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
            else:
                # Let's avoid the word "failed" if nothing actually did
                logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and all succeeded.", self.rqexe.stats.completed, self.rqexe.stats.skipped)

        if self.state is runQueueFailed:
            if not self.rqdata.taskData.tryaltconfigs:
                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
            for fnid in self.rqexe.failed_fnids:
                self.rqdata.taskData.fail_fnid(fnid)
            self.rqdata.reset()

        if self.state is runQueueComplete:
            # All done
            return False

        # Loop
        return retval

    def execute_runqueue(self):
        # Catch unexpected exceptions and ensure we exit when an error occurs, not loop.
        try:
            return self._execute_runqueue()
        except bb.runqueue.TaskFailure:
            raise
        except SystemExit:
            raise
        except:
            logger.error("An uncaught exception occurred in runqueue, please see the failure below:")
            try:
                self.teardown_workers()
            except:
                pass
            self.state = runQueueComplete
            raise

    def finish_runqueue(self, now = False):
        if not self.rqexe:
            return

        if now:
            self.rqexe.finish_now()
        else:
            self.rqexe.finish()

    def dump_signatures(self):
        self.state = runQueueComplete
        done = set()
        bb.note("Reparsing files to collect dependency data")
        for task in range(len(self.rqdata.runq_fnid)):
            if self.rqdata.runq_fnid[task] not in done:
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
                the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.collection.get_file_appends(fn), self.cooker.data)
                done.add(self.rqdata.runq_fnid[task])

        bb.parse.siggen.dump_sigs(self.rqdata.dataCache)

        return

class RunQueueExecute:

    def __init__(self, rq):
        self.rq = rq
        self.cooker = rq.cooker
        self.cfgData = rq.cfgData
        self.rqdata = rq.rqdata

        self.number_tasks = int(self.cfgData.getVar("BB_NUMBER_THREADS", True) or 1)
        self.scheduler = self.cfgData.getVar("BB_SCHEDULER", True) or "speed"

        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []
        self.build_stamps = {}
        self.failed_fnids = []

        self.stampcache = {}

        rq.workerpipe.setrunqueueexec(self)
        if rq.fakeworkerpipe:
            rq.fakeworkerpipe.setrunqueueexec(self)

    def runqueue_process_waitpid(self, task, status):
        # self.build_stamps[task] may not exist when a shared work directory is used.
        if task in self.build_stamps:
            del self.build_stamps[task]

        if status != 0:
            self.task_fail(task, status)
        else:
            self.task_complete(task)
        return True

    def finish_now(self):
        self.rq.worker.stdin.write("<finishnow></finishnow>")
        self.rq.worker.stdin.flush()
        if self.rq.fakeworker:
            self.rq.fakeworker.stdin.write("<finishnow></finishnow>")
            self.rq.fakeworker.stdin.flush()

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def finish(self):
        self.rq.state = runQueueCleanUp

        if self.stats.active > 0:
            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
            self.rq.read_workers()
            return

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return

        self.rq.state = runQueueComplete
        return

    def check_dependencies(self, task, taskdeps, setscene = False):
        if not self.rq.depvalidate:
            return False

        taskdata = {}
        taskdeps.add(task)
        for dep in taskdeps:
            if setscene:
                depid = self.rqdata.runq_setscene[dep]
            else:
                depid = dep
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[depid]]
            pn = self.rqdata.dataCache.pkg_fn[fn]
            taskname = self.rqdata.runq_task[depid]
            taskdata[dep] = [pn, taskname, fn]
        call = self.rq.depvalidate + "(task, taskdata, notneeded, d)"
        locs = { "task" : task, "taskdata" : taskdata, "notneeded" : self.scenequeue_notneeded, "d" : self.cooker.data }
        valid = bb.utils.better_eval(call, locs)
        return valid
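
    # BB_SETSCENE_DEPVALID names a metadata-supplied Python function which is
    # invoked through bb.utils.better_eval() with the argument names used in
    # the call string above, i.e. something shaped like (illustrative sketch,
    # not part of this file):
    #
    #   def my_depvalid(task, taskdata, notneeded, d):
    #       # taskdata maps each dependency to [pn, taskname, fn]
    #       return True
    #
    # whose return value is what check_dependencies() hands back to its caller.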

class RunQueueExecuteDummy(RunQueueExecute):
    def __init__(self, rq):
        self.rq = rq
        self.stats = RunQueueStats(0)

    def finish(self):
        self.rq.state = runQueueComplete
        return

class RunQueueExecuteTasks(RunQueueExecute):
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))

        self.stampcache = {}

        # Mark initial buildable tasks
        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.rqdata.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)
            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                self.rq.scenequeue_covered.add(task)

        found = True
        while found:
            found = False
            for task in xrange(self.stats.total):
                if task in self.rq.scenequeue_covered:
                    continue
                logger.debug(1, 'Considering %s (%s): %s' % (task, self.rqdata.get_user_idstring(task), str(self.rqdata.runq_revdeps[task])))

                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered) and task not in self.rq.scenequeue_notcovered:
                    found = True
                    self.rq.scenequeue_covered.add(task)

        logger.debug(1, 'Skip list (pre setsceneverify) %s', sorted(self.rq.scenequeue_covered))

        # Allow the metadata to elect for setscene tasks to run anyway
        covered_remove = set()
        if self.rq.setsceneverify:
            invalidtasks = []
            for task in xrange(len(self.rqdata.runq_task)):
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
                taskname = self.rqdata.runq_task[task]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    continue
                if self.rq.check_stamp_task(task, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                if self.rq.check_stamp_task(task, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(task))
                    continue
                invalidtasks.append(task)

            call = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d, invalidtasks=invalidtasks)"
            call2 = self.rq.setsceneverify + "(covered, tasknames, fnids, fns, d)"
            locs = { "covered" : self.rq.scenequeue_covered, "tasknames" : self.rqdata.runq_task, "fnids" : self.rqdata.runq_fnid, "fns" : self.rqdata.taskData.fn_index, "d" : self.cooker.data, "invalidtasks" : invalidtasks }
            # Backwards compatibility with older versions without invalidtasks
            try:
                covered_remove = bb.utils.better_eval(call, locs)
            except TypeError:
                covered_remove = bb.utils.better_eval(call2, locs)

        for task in covered_remove:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task] + '_setscene'
            bb.build.del_stamp(taskname, self.rqdata.dataCache, fn)
            logger.debug(1, 'Not skipping task %s due to setsceneverify', task)
            self.rq.scenequeue_covered.remove(task)

        logger.debug(1, 'Full skip list %s', self.rq.scenequeue_covered)

        event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)

        schedulers = self.get_schedulers()
        for scheduler in schedulers:
            if self.scheduler == scheduler.name:
                self.sched = scheduler(self, self.rqdata)
                logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                break
        else:
            bb.fatal("Invalid scheduler '%s'. Available schedulers: %s" %
                     (self.scheduler, ", ".join(obj.name for obj in schedulers)))

    def get_schedulers(self):
        schedulers = set(obj for obj in globals().values()
                             if type(obj) is type and
                                issubclass(obj, RunQueueScheduler))

        user_schedulers = self.cfgData.getVar("BB_SCHEDULERS", True)
        if user_schedulers:
            for sched in user_schedulers.split():
                if not "." in sched:
                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
                    continue

                modname, name = sched.rsplit(".", 1)
                try:
                    module = __import__(modname, fromlist=(name,))
                except ImportError as exc:
                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
                    raise SystemExit(1)
                else:
                    schedulers.add(getattr(module, name))
        return schedulers
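
    # Illustrative configuration (names are examples, not from this file):
    # extra scheduler classes can be made available with e.g.
    #   BB_SCHEDULERS = "mymodule.MyScheduler"
    # (a space-separated list of module.Class names), and one of the available
    # schedulers is then selected by its name attribute via BB_SCHEDULER,
    # e.g. BB_SCHEDULER = "completion".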

    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.rqdata.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.rqdata.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.runq_buildable[revdep] = 1
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[revdep]]
                taskname = self.rqdata.runq_task[revdep]
                logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)

    def task_complete(self, task):
        self.stats.taskCompleted()
        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
        self.task_completeoutright(task)

    def task_fail(self, task, exitcode):
        """
        Called when a task has failed
        Updates the state engine with the failure
        """
        self.stats.taskFailed()
        fnid = self.rqdata.runq_fnid[task]
        self.failed_fnids.append(fnid)
        bb.event.fire(runQueueTaskFailed(task, self.stats, exitcode, self.rq), self.cfgData)
        if self.rqdata.taskData.abort:
            self.rq.state = runQueueCleanUp

    def task_skip(self, task):
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()

    def execute(self):
        """
        Run the tasks in a queue prepared by rqdata.prepare()
        """

        self.rq.read_workers()

        if self.stats.total == 0:
            # nothing to do
            self.rq.state = runQueueCleanUp

        task = self.sched.next()
        if task is not None:
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
            taskname = self.rqdata.runq_task[task]

            if task in self.rq.scenequeue_covered:
                logger.debug(2, "Setscene covered task %s (%s)", task,
                             self.rqdata.get_user_idstring(task))
                self.task_skip(task)
                return True

            if self.rq.check_stamp_task(task, taskname, cache=self.stampcache):
                logger.debug(2, "Stamp current task %s (%s)", task,
                             self.rqdata.get_user_idstring(task))
                self.task_skip(task)
                return True

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'noexec' in taskdep and taskname in taskdep['noexec']:
                startevent = runQueueTaskStarted(task, self.stats, self.rq,
                                                 noexec=True)
                bb.event.fire(startevent, self.cfgData)
                self.runq_running[task] = 1
                self.stats.taskActive()
                bb.build.make_stamp(taskname, self.rqdata.dataCache, fn)
                self.task_complete(task)
                return True
            else:
                startevent = runQueueTaskStarted(task, self.stats, self.rq)
                bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
                if not self.rq.fakeworker:
                    self.rq.start_fakeworker(self)
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, task, taskname, False, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.build_stamps[task] = bb.build.stampfile(taskname, self.rqdata.dataCache, fn)
            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return 0.5

        if len(self.failed_fnids) != 0:
            self.rq.state = runQueueFailed
            return True

        # Sanity Checks
        for task in xrange(self.stats.total):
            if self.runq_buildable[task] == 0:
                logger.error("Task %s never buildable!", task)
            if self.runq_running[task] == 0:
                logger.error("Task %s never ran!", task)
            if self.runq_complete[task] == 0:
                logger.error("Task %s never completed!", task)
        self.rq.state = runQueueComplete

        return True
class RunQueueExecuteScenequeue(RunQueueExecute):
    def __init__(self, rq):
        RunQueueExecute.__init__(self, rq)

        self.scenequeue_covered = set()
        self.scenequeue_notcovered = set()
        self.scenequeue_notneeded = set()

        # If we don't have any setscene functions, skip this step
        if len(self.rqdata.runq_setscene) == 0:
            rq.scenequeue_covered = set()
            rq.state = runQueueRunInit
            return

        self.stats = RunQueueStats(len(self.rqdata.runq_setscene))

        sq_revdeps = []
        sq_revdeps_new = []
        sq_revdeps_squash = []

        # We need to construct a dependency graph for the setscene functions. Intermediate
        # dependencies between the setscene tasks only complicate the code. This code
        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
        # only containing the setscene functions.

        for task in xrange(self.stats.total):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            self.runq_buildable.append(0)

        # First process the chains up to the first setscene task.
        endpoints = {}
        for task in xrange(len(self.rqdata.runq_fnid)):
            sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
            sq_revdeps_new.append(set())
            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
                endpoints[task] = set()

        # Secondly process the chains between setscene tasks.
        for task in self.rqdata.runq_setscene:
            for dep in self.rqdata.runq_depends[task]:
                if dep not in endpoints:
                    endpoints[dep] = set()
                endpoints[dep].add(task)

        def process_endpoints(endpoints):
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set()
                if task:
                    tasks |= task
                if sq_revdeps_new[point]:
                    tasks |= sq_revdeps_new[point]
                sq_revdeps_new[point] = set()
                if point in self.rqdata.runq_setscene:
                    sq_revdeps_new[point] = tasks
                for dep in self.rqdata.runq_depends[point]:
                    if point in sq_revdeps[dep]:
                        sq_revdeps[dep].remove(point)
                    if tasks:
                        sq_revdeps_new[dep] |= tasks
                    if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
                        newendpoints[dep] = task
            if len(newendpoints) != 0:
                process_endpoints(newendpoints)

        process_endpoints(endpoints)
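
        # In effect, chains of ordinary tasks sitting between two setscene tasks are
        # folded away by the walk above, leaving a graph whose nodes are only the
        # setscene tasks and whose edges record that one setscene task transitively
        # depends on another through the full runqueue.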

        # Build a list of setscene tasks which are "unskippable"
        # These are direct endpoints referenced by the build
        endpoints2 = {}
        sq_revdeps2 = []
        sq_revdeps_new2 = []
        def process_endpoints2(endpoints):
            newendpoints = {}
            for point, task in endpoints.items():
                tasks = set([point])
                if task:
                    tasks |= task
                if sq_revdeps_new2[point]:
                    tasks |= sq_revdeps_new2[point]
                sq_revdeps_new2[point] = set()
                if point in self.rqdata.runq_setscene:
                    sq_revdeps_new2[point] = tasks
                for dep in self.rqdata.runq_depends[point]:
                    if point in sq_revdeps2[dep]:
                        sq_revdeps2[dep].remove(point)
                    if tasks:
                        sq_revdeps_new2[dep] |= tasks
                    if (len(sq_revdeps2[dep]) == 0 or len(sq_revdeps_new2[dep]) != 0) and dep not in self.rqdata.runq_setscene:
                        newendpoints[dep] = tasks
            if len(newendpoints) != 0:
                process_endpoints2(newendpoints)
        for task in xrange(len(self.rqdata.runq_fnid)):
            sq_revdeps2.append(copy.copy(self.rqdata.runq_revdeps[task]))
            sq_revdeps_new2.append(set())
            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
                endpoints2[task] = set()
        process_endpoints2(endpoints2)
        self.unskippable = []
        for task in self.rqdata.runq_setscene:
            if sq_revdeps_new2[task]:
                self.unskippable.append(self.rqdata.runq_setscene.index(task))

        for task in xrange(len(self.rqdata.runq_fnid)):
            if task in self.rqdata.runq_setscene:
                deps = set()
                for dep in sq_revdeps_new[task]:
                    deps.add(self.rqdata.runq_setscene.index(dep))
                sq_revdeps_squash.append(deps)
            elif len(sq_revdeps_new[task]) != 0:
                bb.msg.fatal("RunQueue", "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")

        # Resolve setscene inter-task dependencies
        # e.g. do_sometask_setscene[depends] = "targetname:do_someothertask_setscene"
        # Note that anything explicitly depended upon will have its reverse dependencies removed to avoid circular dependencies
        for task in self.rqdata.runq_setscene:
            realid = self.rqdata.taskData.gettask_id(self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]], self.rqdata.runq_task[task] + "_setscene", False)
            idepends = self.rqdata.taskData.tasks_idepends[realid]
            for (depid, idependtask) in idepends:
                if depid not in self.rqdata.taskData.build_targets:
                    continue
                depdata = self.rqdata.taskData.build_targets[depid][0]
                if depdata is None:
                    continue
                dep = self.rqdata.taskData.fn_index[depdata]
                taskid = self.rqdata.get_task_id(self.rqdata.taskData.getfn_id(dep), idependtask.replace("_setscene", ""))
                if taskid is None:
                    bb.msg.fatal("RunQueue", "Task %s:%s depends upon non-existent task %s:%s" % (self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realid]], self.rqdata.taskData.tasks_name[realid], dep, idependtask))

                sq_revdeps_squash[self.rqdata.runq_setscene.index(task)].add(self.rqdata.runq_setscene.index(taskid))
                # Have to zero this to avoid circular dependencies
                sq_revdeps_squash[self.rqdata.runq_setscene.index(taskid)] = set()

        #for task in xrange(len(sq_revdeps_squash)):
        #    print "Task %s: %s.%s is %s " % (task, self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[self.rqdata.runq_setscene[task]]], self.rqdata.runq_task[self.rqdata.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])

        self.sq_deps = []
        self.sq_revdeps = sq_revdeps_squash
        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)

        for task in xrange(len(self.sq_revdeps)):
            self.sq_deps.append(set())
        for task in xrange(len(self.sq_revdeps)):
            for dep in self.sq_revdeps[task]:
                self.sq_deps[dep].add(task)

        for task in xrange(len(self.sq_revdeps)):
            if len(self.sq_revdeps[task]) == 0:
                self.runq_buildable[task] = 1
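
        # If a setscene validity checker is configured (self.rq.hashvalidate names a
        # python function), build up parallel lists describing each candidate setscene
        # task and call it; it returns the indices of the tasks whose prebuilt output is
        # actually available. Tasks with a current stamp or marked noexec are handled
        # directly and excluded from the query.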
        if self.rq.hashvalidate:
            sq_hash = []
            sq_hashfn = []
            sq_fn = []
            sq_taskname = []
            sq_task = []
            noexec = []
            stamppresent = []
            for task in xrange(len(self.sq_revdeps)):
                realtask = self.rqdata.runq_setscene[task]
                fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
                taskname = self.rqdata.runq_task[realtask]
                taskdep = self.rqdata.dataCache.task_deps[fn]

                if 'noexec' in taskdep and taskname in taskdep['noexec']:
                    noexec.append(task)
                    self.task_skip(task)
                    bb.build.make_stamp(taskname + "_setscene", self.rqdata.dataCache, fn)
                    continue

                if self.rq.check_stamp_task(realtask, taskname + "_setscene", cache=self.stampcache):
                    logger.debug(2, 'Setscene stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
                    stamppresent.append(task)
                    self.task_skip(task)
                    continue

                if self.rq.check_stamp_task(realtask, taskname, recurse = True, cache=self.stampcache):
                    logger.debug(2, 'Normal stamp current for task %s(%s)', task, self.rqdata.get_user_idstring(realtask))
                    stamppresent.append(task)
                    self.task_skip(task)
                    continue

                sq_fn.append(fn)
                sq_hashfn.append(self.rqdata.dataCache.hashfn[fn])
                sq_hash.append(self.rqdata.runq_hash[realtask])
                sq_taskname.append(taskname)
                sq_task.append(task)

            call = self.rq.hashvalidate + "(sq_fn, sq_task, sq_hash, sq_hashfn, d)"
            locs = { "sq_fn" : sq_fn, "sq_task" : sq_taskname, "sq_hash" : sq_hash, "sq_hashfn" : sq_hashfn, "d" : self.cooker.data }
            valid = bb.utils.better_eval(call, locs)

            valid_new = stamppresent
            for v in valid:
                valid_new.append(sq_task[v])

            for task in xrange(len(self.sq_revdeps)):
                if task not in valid_new and task not in noexec:
                    realtask = self.rqdata.runq_setscene[task]
                    logger.debug(2, 'No package found, so skipping setscene task %s',
                                 self.rqdata.get_user_idstring(realtask))
                    self.task_failoutright(task)

        logger.info('Executing SetScene Tasks')

        self.rq.state = runQueueSceneRun
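
    # Once a setscene task finishes (covered or not), drop it from the outstanding
    # reverse-dependency sets of the tasks waiting on it and mark any task whose
    # set becomes empty as buildable.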
    def scenequeue_updatecounters(self, task):
        for dep in self.sq_deps[task]:
            self.sq_revdeps2[dep].remove(task)
            if len(self.sq_revdeps2[dep]) == 0:
                self.runq_buildable[dep] = 1
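
    # Bookkeeping convention for the handlers below: tasks whose setscene variant
    # succeeds (or can be skipped) are added to scenequeue_covered and will not be
    # re-run by the main queue; tasks added to scenequeue_notcovered must be built
    # for real later.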
    def task_completeoutright(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """

        index = self.rqdata.runq_setscene[task]
        logger.debug(1, 'Found task %s which could be accelerated',
                     self.rqdata.get_user_idstring(index))

        self.scenequeue_covered.add(task)
        self.scenequeue_updatecounters(task)

    def task_complete(self, task):
        self.stats.taskCompleted()
        self.task_completeoutright(task)

    def task_fail(self, task, result):
        self.stats.taskFailed()
        bb.event.fire(sceneQueueTaskFailed(task, self.stats, result, self), self.cfgData)
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task)

    def task_failoutright(self, task):
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.stats.taskCompleted()
        self.stats.taskSkipped()
        index = self.rqdata.runq_setscene[task]
        self.scenequeue_notcovered.add(task)
        self.scenequeue_updatecounters(task)

    def task_skip(self, task):
        self.runq_running[task] = 1
        self.runq_buildable[task] = 1
        self.task_completeoutright(task)
        self.stats.taskCompleted()
        self.stats.taskSkipped()
    def execute(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        """

        self.rq.read_workers()

        task = None
        if self.stats.active < self.number_tasks:
            # Find the next setscene to run
            for nexttask in xrange(self.stats.total):
                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
                    if nexttask in self.unskippable:
                        logger.debug(2, "Setscene task %s is unskippable" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                    if nexttask not in self.unskippable and len(self.sq_revdeps[nexttask]) > 0 and self.sq_revdeps[nexttask].issubset(self.scenequeue_covered) and self.check_dependencies(nexttask, self.sq_revdeps[nexttask], True):
                        logger.debug(2, "Skipping setscene for task %s" % self.rqdata.get_user_idstring(self.rqdata.runq_setscene[nexttask]))
                        self.task_skip(nexttask)
                        self.scenequeue_notneeded.add(nexttask)
                        return True
                    task = nexttask
                    break
        if task is not None:
            realtask = self.rqdata.runq_setscene[task]
            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]

            taskname = self.rqdata.runq_task[realtask] + "_setscene"
            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask], recurse = True, cache=self.stampcache):
                logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
                             task, self.rqdata.get_user_idstring(realtask))
                self.task_failoutright(task)
                return True

            if self.cooker.configuration.force:
                for target in self.rqdata.target_pairs:
                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
                        self.task_failoutright(task)
                        return True

            if self.rq.check_stamp_task(realtask, taskname, cache=self.stampcache):
                logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
                             task, self.rqdata.get_user_idstring(realtask))
                self.task_skip(task)
                return True

            startevent = sceneQueueTaskStarted(task, self.stats, self.rq)
            bb.event.fire(startevent, self.cfgData)

            taskdep = self.rqdata.dataCache.task_deps[fn]
            if 'fakeroot' in taskdep and taskname in taskdep['fakeroot']:
                if not self.rq.fakeworker:
                    self.rq.start_fakeworker(self)
                self.rq.fakeworker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
                self.rq.fakeworker.stdin.flush()
            else:
                self.rq.worker.stdin.write("<runtask>" + pickle.dumps((fn, realtask, taskname, True, self.cooker.collection.get_file_appends(fn))) + "</runtask>")
                self.rq.worker.stdin.flush()

            self.runq_running[task] = 1
            self.stats.taskActive()
            if self.stats.active < self.number_tasks:
                return True

        if self.stats.active > 0:
            self.rq.read_workers()
            return 0.5

        # Convert scenequeue_covered task numbers into full taskgraph ids
        oldcovered = self.scenequeue_covered
        self.rq.scenequeue_covered = set()
        for task in oldcovered:
            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
        self.rq.scenequeue_notcovered = set()
        for task in self.scenequeue_notcovered:
            self.rq.scenequeue_notcovered.add(self.rqdata.runq_setscene[task])

        logger.debug(1, 'We can skip tasks %s', sorted(self.rq.scenequeue_covered))

        self.rq.state = runQueueRunInit
        return True
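
    # Exit statuses come back from the worker keyed by the full runqueue task id;
    # translate that into this queue's setscene index before the base class updates
    # its bookkeeping.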
    def runqueue_process_waitpid(self, task, status):
        task = self.rq.rqdata.runq_setscene.index(task)
        RunQueueExecute.runqueue_process_waitpid(self, task, status)

class TaskFailure(Exception):
    """
    Exception raised when a task in a runqueue fails
    """
    def __init__(self, x):
        self.args = x


class runQueueExitWait(bb.event.Event):
    """
    Event when waiting for task processes to exit
    """

    def __init__(self, remain):
        self.remain = remain
        self.message = "Waiting for %s active tasks to finish" % remain
        bb.event.Event.__init__(self)

class runQueueEvent(bb.event.Event):
    """
    Base runQueue event class
    """
    def __init__(self, task, stats, rq):
        self.taskid = task
        self.taskstring = rq.rqdata.get_user_idstring(task)
        self.stats = stats.copy()
        bb.event.Event.__init__(self)

class sceneQueueEvent(runQueueEvent):
    """
    Base sceneQueue event class
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        realtask = rq.rqdata.runq_setscene[task]
        self.taskstring = rq.rqdata.get_user_idstring(realtask, "_setscene")

class runQueueTaskStarted(runQueueEvent):
    """
    Event notifying a task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        runQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec

class sceneQueueTaskStarted(sceneQueueEvent):
    """
    Event notifying a setscene task was started
    """
    def __init__(self, task, stats, rq, noexec=False):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.noexec = noexec

class runQueueTaskFailed(runQueueEvent):
    """
    Event notifying a task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        runQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

class sceneQueueTaskFailed(sceneQueueEvent):
    """
    Event notifying a setscene task failed
    """
    def __init__(self, task, stats, exitcode, rq):
        sceneQueueEvent.__init__(self, task, stats, rq)
        self.exitcode = exitcode

class runQueueTaskCompleted(runQueueEvent):
    """
    Event notifying a task completed
    """
class runQueuePipe():
    """
    Abstraction for a pipe between a worker thread and the server
    """
    def __init__(self, pipein, pipeout, d, rq):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = ""
        self.d = d
        self.rq = rq

    def setrunqueueexec(self, rq):
        self.rq = rq

    def read(self):
        start = len(self.queue)
        try:
            self.queue = self.queue + self.input.read(102400)
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise
        end = len(self.queue)
        found = True
        while found and len(self.queue):
            found = False
            index = self.queue.find("</event>")
            while index != -1 and self.queue.startswith("<event>"):
                event = pickle.loads(self.queue[7:index])
                bb.event.fire_from_worker(event, self.d)
                found = True
                self.queue = self.queue[index+8:]
                index = self.queue.find("</event>")
            index = self.queue.find("</exitcode>")
            while index != -1 and self.queue.startswith("<exitcode>"):
                task, status = pickle.loads(self.queue[10:index])
                self.rq.runqueue_process_waitpid(task, status)
                found = True
                self.queue = self.queue[index+11:]
                index = self.queue.find("</exitcode>")
        return (end > start)

    def close(self):
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker left partial message: %s" % self.queue)
        self.input.close()