process.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #
  2. # BitBake Process based server.
  3. #
  4. # Copyright (C) 2010 Bob Foerster <robert@erafx.com>
  5. #
  6. # This program is free software; you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License version 2 as
  8. # published by the Free Software Foundation.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License along
  16. # with this program; if not, write to the Free Software Foundation, Inc.,
  17. # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
  18. """
  19. This module implements a multiprocessing.Process based server for bitbake.
  20. """
  21. import bb
  22. import bb.event
  23. import itertools
  24. import logging
  25. import multiprocessing
  26. import os
  27. import signal
  28. import sys
  29. import time
  30. import select
  31. from Queue import Empty
  32. from multiprocessing import Event, Process, util, Queue, Pipe, queues, Manager
  33. from . import BitBakeBaseServer, BitBakeBaseServerConnection, BaseImplServer
  34. logger = logging.getLogger('BitBake')
  35. class ServerCommunicator():
  36. def __init__(self, connection, event_handle, server):
  37. self.connection = connection
  38. self.event_handle = event_handle
  39. self.server = server
  40. def runCommand(self, command):
  41. # @todo try/except
  42. self.connection.send(command)
  43. if not self.server.is_alive():
  44. raise SystemExit
  45. while True:
  46. # don't let the user ctrl-c while we're waiting for a response
  47. try:
  48. if self.connection.poll(20):
  49. return self.connection.recv()
  50. else:
  51. bb.fatal("Timeout while attempting to communicate with bitbake server")
  52. except KeyboardInterrupt:
  53. pass
  54. def getEventHandle(self):
  55. return self.event_handle.value
  56. class EventAdapter():
  57. """
  58. Adapter to wrap our event queue since the caller (bb.event) expects to
  59. call a send() method, but our actual queue only has put()
  60. """
  61. def __init__(self, queue):
  62. self.queue = queue
  63. def send(self, event):
  64. try:
  65. self.queue.put(event)
  66. except Exception as err:
  67. print("EventAdapter puked: %s" % str(err))
  68. class ProcessServer(Process, BaseImplServer):
  69. profile_filename = "profile.log"
  70. profile_processed_filename = "profile.log.processed"
  71. def __init__(self, command_channel, event_queue, featurelist):
  72. BaseImplServer.__init__(self)
  73. Process.__init__(self)
  74. self.command_channel = command_channel
  75. self.event_queue = event_queue
  76. self.event = EventAdapter(event_queue)
  77. self.featurelist = featurelist
  78. self.quit = False
  79. self.quitin, self.quitout = Pipe()
  80. self.event_handle = multiprocessing.Value("i")
  81. def run(self):
  82. for event in bb.event.ui_queue:
  83. self.event_queue.put(event)
  84. self.event_handle.value = bb.event.register_UIHhandler(self)
  85. bb.cooker.server_main(self.cooker, self.main)
  86. def main(self):
  87. # Ignore SIGINT within the server, as all SIGINT handling is done by
  88. # the UI and communicated to us
  89. self.quitin.close()
  90. signal.signal(signal.SIGINT, signal.SIG_IGN)
  91. while not self.quit:
  92. try:
  93. if self.command_channel.poll():
  94. command = self.command_channel.recv()
  95. self.runCommand(command)
  96. if self.quitout.poll():
  97. self.quitout.recv()
  98. self.quit = True
  99. self.idle_commands(.1, [self.command_channel, self.quitout])
  100. except Exception:
  101. logger.exception('Running command %s', command)
  102. self.event_queue.close()
  103. bb.event.unregister_UIHhandler(self.event_handle.value)
  104. self.command_channel.close()
  105. self.cooker.shutdown(True)
  106. def idle_commands(self, delay, fds=None):
  107. nextsleep = delay
  108. if not fds:
  109. fds = []
  110. for function, data in self._idlefuns.items():
  111. try:
  112. retval = function(self, data, False)
  113. if retval is False:
  114. del self._idlefuns[function]
  115. nextsleep = None
  116. elif retval is True:
  117. nextsleep = None
  118. elif isinstance(retval, float):
  119. if (retval < nextsleep):
  120. nextsleep = retval
  121. elif nextsleep is None:
  122. continue
  123. else:
  124. fds = fds + retval
  125. except SystemExit:
  126. raise
  127. except Exception as exc:
  128. if not isinstance(exc, bb.BBHandledException):
  129. logger.exception('Running idle function')
  130. del self._idlefuns[function]
  131. self.quit = True
  132. if nextsleep is not None:
  133. select.select(fds,[],[],nextsleep)
  134. def runCommand(self, command):
  135. """
  136. Run a cooker command on the server
  137. """
  138. self.command_channel.send(self.cooker.command.runCommand(command))
  139. def stop(self):
  140. self.quitin.send("quit")
  141. self.quitin.close()
  142. class BitBakeProcessServerConnection(BitBakeBaseServerConnection):
  143. def __init__(self, serverImpl, ui_channel, event_queue):
  144. self.procserver = serverImpl
  145. self.ui_channel = ui_channel
  146. self.event_queue = event_queue
  147. self.connection = ServerCommunicator(self.ui_channel, self.procserver.event_handle, self.procserver)
  148. self.events = self.event_queue
  149. def sigterm_terminate(self):
  150. bb.error("UI received SIGTERM")
  151. self.terminate()
  152. def terminate(self):
  153. def flushevents():
  154. while True:
  155. try:
  156. event = self.event_queue.get(block=False)
  157. except (Empty, IOError):
  158. break
  159. if isinstance(event, logging.LogRecord):
  160. logger.handle(event)
  161. signal.signal(signal.SIGINT, signal.SIG_IGN)
  162. self.procserver.stop()
  163. while self.procserver.is_alive():
  164. flushevents()
  165. self.procserver.join(0.1)
  166. self.ui_channel.close()
  167. self.event_queue.close()
  168. self.event_queue.setexit()
  169. # Wrap Queue to provide API which isn't server implementation specific
  170. class ProcessEventQueue(multiprocessing.queues.Queue):
  171. def __init__(self, maxsize):
  172. multiprocessing.queues.Queue.__init__(self, maxsize)
  173. self.exit = False
  174. def setexit(self):
  175. self.exit = True
  176. def waitEvent(self, timeout):
  177. if self.exit:
  178. sys.exit(1)
  179. try:
  180. if not self.server.is_alive():
  181. self.setexit()
  182. return None
  183. return self.get(True, timeout)
  184. except Empty:
  185. return None
  186. def getEvent(self):
  187. try:
  188. if not self.server.is_alive():
  189. self.setexit()
  190. return None
  191. return self.get(False)
  192. except Empty:
  193. return None
  194. class BitBakeServer(BitBakeBaseServer):
  195. def initServer(self):
  196. # establish communication channels. We use bidirectional pipes for
  197. # ui <--> server command/response pairs
  198. # and a queue for server -> ui event notifications
  199. #
  200. self.ui_channel, self.server_channel = Pipe()
  201. self.event_queue = ProcessEventQueue(0)
  202. self.serverImpl = ProcessServer(self.server_channel, self.event_queue, None)
  203. self.event_queue.server = self.serverImpl
  204. def detach(self):
  205. self.serverImpl.start()
  206. return
  207. def establishConnection(self, featureset):
  208. self.connection = BitBakeProcessServerConnection(self.serverImpl, self.ui_channel, self.event_queue)
  209. _, error = self.connection.connection.runCommand(["setFeatures", featureset])
  210. if error:
  211. logger.error("Unable to set the cooker to the correct featureset: %s" % error)
  212. raise BaseException(error)
  213. signal.signal(signal.SIGTERM, lambda i, s: self.connection.sigterm_terminate())
  214. return self.connection