process.py 33 KB


  1. #
  2. # BitBake Process based server.
  3. #
  4. # Copyright (C) 2010 Bob Foerster <robert@erafx.com>
  5. #
  6. # SPDX-License-Identifier: GPL-2.0-only
  7. #
  8. """
  9. This module implements a multiprocessing.Process based server for bitbake.
  10. """
  11. import bb
  12. import bb.event
  13. import logging
  14. import multiprocessing
  15. import threading
  16. import array
  17. import os
  18. import sys
  19. import time
  20. import select
  21. import socket
  22. import subprocess
  23. import errno
  24. import re
  25. import datetime
  26. import pickle
  27. import traceback
  28. import gc
  29. import stat
  30. import bb.server.xmlrpcserver
  31. from bb import daemonize
  32. from multiprocessing import queues
# Module-wide logger; everything in this file logs under the 'BitBake' name.
logger = logging.getLogger('BitBake')
  34. class ProcessTimeout(SystemExit):
  35. pass
  36. def currenttime():
  37. return datetime.datetime.now().strftime('%H:%M:%S.%f')
  38. def serverlog(msg):
  39. print(str(os.getpid()) + " " + currenttime() + " " + msg)
  40. #Seems a flush here triggers filesytem sync like behaviour and long hangs in the server
  41. #sys.stdout.flush()
  42. #
  43. # When we have lockfile issues, try and find infomation about which process is
  44. # using the lockfile
  45. #
  46. def get_lockfile_process_msg(lockfile):
  47. # Some systems may not have lsof available
  48. procs = None
  49. try:
  50. procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
  51. except subprocess.CalledProcessError:
  52. # File was deleted?
  53. pass
  54. except OSError as e:
  55. if e.errno != errno.ENOENT:
  56. raise
  57. if procs is None:
  58. # Fall back to fuser if lsof is unavailable
  59. try:
  60. procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
  61. except subprocess.CalledProcessError:
  62. # File was deleted?
  63. pass
  64. except OSError as e:
  65. if e.errno != errno.ENOENT:
  66. raise
  67. if procs:
  68. return procs.decode("utf-8")
  69. return None
  70. class idleFinish():
  71. def __init__(self, msg):
  72. self.msg = msg
  73. class ProcessServer():
  74. profile_filename = "profile.log"
  75. profile_processed_filename = "profile.log.processed"
  76. def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
  77. self.command_channel = False
  78. self.command_channel_reply = False
  79. self.quit = False
  80. self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
  81. self.next_heartbeat = time.time()
  82. self.event_handle = None
  83. self.hadanyui = False
  84. self.haveui = False
  85. self.maxuiwait = 30
  86. self.xmlrpc = False
  87. self.idle = None
  88. # Need a lock for _idlefuns changes
  89. self._idlefuns = {}
  90. self._idlefuncsLock = threading.Lock()
  91. self.idle_cond = threading.Condition(self._idlefuncsLock)
  92. self.bitbake_lock = lock
  93. self.bitbake_lock_name = lockname
  94. self.sock = sock
  95. self.sockname = sockname
  96. # It is possible the directory may be renamed. Cache the inode of the socket file
  97. # so we can tell if things changed.
  98. self.sockinode = os.stat(self.sockname)[stat.ST_INO]
  99. self.server_timeout = server_timeout
  100. self.timeout = self.server_timeout
  101. self.xmlrpcinterface = xmlrpcinterface
  102. def register_idle_function(self, function, data):
  103. """Register a function to be called while the server is idle"""
  104. assert hasattr(function, '__call__')
  105. with bb.utils.lock_timeout(self._idlefuncsLock):
  106. self._idlefuns[function] = data
  107. serverlog("Registering idle function %s" % str(function))
  108. def run(self):
  109. if self.xmlrpcinterface[0]:
  110. self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
  111. serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
  112. try:
  113. self.bitbake_lock.seek(0)
  114. self.bitbake_lock.truncate()
  115. if self.xmlrpc:
  116. self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
  117. else:
  118. self.bitbake_lock.write("%s\n" % (os.getpid()))
  119. self.bitbake_lock.flush()
  120. except Exception as e:
  121. serverlog("Error writing to lock file: %s" % str(e))
  122. pass
  123. if self.cooker.configuration.profile:
  124. try:
  125. import cProfile as profile
  126. except:
  127. import profile
  128. prof = profile.Profile()
  129. ret = profile.Profile.runcall(prof, self.main)
  130. prof.dump_stats("profile.log")
  131. bb.utils.process_profilelog("profile.log")
  132. serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
  133. else:
  134. ret = self.main()
  135. return ret
  136. def _idle_check(self):
  137. return len(self._idlefuns) == 0 and self.cooker.command.currentAsyncCommand is None
  138. def wait_for_idle(self, timeout=30):
  139. # Wait for the idle loop to have cleared
  140. with bb.utils.lock_timeout(self._idlefuncsLock):
  141. return self.idle_cond.wait_for(self._idle_check, timeout) is not False
  142. def set_async_cmd(self, cmd):
  143. with bb.utils.lock_timeout(self._idlefuncsLock):
  144. ret = self.idle_cond.wait_for(self._idle_check, 30)
  145. if ret is False:
  146. return False
  147. self.cooker.command.currentAsyncCommand = cmd
  148. return True
  149. def clear_async_cmd(self):
  150. with bb.utils.lock_timeout(self._idlefuncsLock):
  151. self.cooker.command.currentAsyncCommand = None
  152. self.idle_cond.notify_all()
  153. def get_async_cmd(self):
  154. with bb.utils.lock_timeout(self._idlefuncsLock):
  155. return self.cooker.command.currentAsyncCommand
  156. def main(self):
  157. self.cooker.pre_serve()
  158. bb.utils.set_process_name("Cooker")
  159. ready = []
  160. newconnections = []
  161. self.controllersock = False
  162. fds = [self.sock]
  163. if self.xmlrpc:
  164. fds.append(self.xmlrpc)
  165. seendata = False
  166. serverlog("Entering server connection loop")
  167. serverlog("Lockfile is: %s\nSocket is %s (%s)" % (self.bitbake_lock_name, self.sockname, os.path.exists(self.sockname)))
  168. def disconnect_client(self, fds):
  169. serverlog("Disconnecting Client (socket: %s)" % os.path.exists(self.sockname))
  170. if self.controllersock:
  171. fds.remove(self.controllersock)
  172. self.controllersock.close()
  173. self.controllersock = False
  174. if self.haveui:
  175. # Wait for the idle loop to have cleared (30s max)
  176. if not self.wait_for_idle(30):
  177. serverlog("Idle loop didn't finish queued commands after 30s, exiting.")
  178. self.quit = True
  179. fds.remove(self.command_channel)
  180. bb.event.unregister_UIHhandler(self.event_handle, True)
  181. self.command_channel_reply.writer.close()
  182. self.event_writer.writer.close()
  183. self.command_channel.close()
  184. self.command_channel = False
  185. del self.event_writer
  186. self.lastui = time.time()
  187. self.cooker.clientComplete()
  188. self.haveui = False
  189. ready = select.select(fds,[],[],0)[0]
  190. if newconnections and not self.quit:
  191. serverlog("Starting new client")
  192. conn = newconnections.pop(-1)
  193. fds.append(conn)
  194. self.controllersock = conn
  195. elif not self.timeout and not ready:
  196. serverlog("No timeout, exiting.")
  197. self.quit = True
  198. self.lastui = time.time()
  199. while not self.quit:
  200. if self.sock in ready:
  201. while select.select([self.sock],[],[],0)[0]:
  202. controllersock, address = self.sock.accept()
  203. if self.controllersock:
  204. serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
  205. newconnections.append(controllersock)
  206. else:
  207. serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
  208. self.controllersock = controllersock
  209. fds.append(controllersock)
  210. if self.controllersock in ready:
  211. try:
  212. serverlog("Processing Client")
  213. ui_fds = recvfds(self.controllersock, 3)
  214. serverlog("Connecting Client")
  215. # Where to write events to
  216. writer = ConnectionWriter(ui_fds[0])
  217. self.event_handle = bb.event.register_UIHhandler(writer, True)
  218. self.event_writer = writer
  219. # Where to read commands from
  220. reader = ConnectionReader(ui_fds[1])
  221. fds.append(reader)
  222. self.command_channel = reader
  223. # Where to send command return values to
  224. writer = ConnectionWriter(ui_fds[2])
  225. self.command_channel_reply = writer
  226. self.haveui = True
  227. self.hadanyui = True
  228. except (EOFError, OSError):
  229. disconnect_client(self, fds)
  230. if not self.timeout == -1.0 and not self.haveui and self.timeout and \
  231. (self.lastui + self.timeout) < time.time():
  232. serverlog("Server timeout, exiting.")
  233. self.quit = True
  234. # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
  235. # one. We have had issue with processes hanging indefinitely so timing out UI-less
  236. # servers is useful.
  237. if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
  238. serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
  239. self.quit = True
  240. if self.command_channel in ready:
  241. try:
  242. command = self.command_channel.get()
  243. except EOFError:
  244. # Client connection shutting down
  245. ready = []
  246. disconnect_client(self, fds)
  247. continue
  248. if command[0] == "terminateServer":
  249. self.quit = True
  250. continue
  251. try:
  252. serverlog("Running command %s" % command)
  253. reply = self.cooker.command.runCommand(command, self)
  254. serverlog("Sending reply %s" % repr(reply))
  255. self.command_channel_reply.send(reply)
  256. serverlog("Command Completed (socket: %s)" % os.path.exists(self.sockname))
  257. except Exception as e:
  258. stack = traceback.format_exc()
  259. serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
  260. logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
  261. if self.xmlrpc in ready:
  262. self.xmlrpc.handle_requests()
  263. if not seendata and hasattr(self.cooker, "data"):
  264. heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
  265. if heartbeat_event:
  266. try:
  267. self.heartbeat_seconds = float(heartbeat_event)
  268. except:
  269. bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
  270. self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
  271. try:
  272. if self.timeout:
  273. self.timeout = float(self.timeout)
  274. except:
  275. bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
  276. seendata = True
  277. if not self.idle:
  278. self.idle = threading.Thread(target=self.idle_thread)
  279. self.idle.start()
  280. elif self.idle and not self.idle.is_alive():
  281. serverlog("Idle thread terminated, main thread exiting too")
  282. bb.error("Idle thread terminated, main thread exiting too")
  283. self.quit = True
  284. nextsleep = 0.1
  285. if self.xmlrpc:
  286. nextsleep = self.xmlrpc.get_timeout(nextsleep)
  287. try:
  288. ready = select.select(fds,[],[],nextsleep)[0]
  289. except InterruptedError:
  290. # Ignore EINTR
  291. ready = []
  292. if self.idle:
  293. self.idle.join()
  294. serverlog("Exiting (socket: %s)" % os.path.exists(self.sockname))
  295. # Remove the socket file so we don't get any more connections to avoid races
  296. # The build directory could have been renamed so if the file isn't the one we created
  297. # we shouldn't delete it.
  298. try:
  299. sockinode = os.stat(self.sockname)[stat.ST_INO]
  300. if sockinode == self.sockinode:
  301. os.unlink(self.sockname)
  302. else:
  303. serverlog("bitbake.sock inode mismatch (%s vs %s), not deleting." % (sockinode, self.sockinode))
  304. except Exception as err:
  305. serverlog("Removing socket file '%s' failed (%s)" % (self.sockname, err))
  306. self.sock.close()
  307. try:
  308. self.cooker.shutdown(True, idle=False)
  309. self.cooker.notifier.stop()
  310. self.cooker.confignotifier.stop()
  311. except:
  312. pass
  313. self.cooker.post_serve()
  314. if len(threading.enumerate()) != 1:
  315. serverlog("More than one thread left?: " + str(threading.enumerate()))
  316. # Flush logs before we release the lock
  317. sys.stdout.flush()
  318. sys.stderr.flush()
  319. # Finally release the lockfile but warn about other processes holding it open
  320. lock = self.bitbake_lock
  321. lockfile = self.bitbake_lock_name
  322. def get_lock_contents(lockfile):
  323. try:
  324. with open(lockfile, "r") as f:
  325. return f.readlines()
  326. except FileNotFoundError:
  327. return None
  328. lock.close()
  329. lock = None
  330. while not lock:
  331. i = 0
  332. lock = None
  333. if not os.path.exists(os.path.basename(lockfile)):
  334. serverlog("Lockfile directory gone, exiting.")
  335. return
  336. while not lock and i < 30:
  337. lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
  338. if not lock:
  339. newlockcontents = get_lock_contents(lockfile)
  340. if not newlockcontents[0].startswith([f"{os.getpid()}\n", f"{os.getpid()} "]):
  341. # A new server was started, the lockfile contents changed, we can exit
  342. serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
  343. return
  344. time.sleep(0.1)
  345. i += 1
  346. if lock:
  347. # We hold the lock so we can remove the file (hide stale pid data)
  348. # via unlockfile.
  349. bb.utils.unlockfile(lock)
  350. serverlog("Exiting as we could obtain the lock")
  351. return
  352. if not lock:
  353. procs = get_lockfile_process_msg(lockfile)
  354. msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
  355. if procs:
  356. msg.append(":\n%s" % procs)
  357. serverlog("".join(msg))
  358. def idle_thread(self):
  359. if self.cooker.configuration.profile:
  360. try:
  361. import cProfile as profile
  362. except:
  363. import profile
  364. prof = profile.Profile()
  365. ret = profile.Profile.runcall(prof, self.idle_thread_internal)
  366. prof.dump_stats("profile-mainloop.log")
  367. bb.utils.process_profilelog("profile-mainloop.log")
  368. serverlog("Raw profiling information saved to profile-mainloop.log and processed statistics to profile-mainloop.log.processed")
  369. else:
  370. self.idle_thread_internal()
  371. def idle_thread_internal(self):
  372. def remove_idle_func(function):
  373. with bb.utils.lock_timeout(self._idlefuncsLock):
  374. del self._idlefuns[function]
  375. self.idle_cond.notify_all()
  376. while not self.quit:
  377. nextsleep = 0.1
  378. fds = []
  379. with bb.utils.lock_timeout(self._idlefuncsLock):
  380. items = list(self._idlefuns.items())
  381. for function, data in items:
  382. try:
  383. retval = function(self, data, False)
  384. if isinstance(retval, idleFinish):
  385. serverlog("Removing idle function %s at idleFinish" % str(function))
  386. remove_idle_func(function)
  387. self.cooker.command.finishAsyncCommand(retval.msg)
  388. nextsleep = None
  389. elif retval is False:
  390. serverlog("Removing idle function %s" % str(function))
  391. remove_idle_func(function)
  392. nextsleep = None
  393. elif retval is True:
  394. nextsleep = None
  395. elif isinstance(retval, float) and nextsleep:
  396. if (retval < nextsleep):
  397. nextsleep = retval
  398. elif nextsleep is None:
  399. continue
  400. else:
  401. fds = fds + retval
  402. except SystemExit:
  403. raise
  404. except Exception as exc:
  405. if not isinstance(exc, bb.BBHandledException):
  406. logger.exception('Running idle function')
  407. remove_idle_func(function)
  408. serverlog("Exception %s broke the idle_thread, exiting" % traceback.format_exc())
  409. self.quit = True
  410. # Create new heartbeat event?
  411. now = time.time()
  412. if items and bb.event._heartbeat_enabled and now >= self.next_heartbeat:
  413. # We might have missed heartbeats. Just trigger once in
  414. # that case and continue after the usual delay.
  415. self.next_heartbeat += self.heartbeat_seconds
  416. if self.next_heartbeat <= now:
  417. self.next_heartbeat = now + self.heartbeat_seconds
  418. if hasattr(self.cooker, "data"):
  419. heartbeat = bb.event.HeartbeatEvent(now)
  420. try:
  421. bb.event.fire(heartbeat, self.cooker.data)
  422. except Exception as exc:
  423. if not isinstance(exc, bb.BBHandledException):
  424. logger.exception('Running heartbeat function')
  425. serverlog("Exception %s broke in idle_thread, exiting" % traceback.format_exc())
  426. self.quit = True
  427. if nextsleep and bb.event._heartbeat_enabled and now + nextsleep > self.next_heartbeat:
  428. # Shorten timeout so that we we wake up in time for
  429. # the heartbeat.
  430. nextsleep = self.next_heartbeat - now
  431. if nextsleep is not None:
  432. select.select(fds,[],[],nextsleep)[0]
  433. class ServerCommunicator():
  434. def __init__(self, connection, recv):
  435. self.connection = connection
  436. self.recv = recv
  437. def runCommand(self, command):
  438. try:
  439. self.connection.send(command)
  440. except BrokenPipeError as e:
  441. raise BrokenPipeError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
  442. if not self.recv.poll(30):
  443. logger.info("No reply from server in 30s (for command %s at %s)" % (command[0], currenttime()))
  444. if not self.recv.poll(30):
  445. raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s at %s)" % currenttime())
  446. try:
  447. ret, exc = self.recv.get()
  448. except EOFError as e:
  449. raise EOFError("bitbake-server might have died or been forcibly stopped, ie. OOM killed") from e
  450. # Should probably turn all exceptions in exc back into exceptions?
  451. # For now, at least handle BBHandledException
  452. if exc and ("BBHandledException" in exc or "SystemExit" in exc):
  453. raise bb.BBHandledException()
  454. return ret, exc
  455. def updateFeatureSet(self, featureset):
  456. _, error = self.runCommand(["setFeatures", featureset])
  457. if error:
  458. logger.error("Unable to set the cooker to the correct featureset: %s" % error)
  459. raise BaseException(error)
  460. def getEventHandle(self):
  461. handle, error = self.runCommand(["getUIHandlerNum"])
  462. if error:
  463. logger.error("Unable to get UI Handler Number: %s" % error)
  464. raise BaseException(error)
  465. return handle
  466. def terminateServer(self):
  467. self.connection.send(['terminateServer'])
  468. return
  469. class BitBakeProcessServerConnection(object):
  470. def __init__(self, ui_channel, recv, eq, sock):
  471. self.connection = ServerCommunicator(ui_channel, recv)
  472. self.events = eq
  473. # Save sock so it doesn't get gc'd for the life of our connection
  474. self.socket_connection = sock
  475. def terminate(self):
  476. self.events.close()
  477. self.socket_connection.close()
  478. self.connection.connection.close()
  479. self.connection.recv.close()
  480. return
