runqueue.py

#!/usr/bin/env python
# ex:ts=4:sw=4:sts=4:et
# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
"""
BitBake 'RunQueue' implementation

Handles preparation and execution of a queue of tasks
"""

# Copyright (C) 2006-2007 Richard Purdie
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

from bb import msg, data, event, mkdirhier, utils
from sets import Set
import bb, os, sys
import signal

class TaskFailure(Exception):
    """Exception raised when a task in a runqueue fails"""
    def __init__(self, x):
        self.args = x

class RunQueueStats:
    """
    Holds statistics on the tasks handled by the associated runQueue
    """
    def __init__(self):
        self.completed = 0
        self.skipped = 0
        self.failed = 0

    def taskFailed(self):
        self.failed = self.failed + 1

    def taskCompleted(self):
        self.completed = self.completed + 1

    def taskSkipped(self):
        self.skipped = self.skipped + 1

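# How the scheduler classes below are used (a minimal sketch based on the
# RunQueue code later in this file; 'rq' is a RunQueue whose prepare_runqueue()
# has already run):
#
#   sched = RunQueueSchedulerSpeed(rq)   # the default choice in prepare_runqueue()
#   task = sched.next()                  # id of the next buildable task, or None
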
class RunQueueScheduler:
    """
    Control the order tasks are scheduled in.
    """
    def __init__(self, runqueue):
        """
        The default scheduler just returns the first buildable task (the
        priority map is sorted by task number)
        """
        self.rq = runqueue
        numTasks = len(self.rq.runq_fnid)
        self.prio_map = []
        self.prio_map.extend(range(numTasks))

    def next(self):
        """
        Return the id of the first task we find that is buildable
        """
        for task1 in range(len(self.rq.runq_fnid)):
            task = self.prio_map[task1]
            if self.rq.runq_running[task] == 1:
                continue
            if self.rq.runq_buildable[task] == 1:
                return task

class RunQueueSchedulerSpeed(RunQueueScheduler):
    """
    A scheduler optimised for speed. The priority map is sorted by task weight;
    heavier weighted tasks (tasks needed by the most other tasks) are run first.
    """
    def __init__(self, runqueue):
        """
        The priority map is sorted by task weight.
        """
        from copy import deepcopy

        self.rq = runqueue

        sortweight = deepcopy(self.rq.runq_weight)
        sortweight.sort()
        copyweight = deepcopy(self.rq.runq_weight)
        self.prio_map = []

        for weight in sortweight:
            idx = copyweight.index(weight)
            self.prio_map.append(idx)
            copyweight[idx] = -1
        self.prio_map.reverse()

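# Worked example of the ordering above (illustrative weights, not from a real
# build): with runq_weight = [1, 3, 2] the loop builds prio_map = [0, 2, 1],
# which reverse() turns into [1, 2, 0], so task 1 (the heaviest, weight 3) is
# offered to the runqueue first.
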
class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
    """
    A scheduler optimised to complete .bb files as quickly as possible. The
    priority map is sorted by task weight, but then reordered so that once a
    given .bb file starts to build, it is completed as quickly as possible. This
    works well where disk space is at a premium and classes like OE's rm_work are
    in force.
    """
    def __init__(self, runqueue):
        RunQueueSchedulerSpeed.__init__(self, runqueue)
        from copy import deepcopy

        #FIXME - whilst this groups all fnids together it does not reorder the
        #fnid groups optimally.

        basemap = deepcopy(self.prio_map)
        self.prio_map = []
        while (len(basemap) > 0):
            entry = basemap.pop(0)
            self.prio_map.append(entry)
            fnid = self.rq.runq_fnid[entry]
            todel = []
            for entry in basemap:
                entry_fnid = self.rq.runq_fnid[entry]
                if entry_fnid == fnid:
                    todel.append(basemap.index(entry))
                    self.prio_map.append(entry)
            todel.reverse()
            for idx in todel:
                del basemap[idx]

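# Worked example of the regrouping above (illustrative values): given a
# speed-sorted basemap of [3, 0, 2, 1] where tasks 2 and 3 belong to one .bb
# file and tasks 0 and 1 to another, the loop yields prio_map = [3, 2, 0, 1],
# i.e. the rest of a .bb file's tasks are pulled up behind the first of its
# tasks to be scheduled.
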
class RunQueue:
    """
    BitBake Run Queue implementation
    """
    def __init__(self, cooker, cfgData, dataCache, taskData, targets):
        self.reset_runqueue()
        self.cooker = cooker
        self.dataCache = dataCache
        self.taskData = taskData
        self.targets = targets

        self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData) or 1)
        self.multi_provider_whitelist = (bb.data.getVar("MULTI_PROVIDER_WHITELIST", cfgData) or "").split()

    def reset_runqueue(self):
        self.runq_fnid = []
        self.runq_task = []
        self.runq_depends = []
        self.runq_revdeps = []

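    # The runqueue is held as parallel lists indexed by task id: runq_fnid (index
    # of the providing file in taskData.fn_index), runq_task (task name),
    # runq_depends (ids of tasks this task depends on) and runq_revdeps (ids of
    # tasks that depend on this task).
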
    def get_user_idstring(self, task):
        fn = self.taskData.fn_index[self.runq_fnid[task]]
        taskname = self.runq_task[task]
        return "%s, %s" % (fn, taskname)

    def circular_depchains_handler(self, tasks):
        """
        Some tasks aren't buildable, likely due to circular dependency issues.
        Identify the circular dependencies and print them in a user readable format.
        """
        from copy import deepcopy

        valid_chains = []
        explored_deps = {}
        msgs = []

        def chain_reorder(chain):
            """
            Reorder a dependency chain so the lowest task id is first
            """
            lowest = 0
            new_chain = []
            for entry in range(len(chain)):
                if chain[entry] < chain[lowest]:
                    lowest = entry
            new_chain.extend(chain[lowest:])
            new_chain.extend(chain[:lowest])
            return new_chain

        def chain_compare_equal(chain1, chain2):
            """
            Compare two dependency chains and see if they're the same
            """
            if len(chain1) != len(chain2):
                return False
            for index in range(len(chain1)):
                if chain1[index] != chain2[index]:
                    return False
            return True

        def chain_array_contains(chain, chain_array):
            """
            Return True if chain_array contains chain
            """
            for ch in chain_array:
                if chain_compare_equal(ch, chain):
                    return True
            return False

        def find_chains(taskid, prev_chain):
            prev_chain.append(taskid)
            total_deps = []
            total_deps.extend(self.runq_revdeps[taskid])
            for revdep in self.runq_revdeps[taskid]:
                if revdep in prev_chain:
                    idx = prev_chain.index(revdep)
                    # To prevent duplicates, reorder the chain to start with the lowest taskid
                    # and search through an array of those we've already printed
                    chain = prev_chain[idx:]
                    new_chain = chain_reorder(chain)
                    if not chain_array_contains(new_chain, valid_chains):
                        valid_chains.append(new_chain)
                        msgs.append("Dependency loop #%d found:\n" % len(valid_chains))
                        for dep in new_chain:
                            msgs.append(" Task %s (%s) (depends: %s)\n" % (dep, self.get_user_idstring(dep), self.runq_depends[dep]))
                        msgs.append("\n")
                    if len(valid_chains) > 10:
                        msgs.append("Aborted dependency loops search after 10 matches.\n")
                        return msgs
                    continue
                scan = False
                if revdep not in explored_deps:
                    scan = True
                elif revdep in explored_deps[revdep]:
                    scan = True
                else:
                    for dep in prev_chain:
                        if dep in explored_deps[revdep]:
                            scan = True
                if scan:
                    find_chains(revdep, deepcopy(prev_chain))
                for dep in explored_deps[revdep]:
                    if dep not in total_deps:
                        total_deps.append(dep)

            explored_deps[taskid] = total_deps

        for task in tasks:
            find_chains(task, [])

        return msgs

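    # calculate_task_weights() below propagates weights backwards from "endpoint"
    # tasks (tasks nothing else depends on), which each start at weight 1; every
    # task reached accumulates the weights of the tasks depending on it.
    # Illustrative example: if two endpoint tasks both depend on do_compile, and
    # do_compile depends on do_fetch, then do_compile and do_fetch each end up
    # with weight 2 and are run ahead of weight-1 tasks by the speed scheduler.
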
    def calculate_task_weights(self, endpoints):
        """
        Calculate a number representing the "weight" of each task. Heavier weighted tasks
        have more dependencies and hence should be executed sooner for maximum speed.

        This function also sanity checks the task list, finding tasks that it is not
        possible to execute due to circular dependencies.
        """
        numTasks = len(self.runq_fnid)
        weight = []
        deps_left = []
        task_done = []

        for listid in range(numTasks):
            task_done.append(False)
            weight.append(0)
            deps_left.append(len(self.runq_revdeps[listid]))

        for listid in endpoints:
            weight[listid] = 1
            task_done[listid] = True

        while 1:
            next_points = []
            for listid in endpoints:
                for revdep in self.runq_depends[listid]:
                    weight[revdep] = weight[revdep] + weight[listid]
                    deps_left[revdep] = deps_left[revdep] - 1
                    if deps_left[revdep] == 0:
                        next_points.append(revdep)
                        task_done[revdep] = True
            endpoints = next_points
            if len(next_points) == 0:
                break

        # Circular dependency sanity check
        problem_tasks = []
        for task in range(numTasks):
            if task_done[task] is False or deps_left[task] != 0:
                problem_tasks.append(task)
                bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s) is not buildable\n" % (task, self.get_user_idstring(task)))
                bb.msg.debug(2, bb.msg.domain.RunQueue, "(Complete marker was %s and the remaining dependency count was %s)\n\n" % (task_done[task], deps_left[task]))

        if problem_tasks:
            message = "Unbuildable tasks were found.\n"
            message = message + "These are usually caused by circular dependencies and any circular dependency chains found will be printed below. Increase the debug level to see a list of unbuildable tasks.\n\n"
            message = message + "Identifying dependency loops (this may take a short while)...\n"
            bb.msg.error(bb.msg.domain.RunQueue, message)

            msgs = self.circular_depchains_handler(problem_tasks)

            message = "\n"
            for msg in msgs:
                message = message + msg
            bb.msg.fatal(bb.msg.domain.RunQueue, message)

        return weight

    def prepare_runqueue(self):
        """
        Turn a set of taskData into a RunQueue and compute data needed
        to optimise the execution order.
        """
        depends = []
        runq_build = []

        taskData = self.taskData

        if len(taskData.tasks_name) == 0:
            # Nothing to do
            return

        bb.msg.note(1, bb.msg.domain.RunQueue, "Preparing runqueue")

        # Step A - Work out a list of tasks to run
        #
        # Taskdata gives us a list of possible providers for every target
        # ordered by priority (build_targets, run_targets). It also gives
        # information on each of those providers.
        #
        # To create the actual list of tasks to execute we fix the list of
        # providers and then resolve the dependencies into task IDs. This
        # process is repeated for each type of dependency (tdepends, deptask,
        # rdeptask, recrdeptask, idepends).

        for task in range(len(taskData.tasks_name)):
            fnid = taskData.tasks_fnid[task]
            fn = taskData.fn_index[fnid]
            task_deps = self.dataCache.task_deps[fn]

            if fnid not in taskData.failed_fnids:

                # Resolve task internal dependencies
                #
                # e.g. addtask before X after Y
                depends = taskData.tasks_tdepends[task]

                # Resolve 'deptask' dependencies
                #
                # e.g. do_sometask[deptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS)
                if 'deptask' in task_deps and taskData.tasks_name[task] in task_deps['deptask']:
                    tasknames = task_deps['deptask'][taskData.tasks_name[task]].split()
                    for depid in taskData.depids[fnid]:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        if depid in taskData.build_targets:
                            depdata = taskData.build_targets[depid][0]
                            if depdata is not None:
                                dep = taskData.fn_index[depdata]
                                for taskname in tasknames:
                                    depends.append(taskData.gettask_id(dep, taskname))

                # Resolve 'rdeptask' dependencies
                #
                # e.g. do_sometask[rdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all RDEPENDS)
                if 'rdeptask' in task_deps and taskData.tasks_name[task] in task_deps['rdeptask']:
                    taskname = task_deps['rdeptask'][taskData.tasks_name[task]]
                    for depid in taskData.rdepids[fnid]:
                        if depid in taskData.run_targets:
                            depdata = taskData.run_targets[depid][0]
                            if depdata is not None:
                                dep = taskData.fn_index[depdata]
                                depends.append(taskData.gettask_id(dep, taskname))

                # Resolve inter-task dependencies
                #
                # e.g. do_sometask[depends] = "targetname:do_someothertask"
                # (makes sure sometask runs after targetname's someothertask)
                idepends = taskData.tasks_idepends[task]
                for idepend in idepends:
                    depid = int(idepend.split(":")[0])
                    if depid in taskData.build_targets:
                        # Won't be in build_targets if ASSUME_PROVIDED
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            dep = taskData.fn_index[depdata]
                            depends.append(taskData.gettask_id(dep, idepend.split(":")[1]))
                def add_recursive_build(depid, depfnid):
                    """
                    Add build depends of depid to depends
                    (if we've not seen it before)
                    (calls itself recursively)
                    """
                    if str(depid) in dep_seen:
                        return
                    dep_seen.append(depid)
                    if depid in taskData.build_targets:
                        depdata = taskData.build_targets[depid][0]
                        if depdata is not None:
                            dep = taskData.fn_index[depdata]
                            idepends = []
                            # Need to avoid creating new tasks here
                            taskid = taskData.gettask_id(dep, taskname, False)
                            if taskid is not None:
                                depends.append(taskid)
                                fnid = taskData.tasks_fnid[taskid]
                                idepends = taskData.tasks_idepends[taskid]
                                #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
                            else:
                                fnid = taskData.getfn_id(dep)
                            for nextdepid in taskData.depids[fnid]:
                                if nextdepid not in dep_seen:
                                    add_recursive_build(nextdepid, fnid)
                            for nextdepid in taskData.rdepids[fnid]:
                                if nextdepid not in rdep_seen:
                                    add_recursive_run(nextdepid, fnid)
                            for idepend in idepends:
                                nextdepid = int(idepend.split(":")[0])
                                if nextdepid not in dep_seen:
                                    add_recursive_build(nextdepid, fnid)

                def add_recursive_run(rdepid, depfnid):
                    """
                    Add runtime depends of rdepid to depends
                    (if we've not seen it before)
                    (calls itself recursively)
                    """
                    if str(rdepid) in rdep_seen:
                        return
                    rdep_seen.append(rdepid)
                    if rdepid in taskData.run_targets:
                        depdata = taskData.run_targets[rdepid][0]
                        if depdata is not None:
                            dep = taskData.fn_index[depdata]
                            idepends = []
                            # Need to avoid creating new tasks here
                            taskid = taskData.gettask_id(dep, taskname, False)
                            if taskid is not None:
                                depends.append(taskid)
                                fnid = taskData.tasks_fnid[taskid]
                                idepends = taskData.tasks_idepends[taskid]
                                #print "Added %s (%s) due to %s" % (taskid, taskData.fn_index[fnid], taskData.fn_index[depfnid])
                            else:
                                fnid = taskData.getfn_id(dep)
                            for nextdepid in taskData.depids[fnid]:
                                if nextdepid not in dep_seen:
                                    add_recursive_build(nextdepid, fnid)
                            for nextdepid in taskData.rdepids[fnid]:
                                if nextdepid not in rdep_seen:
                                    add_recursive_run(nextdepid, fnid)
                            for idepend in idepends:
                                nextdepid = int(idepend.split(":")[0])
                                if nextdepid not in dep_seen:
                                    add_recursive_build(nextdepid, fnid)
                # Resolve recursive 'recrdeptask' dependencies
                #
                # e.g. do_sometask[recrdeptask] = "do_someothertask"
                # (makes sure sometask runs after someothertask of all DEPENDS, RDEPENDS and intertask dependencies, recursively)
                if 'recrdeptask' in task_deps and taskData.tasks_name[task] in task_deps['recrdeptask']:
                    for taskname in task_deps['recrdeptask'][taskData.tasks_name[task]].split():
                        dep_seen = []
                        rdep_seen = []
                        idep_seen = []
                        for depid in taskData.depids[fnid]:
                            add_recursive_build(depid, fnid)
                        for rdepid in taskData.rdepids[fnid]:
                            add_recursive_run(rdepid, fnid)
                        for idepend in idepends:
                            depid = int(idepend.split(":")[0])
                            add_recursive_build(depid, fnid)

                # Remove all self references
                if task in depends:
                    newdep = []
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Task %s (%s %s) contains self reference! %s" % (task, taskData.fn_index[taskData.tasks_fnid[task]], taskData.tasks_name[task], depends))
                    for dep in depends:
                        if task != dep:
                            newdep.append(dep)
                    depends = newdep

            self.runq_fnid.append(taskData.tasks_fnid[task])
            self.runq_task.append(taskData.tasks_name[task])
            self.runq_depends.append(Set(depends))
            self.runq_revdeps.append(Set())

            runq_build.append(0)
        # Step B - Mark all active tasks
        #
        # Start with the tasks we were asked to run and mark all dependencies
        # as active too. If the task is to be 'forced', clear its stamp. Once
        # all active tasks are marked, prune the ones we don't need.

        bb.msg.note(2, bb.msg.domain.RunQueue, "Marking Active Tasks")

        def mark_active(listid, depth):
            """
            Mark an item as active along with its depends
            (calls itself recursively)
            """
            if runq_build[listid] == 1:
                return
            runq_build[listid] = 1
            depends = self.runq_depends[listid]
            for depend in depends:
                mark_active(depend, depth+1)

        for target in self.targets:
            targetid = taskData.getbuild_id(target[0])

            if targetid not in taskData.build_targets:
                continue

            if targetid in taskData.failed_deps:
                continue

            fnid = taskData.build_targets[targetid][0]

            # Remove stamps for targets if force mode active
            if self.cooker.configuration.force:
                fn = taskData.fn_index[fnid]
                bb.msg.note(2, bb.msg.domain.RunQueue, "Remove stamp %s, %s" % (target[1], fn))
                bb.build.del_stamp(target[1], self.dataCache, fn)

            if fnid in taskData.failed_fnids:
                continue

            if target[1] not in taskData.tasks_lookup[fnid]:
                bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s does not exist for target %s" % (target[1], target[0]))

            listid = taskData.tasks_lookup[fnid][target[1]]

            mark_active(listid, 1)

        # Step C - Prune all inactive tasks
        #
        # Once all active tasks are marked, prune the ones we don't need.

        maps = []
        delcount = 0
        for listid in range(len(self.runq_fnid)):
            if runq_build[listid-delcount] == 1:
                maps.append(listid-delcount)
            else:
                del self.runq_fnid[listid-delcount]
                del self.runq_task[listid-delcount]
                del self.runq_depends[listid-delcount]
                del runq_build[listid-delcount]
                del self.runq_revdeps[listid-delcount]
                delcount = delcount + 1
                maps.append(-1)
        #
        # Step D - Sanity checks and computation
        #

        # Check to make sure we still have tasks to run
        if len(self.runq_fnid) == 0:
            if not taskData.abort:
                bb.msg.fatal(bb.msg.domain.RunQueue, "All buildable tasks have been run but the build is incomplete (--continue mode). Errors for the tasks that failed will have been printed above.")
            else:
                bb.msg.fatal(bb.msg.domain.RunQueue, "No active tasks and not in --continue mode?! Please report this bug.")

        bb.msg.note(2, bb.msg.domain.RunQueue, "Pruned %s inactive tasks, %s left" % (delcount, len(self.runq_fnid)))

        # Remap the dependencies to account for the deleted tasks
        # Check we didn't delete a task we depend on
        for listid in range(len(self.runq_fnid)):
            newdeps = []
            origdeps = self.runq_depends[listid]
            for origdep in origdeps:
                if maps[origdep] == -1:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Invalid mapping - Should never happen!")
                newdeps.append(maps[origdep])
            self.runq_depends[listid] = Set(newdeps)

        bb.msg.note(2, bb.msg.domain.RunQueue, "Assign Weightings")

        # Generate a list of reverse dependencies to ease future calculations
        for listid in range(len(self.runq_fnid)):
            for dep in self.runq_depends[listid]:
                self.runq_revdeps[dep].add(listid)

        # Identify tasks at the end of dependency chains
        # Error on circular dependency loops (length two)
        endpoints = []
        for listid in range(len(self.runq_fnid)):
            revdeps = self.runq_revdeps[listid]
            if len(revdeps) == 0:
                endpoints.append(listid)
            for dep in revdeps:
                if dep in self.runq_depends[listid]:
                    #self.dump_data(taskData)
                    bb.msg.fatal(bb.msg.domain.RunQueue, "Task %s (%s) has circular dependency on %s (%s)" % (taskData.fn_index[self.runq_fnid[dep]], self.runq_task[dep], taskData.fn_index[self.runq_fnid[listid]], self.runq_task[listid]))

        bb.msg.note(2, bb.msg.domain.RunQueue, "Compute totals (have %s endpoint(s))" % len(endpoints))

        # Calculate task weights
        # Check for higher-length circular dependencies
        self.runq_weight = self.calculate_task_weights(endpoints)

        # Decide what order to execute the tasks in, pick a scheduler
        # FIXME - Allow user selection
        #self.sched = RunQueueScheduler(self)
        self.sched = RunQueueSchedulerSpeed(self)
        #self.sched = RunQueueSchedulerCompletion(self)

        # Sanity Check - Check for multiple tasks building the same provider
        prov_list = {}
        seen_fn = []
        for task in range(len(self.runq_fnid)):
            fn = taskData.fn_index[self.runq_fnid[task]]
            if fn in seen_fn:
                continue
            seen_fn.append(fn)
            for prov in self.dataCache.fn_provides[fn]:
                if prov not in prov_list:
                    prov_list[prov] = [fn]
                elif fn not in prov_list[prov]:
                    prov_list[prov].append(fn)
        error = False
        for prov in prov_list:
            if len(prov_list[prov]) > 1 and prov not in self.multi_provider_whitelist:
                error = True
                bb.msg.error(bb.msg.domain.RunQueue, "Multiple .bb files are due to be built which each provide %s (%s).\n This usually means one provides something the other doesn't and should." % (prov, " ".join(prov_list[prov])))
        #if error:
        #    bb.msg.fatal(bb.msg.domain.RunQueue, "Corrupted metadata configuration detected, aborting...")

        #self.dump_data(taskData)

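    # Expected calling order (a sketch inferred from the method docstrings in
    # this class):
    #
    #   rq = RunQueue(cooker, cfgData, dataCache, taskData, targets)
    #   rq.prepare_runqueue()
    #   failures = rq.execute_runqueue()
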
    def execute_runqueue(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        Upon failure, optionally try to recover the build using any alternate providers
        (if the abort on failure configuration option isn't set)
        """
        failures = 0
        while 1:
            failed_fnids = []
            try:
                self.execute_runqueue_internal()
            finally:
                if self.master_process:
                    failed_fnids = self.finish_runqueue()
            if len(failed_fnids) == 0:
                return failures
            if self.taskData.abort:
                raise bb.runqueue.TaskFailure(failed_fnids)
            for fnid in failed_fnids:
                #print "Failure: %s %s %s" % (fnid, self.taskData.fn_index[fnid], self.runq_task[fnid])
                self.taskData.fail_fnid(fnid)
                failures = failures + 1
            self.reset_runqueue()
            self.prepare_runqueue()

    def execute_runqueue_initVars(self):
        self.stats = RunQueueStats()

        self.active_builds = 0
        self.runq_buildable = []
        self.runq_running = []
        self.runq_complete = []
        self.build_pids = {}
        self.failed_fnids = []
        self.master_process = True

        # Mark initial buildable tasks
        for task in range(len(self.runq_fnid)):
            self.runq_running.append(0)
            self.runq_complete.append(0)
            if len(self.runq_depends[task]) == 0:
                self.runq_buildable.append(1)
            else:
                self.runq_buildable.append(0)

    def task_complete(self, task):
        """
        Mark a task as completed
        Look at the reverse dependencies and mark any task with
        completed dependencies as buildable
        """
        self.runq_complete[task] = 1
        for revdep in self.runq_revdeps[task]:
            if self.runq_running[revdep] == 1:
                continue
            if self.runq_buildable[revdep] == 1:
                continue
            alldeps = 1
            for dep in self.runq_depends[revdep]:
                if self.runq_complete[dep] != 1:
                    alldeps = 0
            if alldeps == 1:
                self.runq_buildable[revdep] = 1
                fn = self.taskData.fn_index[self.runq_fnid[revdep]]
                taskname = self.runq_task[revdep]
                bb.msg.debug(1, bb.msg.domain.RunQueue, "Marking task %s (%s, %s) as buildable" % (revdep, fn, taskname))

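    # The loop below asks the scheduler for the next buildable task, skips it if
    # its stamp is already current, and otherwise forks a child which runs the
    # task via cooker.tryBuild(). Up to self.number_tasks (BB_NUMBER_THREADS)
    # children run at once; once that limit is reached the parent waits for one
    # to exit before forking more.
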
    def execute_runqueue_internal(self):
        """
        Run the tasks in a queue prepared by prepare_runqueue
        """
        bb.msg.note(1, bb.msg.domain.RunQueue, "Executing runqueue")

        self.execute_runqueue_initVars()

        if len(self.runq_fnid) == 0:
            # nothing to do
            return []

        def sigint_handler(signum, frame):
            raise KeyboardInterrupt

        # RP - this code allows tasks to run out of the correct order - disabled, FIXME
        # Find any tasks with current stamps and remove them from the queue
        #for task1 in range(len(self.runq_fnid)):
        #    task = self.prio_map[task1]
        #    fn = self.taskData.fn_index[self.runq_fnid[task]]
        #    taskname = self.runq_task[task]
        #    if bb.build.stamp_is_current(taskname, self.dataCache, fn):
        #        bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
        #        self.runq_running[task] = 1
        #        self.task_complete(task)
        #        self.stats.taskCompleted()
        #        self.stats.taskSkipped()

        while True:
            task = self.sched.next()
            if task is not None:
                fn = self.taskData.fn_index[self.runq_fnid[task]]
                taskname = self.runq_task[task]

                if bb.build.stamp_is_current(taskname, self.dataCache, fn):
                    bb.msg.debug(2, bb.msg.domain.RunQueue, "Stamp current task %s (%s)" % (task, self.get_user_idstring(task)))
                    self.runq_running[task] = 1
                    self.task_complete(task)
                    self.stats.taskCompleted()
                    self.stats.taskSkipped()
                    continue

                bb.msg.note(1, bb.msg.domain.RunQueue, "Running task %d of %d (ID: %s, %s)" % (self.stats.completed + self.active_builds + 1, len(self.runq_fnid), task, self.get_user_idstring(task)))
                try:
                    pid = os.fork()
                except OSError, e:
                    bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                if pid == 0:
                    # Bypass master process' handling
                    self.master_process = False
                    # Stop Ctrl+C being sent to children
                    # signal.signal(signal.SIGINT, signal.SIG_IGN)
                    # Make the child the process group leader
                    os.setpgid(0, 0)
                    newsi = os.open('/dev/null', os.O_RDWR)
                    os.dup2(newsi, sys.stdin.fileno())
                    self.cooker.configuration.cmd = taskname[3:]
                    try:
                        self.cooker.tryBuild(fn, False)
                    except bb.build.EventException:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        sys.exit(1)
                    except:
                        bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
                        raise
                    sys.exit(0)
                self.build_pids[pid] = task
                self.runq_running[task] = 1
                self.active_builds = self.active_builds + 1
                if self.active_builds < self.number_tasks:
                    continue
            if self.active_builds > 0:
                result = os.waitpid(-1, 0)
                self.active_builds = self.active_builds - 1
                task = self.build_pids[result[0]]
                if result[1] != 0:
                    del self.build_pids[result[0]]
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
                    self.failed_fnids.append(self.runq_fnid[task])
                    self.stats.taskFailed()
                    break
                self.task_complete(task)
                self.stats.taskCompleted()
                del self.build_pids[result[0]]
                continue
            return

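    # Each forked child made itself a process group leader (os.setpgid above), so
    # finish_runqueue() can signal a task's whole process tree with
    # os.kill(-pid, ...) if the build is interrupted.
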
    def finish_runqueue(self):
        try:
            while self.active_builds > 0:
                bb.msg.note(1, bb.msg.domain.RunQueue, "Waiting for %s active tasks to finish" % self.active_builds)
                tasknum = 1
                for k, v in self.build_pids.iteritems():
                    bb.msg.note(1, bb.msg.domain.RunQueue, "%s: %s (%s)" % (tasknum, self.get_user_idstring(v), k))
                    tasknum = tasknum + 1
                result = os.waitpid(-1, 0)
                task = self.build_pids[result[0]]
                if result[1] != 0:
                    bb.msg.error(bb.msg.domain.RunQueue, "Task %s (%s) failed" % (task, self.get_user_idstring(task)))
                    self.failed_fnids.append(self.runq_fnid[task])
                    self.stats.taskFailed()
                del self.build_pids[result[0]]
                self.active_builds = self.active_builds - 1
            bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))
            return self.failed_fnids
        except KeyboardInterrupt:
            bb.msg.note(1, bb.msg.domain.RunQueue, "Sending SIGINT to remaining %s tasks" % self.active_builds)
            for k, v in self.build_pids.iteritems():
                try:
                    os.kill(-k, signal.SIGINT)
                except:
                    pass
            raise

        # Sanity Checks
        for task in range(len(self.runq_fnid)):
            if self.runq_buildable[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never buildable!" % task)
            if self.runq_running[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never ran!" % task)
            if self.runq_complete[task] == 0:
                bb.msg.error(bb.msg.domain.RunQueue, "Task %s never completed!" % task)

        bb.msg.note(1, bb.msg.domain.RunQueue, "Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed." % (self.stats.completed, self.stats.skipped, self.stats.failed))

        return self.failed_fnids

    def dump_data(self, taskQueue):
        """
        Dump some debug information on the internal data structures
        """
        bb.msg.debug(3, bb.msg.domain.RunQueue, "run_tasks:")
        for task in range(len(self.runq_fnid)):
            bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
                taskQueue.fn_index[self.runq_fnid[task]],
                self.runq_task[task],
                self.runq_weight[task],
                self.runq_depends[task],
                self.runq_revdeps[task]))

        bb.msg.debug(3, bb.msg.domain.RunQueue, "sorted_tasks:")
        for task1 in range(len(self.runq_fnid)):
            if task1 in self.prio_map:
                task = self.prio_map[task1]
                bb.msg.debug(3, bb.msg.domain.RunQueue, " (%s)%s - %s: %s Deps %s RevDeps %s" % (task,
                    taskQueue.fn_index[self.runq_fnid[task]],
                    self.runq_task[task],
                    self.runq_weight[task],
                    self.runq_depends[task],
                    self.runq_revdeps[task]))