|
@@ -12,7 +12,9 @@ import errno
|
|
|
import signal
|
|
|
import pickle
|
|
|
import traceback
|
|
|
+import queue
|
|
|
from multiprocessing import Lock
|
|
|
+from threading import Thread
|
|
|
|
|
|
if sys.getfilesystemencoding() != "utf-8":
|
|
|
sys.exit("Please use a locale setting which supports utf-8.\nPython can't change the filesystem locale after loading so we need a utf-8 when python starts or things won't work.")
|
|
@@ -64,7 +66,7 @@ if 0:
|
|
|
consolelog.setFormatter(conlogformat)
|
|
|
logger.addHandler(consolelog)
|
|
|
|
|
|
-worker_queue = b""
|
|
|
+worker_queue = queue.Queue()
|
|
|
|
|
|
def worker_fire(event, d):
|
|
|
data = b"<event>" + pickle.dumps(event) + b"</event>"
|
|
@@ -73,21 +75,38 @@ def worker_fire(event, d):
|
|
|
def worker_fire_prepickled(event):
|
|
|
global worker_queue
|
|
|
|
|
|
- worker_queue = worker_queue + event
|
|
|
- worker_flush()
|
|
|
+ worker_queue.put(event)
|
|
|
|
|
|
-def worker_flush():
|
|
|
- global worker_queue, worker_pipe
|
|
|
+#
|
|
|
+# We can end up with write contention with the cooker, it can be trying to send commands
|
|
|
+# and we can be trying to send event data back. Therefore use a separate thread for writing
|
|
|
+# back data to cooker.
|
|
|
+#
|
|
|
+worker_thread_exit = False
|
|
|
|
|
|
- if not worker_queue:
|
|
|
- return
|
|
|
+def worker_flush(worker_queue):
|
|
|
+ worker_queue_int = b""
|
|
|
+ global worker_pipe, worker_thread_exit
|
|
|
|
|
|
- try:
|
|
|
- written = os.write(worker_pipe, worker_queue)
|
|
|
- worker_queue = worker_queue[written:]
|
|
|
- except (IOError, OSError) as e:
|
|
|
- if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
|
|
|
- raise
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
|
|
|
+ except queue.Empty:
|
|
|
+ pass
|
|
|
+ while (worker_queue_int or not worker_queue.empty()):
|
|
|
+ try:
|
|
|
+ if not worker_queue.empty():
|
|
|
+ worker_queue_int = worker_queue_int + worker_queue.get()
|
|
|
+ written = os.write(worker_pipe, worker_queue_int)
|
|
|
+ worker_queue_int = worker_queue_int[written:]
|
|
|
+ except (IOError, OSError) as e:
|
|
|
+ if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
|
|
|
+ raise
|
|
|
+ if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
|
|
|
+ return
|
|
|
+
|
|
|
+worker_thread = Thread(target=worker_flush, args=(worker_queue,))
|
|
|
+worker_thread.start()
|
|
|
|
|
|
def worker_child_fire(event, d):
|
|
|
global worker_pipe
|
|
@@ -353,7 +372,6 @@ class BitbakeWorker(object):
|
|
|
self.build_pipes[pipe].read()
|
|
|
if len(self.build_pids):
|
|
|
self.process_waitpid()
|
|
|
- worker_flush()
|
|
|
|
|
|
|
|
|
def handle_item(self, item, func):
|
|
@@ -458,8 +476,10 @@ except BaseException as e:
|
|
|
import traceback
|
|
|
sys.stderr.write(traceback.format_exc())
|
|
|
sys.stderr.write(str(e))
|
|
|
-while len(worker_queue):
|
|
|
- worker_flush()
|
|
|
+
|
|
|
+worker_thread_exit = True
|
|
|
+worker_thread.join()
|
|
|
+
|
|
|
workerlog_write("exitting")
|
|
|
sys.exit(0)
|
|
|
|