worker: handle worker failures more aggressively
authorBryan O'Sullivan <bryano@fb.com>
Wed, 20 Feb 2013 11:31:34 -0800
changeset 18709 9955fc5ee24b
parent 18708 86524a70c0f6
child 18710 49ef9d0ca815
worker: handle worker failures more aggressively We now wait for worker processes in a separate thread, so that we can spot failures in a timely way, wihout waiting for the progress pipe to drain. If a worker fails, we recover the pre-parallel-update behaviour of failing early by killing its peers before propagating the failure.
mercurial/worker.py
--- a/mercurial/worker.py	Wed Feb 20 11:31:31 2013 -0800
+++ b/mercurial/worker.py	Wed Feb 20 11:31:34 2013 -0800
@@ -6,7 +6,7 @@
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
-import os, signal, sys, util
+import os, signal, sys, threading, util
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -77,6 +77,7 @@
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
+    pids, problem = [], [0]
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
@@ -88,26 +89,40 @@
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
+        pids.append(pid)
+    pids.reverse()
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
+    def killworkers():
+        # if one worker bails, there's no good reason to wait for the rest
+        for p in pids:
+            try:
+                os.kill(p, signal.SIGTERM)
+            except OSError, err:
+                if err.errno != errno.ESRCH:
+                    raise
+    def waitforworkers():
+        for _ in pids:
+            st = _exitstatus(os.wait()[1])
+            if st and not problem:
+                problem[0] = st
+                killworkers()
+    t = threading.Thread(target=waitforworkers)
+    t.start()
     def cleanup():
-        # python 2.4 is too dumb for try/yield/finally
         signal.signal(signal.SIGINT, oldhandler)
-        problem = None
-        for i in xrange(workers):
-            pid, st = os.wait()
-            st = _exitstatus(st)
-            if st and not problem:
-                problem = st
-        if problem:
-            if problem < 0:
-                os.kill(os.getpid(), -problem)
-            sys.exit(problem)
+        t.join()
+        status = problem[0]
+        if status:
+            if status < 0:
+                os.kill(os.getpid(), -status)
+            sys.exit(status)
     try:
         for line in fp:
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
+        killworkers()
         cleanup()
         raise
     cleanup()