|
@@ -10,6 +10,7 @@ import bb
|
|
|
import select
|
|
|
import errno
|
|
|
import signal
|
|
|
+from multiprocessing import Lock
|
|
|
|
|
|
# Users shouldn't be running this code directly
|
|
|
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
|
|
@@ -44,6 +45,9 @@ except ImportError:
|
|
|
|
|
|
worker_pipe = sys.stdout.fileno()
|
|
|
bb.utils.nonblockingfd(worker_pipe)
|
|
|
+# Need to guard against multiprocessing being used in child processes
|
|
|
+# and multiple processes trying to write to the parent at the same time
|
|
|
+worker_pipe_lock = None
|
|
|
|
|
|
handler = bb.event.LogHandler()
|
|
|
logger.addHandler(handler)
|
|
@@ -85,10 +89,13 @@ def worker_flush():
|
|
|
|
|
|
def worker_child_fire(event, d):
|
|
|
global worker_pipe
|
|
|
+ global worker_pipe_lock
|
|
|
|
|
|
data = "<event>" + pickle.dumps(event) + "</event>"
|
|
|
try:
|
|
|
+ worker_pipe_lock.acquire()
|
|
|
worker_pipe.write(data)
|
|
|
+ worker_pipe_lock.release()
|
|
|
except IOError:
|
|
|
sigterm_handler(None, None)
|
|
|
raise
|
|
@@ -157,6 +164,7 @@ def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdat
|
|
|
if pid == 0:
|
|
|
def child():
|
|
|
global worker_pipe
|
|
|
+ global worker_pipe_lock
|
|
|
pipein.close()
|
|
|
|
|
|
signal.signal(signal.SIGTERM, sigterm_handler)
|
|
@@ -169,6 +177,7 @@ def fork_off_task(cfg, data, workerdata, fn, task, taskname, appends, taskdepdat
|
|
|
bb.event.worker_pid = os.getpid()
|
|
|
bb.event.worker_fire = worker_child_fire
|
|
|
worker_pipe = pipeout
|
|
|
+ worker_pipe_lock = Lock()
|
|
|
|
|
|
# Make the child the process group leader and ensure no
|
|
|
# child process will be controlled by the current terminal
|