process.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. #
  2. # BitBake Process based server.
  3. #
  4. # Copyright (C) 2010 Bob Foerster <robert@erafx.com>
  5. #
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License version 2 as
  8. # published by the Free Software Foundation.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License along
  16. # with this program; if not, write to the Free Software Foundation, Inc.,
  17. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  18. """
  19. This module implements a multiprocessing.Process based server for bitbake.
  20. """
  21. import bb
  22. import bb.event
  23. import itertools
  24. import logging
  25. import multiprocessing
  26. import os
  27. import signal
  28. import sys
  29. import time
  30. from Queue import Empty
  31. from multiprocessing import Event, Process, util, Queue, Pipe, queues
  32. logger = logging.getLogger('BitBake')
  33. class ServerCommunicator():
  34. def __init__(self, connection):
  35. self.connection = connection
  36. def runCommand(self, command):
  37. # @todo try/except
  38. self.connection.send(command)
  39. while True:
  40. # don't let the user ctrl-c while we're waiting for a response
  41. try:
  42. if self.connection.poll(.5):
  43. return self.connection.recv()
  44. else:
  45. return None, "Timeout while attempting to communicate with bitbake server"
  46. except KeyboardInterrupt:
  47. pass
  48. class EventAdapter():
  49. """
  50. Adapter to wrap our event queue since the caller (bb.event) expects to
  51. call a send() method, but our actual queue only has put()
  52. """
  53. def __init__(self, queue):
  54. self.queue = queue
  55. def send(self, event):
  56. try:
  57. self.queue.put(event)
  58. except Exception as err:
  59. print("EventAdapter puked: %s" % str(err))
  60. class ProcessServer(Process):
  61. profile_filename = "profile.log"
  62. profile_processed_filename = "profile.log.processed"
  63. def __init__(self, command_channel, event_queue):
  64. Process.__init__(self)
  65. self.command_channel = command_channel
  66. self.event_queue = event_queue
  67. self.event = EventAdapter(event_queue)
  68. self._idlefunctions = {}
  69. self.quit = False
  70. self.keep_running = Event()
  71. self.keep_running.set()
  72. def register_idle_function(self, function, data):
  73. """Register a function to be called while the server is idle"""
  74. assert hasattr(function, '__call__')
  75. self._idlefunctions[function] = data
  76. def run(self):
  77. for event in bb.event.ui_queue:
  78. self.event_queue.put(event)
  79. self.event_handle = bb.event.register_UIHhandler(self)
  80. bb.cooker.server_main(self.cooker, self.main)
  81. def main(self):
  82. # Ignore SIGINT within the server, as all SIGINT handling is done by
  83. # the UI and communicated to us
  84. signal.signal(signal.SIGINT, signal.SIG_IGN)
  85. while self.keep_running.is_set():
  86. try:
  87. if self.command_channel.poll():
  88. command = self.command_channel.recv()
  89. self.runCommand(command)
  90. self.idle_commands(.1)
  91. except Exception:
  92. logger.exception('Running command %s', command)
  93. self.event_queue.cancel_join_thread()
  94. bb.event.unregister_UIHhandler(self.event_handle)
  95. self.command_channel.close()
  96. self.cooker.stop()
  97. self.idle_commands(.1)
  98. def idle_commands(self, delay):
  99. nextsleep = delay
  100. for function, data in self._idlefunctions.items():
  101. try:
  102. retval = function(self, data, False)
  103. if retval is False:
  104. del self._idlefunctions[function]
  105. elif retval is True:
  106. nextsleep = None
  107. elif nextsleep is None:
  108. continue
  109. elif retval < nextsleep:
  110. nextsleep = retval
  111. except SystemExit:
  112. raise
  113. except Exception:
  114. logger.exception('Running idle function')
  115. if nextsleep is not None:
  116. time.sleep(nextsleep)
  117. def runCommand(self, command):
  118. """
  119. Run a cooker command on the server
  120. """
  121. self.command_channel.send(self.cooker.command.runCommand(command))
  122. def stop(self):
  123. self.keep_running.clear()
  124. def bootstrap_2_6_6(self):
  125. """Pulled from python 2.6.6. Needed to ensure we have the fix from
  126. http://bugs.python.org/issue5313 when running on python version 2.6.2
  127. or lower."""
  128. try:
  129. self._children = set()
  130. self._counter = itertools.count(1)
  131. try:
  132. sys.stdin.close()
  133. sys.stdin = open(os.devnull)
  134. except (OSError, ValueError):
  135. pass
  136. multiprocessing._current_process = self
  137. util._finalizer_registry.clear()
  138. util._run_after_forkers()
  139. util.info('child process calling self.run()')
  140. try:
  141. self.run()
  142. exitcode = 0
  143. finally:
  144. util._exit_function()
  145. except SystemExit as e:
  146. if not e.args:
  147. exitcode = 1
  148. elif type(e.args[0]) is int:
  149. exitcode = e.args[0]
  150. else:
  151. sys.stderr.write(e.args[0] + '\n')
  152. sys.stderr.flush()
  153. exitcode = 1
  154. except:
  155. exitcode = 1
  156. import traceback
  157. sys.stderr.write('Process %s:\n' % self.name)
  158. sys.stderr.flush()
  159. traceback.print_exc()
  160. util.info('process exiting with exitcode %d' % exitcode)
  161. return exitcode
  162. # Python versions 2.6.0 through 2.6.2 suffer from a multiprocessing bug
  163. # which can result in a bitbake server hang during the parsing process
  164. if (2, 6, 0) <= sys.version_info < (2, 6, 3):
  165. _bootstrap = bootstrap_2_6_6
  166. class BitBakeServerConnection():
  167. def __init__(self, server):
  168. self.server = server
  169. self.procserver = server.server
  170. self.connection = ServerCommunicator(server.ui_channel)
  171. self.events = server.event_queue
  172. def terminate(self, force = False):
  173. signal.signal(signal.SIGINT, signal.SIG_IGN)
  174. self.procserver.stop()
  175. if force:
  176. self.procserver.join(0.5)
  177. if self.procserver.is_alive():
  178. self.procserver.terminate()
  179. self.procserver.join()
  180. else:
  181. self.procserver.join()
  182. while True:
  183. try:
  184. event = self.server.event_queue.get(block=False)
  185. except (Empty, IOError):
  186. break
  187. if isinstance(event, logging.LogRecord):
  188. logger.handle(event)
  189. self.server.ui_channel.close()
  190. self.server.event_queue.close()
  191. if force:
  192. sys.exit(1)
  193. # Wrap Queue to provide API which isn't server implementation specific
  194. class ProcessEventQueue(multiprocessing.queues.Queue):
  195. def waitEvent(self, timeout):
  196. try:
  197. return self.get(True, timeout)
  198. except Empty:
  199. return None
  200. def getEvent(self):
  201. try:
  202. return self.get(False)
  203. except Empty:
  204. return None
  205. class BitBakeServer(object):
  206. def initServer(self):
  207. # establish communication channels. We use bidirectional pipes for
  208. # ui <--> server command/response pairs
  209. # and a queue for server -> ui event notifications
  210. #
  211. self.ui_channel, self.server_channel = Pipe()
  212. self.event_queue = ProcessEventQueue(0)
  213. self.server = ProcessServer(self.server_channel, self.event_queue)
  214. def addcooker(self, cooker):
  215. self.cooker = cooker
  216. self.server.cooker = cooker
  217. def getServerIdleCB(self):
  218. return self.server.register_idle_function
  219. def saveConnectionDetails(self):
  220. return
  221. def detach(self, cooker_logfile):
  222. self.server.start()
  223. return
  224. def establishConnection(self):
  225. self.connection = BitBakeServerConnection(self)
  226. signal.signal(signal.SIGTERM, lambda i, s: self.connection.terminate(force=True))
  227. return self.connection
  228. def launchUI(self, uifunc, *args):
  229. return bb.cooker.server_main(self.cooker, uifunc, *args)