# Banner written to the server log at startup; filled in with (pid, timestamp).
# BitBakeServer also turns it into a regex to find this session's log lines.
start_log_format = '--- Starting bitbake server pid %s at %s ---'
# strftime/strptime pattern of the timestamp embedded in the banner.
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
  483. class BitBakeServer(object):
  484. def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface, profile):
  485. self.server_timeout = server_timeout
  486. self.xmlrpcinterface = xmlrpcinterface
  487. self.featureset = featureset
  488. self.sockname = sockname
  489. self.bitbake_lock = lock
  490. self.profile = profile
  491. self.readypipe, self.readypipein = os.pipe()
  492. # Place the log in the builddirectory alongside the lock file
  493. logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
  494. self.logfile = logfile
  495. startdatetime = datetime.datetime.now()
  496. bb.daemonize.createDaemon(self._startServer, logfile)
  497. self.bitbake_lock.close()
  498. os.close(self.readypipein)
  499. ready = ConnectionReader(self.readypipe)
  500. r = ready.poll(5)
  501. if not r:
  502. bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
  503. r = ready.poll(90)
  504. if r:
  505. try:
  506. r = ready.get()
  507. except EOFError:
  508. # Trap the child exiting/closing the pipe and error out
  509. r = None
  510. if not r or r[0] != "r":
  511. ready.close()
  512. bb.error("Unable to start bitbake server (%s)" % str(r))
  513. if os.path.exists(logfile):
  514. logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
  515. started = False
  516. lines = []
  517. lastlines = []
  518. with open(logfile, "r") as f:
  519. for line in f:
  520. if started:
  521. lines.append(line)
  522. else:
  523. lastlines.append(line)
  524. res = logstart_re.search(line.rstrip())
  525. if res:
  526. ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
  527. if ldatetime >= startdatetime:
  528. started = True
  529. lines.append(line)
  530. if len(lastlines) > 60:
  531. lastlines = lastlines[-60:]
  532. if lines:
  533. if len(lines) > 60:
  534. bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
  535. else:
  536. bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
  537. elif lastlines:
  538. bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
  539. else:
  540. bb.error("%s doesn't exist" % logfile)
  541. raise SystemExit(1)
  542. ready.close()
  543. def _startServer(self):
  544. os.close(self.readypipe)
  545. os.set_inheritable(self.bitbake_lock.fileno(), True)
  546. os.set_inheritable(self.readypipein, True)
  547. serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
  548. os.execl(sys.executable, sys.executable, serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(int(self.profile)), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
  549. def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface, profile):
  550. import bb.cookerdata
  551. import bb.cooker
  552. serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
  553. try:
  554. bitbake_lock = os.fdopen(lockfd, "w")
  555. # Create server control socket
  556. if os.path.exists(sockname):
  557. serverlog("WARNING: removing existing socket file '%s'" % sockname)
  558. os.unlink(sockname)
  559. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  560. # AF_UNIX has path length issues so chdir here to workaround
  561. cwd = os.getcwd()
  562. try:
  563. os.chdir(os.path.dirname(sockname))
  564. sock.bind(os.path.basename(sockname))
  565. finally:
  566. os.chdir(cwd)
  567. sock.listen(1)
  568. server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
  569. writer = ConnectionWriter(readypipeinfd)
  570. try:
  571. featureset = []
  572. cooker = bb.cooker.BBCooker(featureset, server)
  573. cooker.configuration.profile = profile
  574. except bb.BBHandledException:
  575. return None
  576. writer.send("r")
  577. writer.close()
  578. server.cooker = cooker
  579. serverlog("Started bitbake server pid %d" % os.getpid())
  580. server.run()
  581. finally:
  582. # Flush any messages/errors to the logfile before exit
  583. sys.stdout.flush()
  584. sys.stderr.flush()
  585. def connectProcessServer(sockname, featureset):
  586. # Connect to socket
  587. sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  588. # AF_UNIX has path length issues so chdir here to workaround
  589. cwd = os.getcwd()
  590. readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
  591. eq = command_chan_recv = command_chan = None
  592. sock.settimeout(10)
  593. try:
  594. try:
  595. os.chdir(os.path.dirname(sockname))
  596. finished = False
  597. while not finished:
  598. try:
  599. sock.connect(os.path.basename(sockname))
  600. finished = True
  601. except IOError as e:
  602. if e.errno == errno.EWOULDBLOCK:
  603. pass
  604. raise
  605. finally:
  606. os.chdir(cwd)
  607. # Send an fd for the remote to write events to
  608. readfd, writefd = os.pipe()
  609. eq = BBUIEventQueue(readfd)
  610. # Send an fd for the remote to recieve commands from
  611. readfd1, writefd1 = os.pipe()
  612. command_chan = ConnectionWriter(writefd1)
  613. # Send an fd for the remote to write commands results to
  614. readfd2, writefd2 = os.pipe()
  615. command_chan_recv = ConnectionReader(readfd2)
  616. sendfds(sock, [writefd, readfd1, writefd2])
  617. server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
  618. # Close the ends of the pipes we won't use
  619. for i in [writefd, readfd1, writefd2]:
  620. os.close(i)
  621. server_connection.connection.updateFeatureSet(featureset)
  622. except (Exception, SystemExit) as e:
  623. if command_chan_recv:
  624. command_chan_recv.close()
  625. if command_chan:
  626. command_chan.close()
  627. for i in [writefd, readfd1, writefd2]:
  628. try:
  629. if i:
  630. os.close(i)
  631. except OSError:
  632. pass
  633. sock.close()
  634. raise
  635. return server_connection
  636. def sendfds(sock, fds):
  637. '''Send an array of fds over an AF_UNIX socket.'''
  638. fds = array.array('i', fds)
  639. msg = bytes([len(fds) % 256])
  640. sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
  641. def recvfds(sock, size):
  642. '''Receive an array of fds over an AF_UNIX socket.'''
  643. a = array.array('i')
  644. bytes_size = a.itemsize * size
  645. msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
  646. if not msg and not ancdata:
  647. raise EOFError
  648. try:
  649. if len(ancdata) != 1:
  650. raise RuntimeError('received %d items of ancdata' %
  651. len(ancdata))
  652. cmsg_level, cmsg_type, cmsg_data = ancdata[0]
  653. if (cmsg_level == socket.SOL_SOCKET and
  654. cmsg_type == socket.SCM_RIGHTS):
  655. if len(cmsg_data) % a.itemsize != 0:
  656. raise ValueError
  657. a.frombytes(cmsg_data)
  658. assert len(a) % 256 == msg[0]
  659. return list(a)
  660. except (ValueError, IndexError):
  661. pass
  662. raise RuntimeError('Invalid data received')
  663. class BBUIEventQueue:
  664. def __init__(self, readfd):
  665. self.eventQueue = []
  666. self.eventQueueLock = threading.Lock()
  667. self.eventQueueNotify = threading.Event()
  668. self.reader = ConnectionReader(readfd)
  669. self.t = threading.Thread()
  670. self.t.run = self.startCallbackHandler
  671. self.t.start()
  672. def getEvent(self):
  673. with bb.utils.lock_timeout(self.eventQueueLock):
  674. if len(self.eventQueue) == 0:
  675. return None
  676. item = self.eventQueue.pop(0)
  677. if len(self.eventQueue) == 0:
  678. self.eventQueueNotify.clear()
  679. return item
  680. def waitEvent(self, delay):
  681. self.eventQueueNotify.wait(delay)
  682. return self.getEvent()
  683. def queue_event(self, event):
  684. with bb.utils.lock_timeout(self.eventQueueLock):
  685. self.eventQueue.append(event)
  686. self.eventQueueNotify.set()
  687. def send_event(self, event):
  688. self.queue_event(pickle.loads(event))
  689. def startCallbackHandler(self):
  690. bb.utils.set_process_name("UIEventQueue")
  691. while True:
  692. try:
  693. ready = self.reader.wait(0.25)
  694. if ready:
  695. event = self.reader.get()
  696. self.queue_event(event)
  697. except (EOFError, OSError, TypeError):
  698. # Easiest way to exit is to close the file descriptor to cause an exit
  699. break
  700. def close(self):
  701. self.reader.close()
  702. self.t.join()
  703. class ConnectionReader(object):
  704. def __init__(self, fd):
  705. self.reader = multiprocessing.connection.Connection(fd, writable=False)
  706. self.rlock = multiprocessing.Lock()
  707. def wait(self, timeout=None):
  708. return multiprocessing.connection.wait([self.reader], timeout)
  709. def poll(self, timeout=None):
  710. return self.reader.poll(timeout)
  711. def get(self):
  712. with bb.utils.lock_timeout(self.rlock):
  713. res = self.reader.recv_bytes()
  714. return multiprocessing.reduction.ForkingPickler.loads(res)
  715. def fileno(self):
  716. return self.reader.fileno()
  717. def close(self):
  718. return self.reader.close()
  719. class ConnectionWriter(object):
  720. def __init__(self, fd):
  721. self.writer = multiprocessing.connection.Connection(fd, readable=False)
  722. self.wlock = multiprocessing.Lock()
  723. # Why bb.event needs this I have no idea
  724. self.event = self
  725. def _send(self, obj):
  726. gc.disable()
  727. with bb.utils.lock_timeout(self.wlock):
  728. self.writer.send_bytes(obj)
  729. gc.enable()
  730. def send(self, obj):
  731. obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
  732. # See notes/code in CookerParser
  733. # We must not terminate holding this lock else processes will hang.
  734. # For SIGTERM, raising afterwards avoids this.
  735. # For SIGINT, we don't want to have written partial data to the pipe.
  736. # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
  737. process = multiprocessing.current_process()
  738. if process and hasattr(process, "queue_signals"):
  739. with bb.utils.lock_timeout(process.signal_threadlock):
  740. process.queue_signals = True
  741. self._send(obj)
  742. process.queue_signals = False
  743. while len(process.signal_received) > 0:
  744. sig = process.signal_received.pop()
  745. process.handle_sig(sig, None)
  746. else:
  747. self._send(obj)
  748. def fileno(self):
  749. return self.writer.fileno()
  750. def close(self):
  751. return self.writer.close()