mercurial/worker.py
changeset 35432 86b8cc1f244e
parent 35431 471918fa7f46
child 35453 44fd4cfc6c0a
--- a/mercurial/worker.py	Sun Dec 17 11:26:25 2017 -0800
+++ b/mercurial/worker.py	Thu Nov 30 16:01:53 2017 -0800
@@ -12,6 +12,7 @@
 import signal
 import sys
 import threading
+import time
 
 from .i18n import _
 from . import (
@@ -216,6 +217,7 @@
             self._func = func
             self._staticargs = staticargs
             self._interrupted = False
+            self.daemon = True
             self.exception = None
 
         def interrupt(self):
@@ -242,16 +244,22 @@
                 raise
 
     threads = []
-    def killworkers():
+    def trykillworkers():
+        # Allow up to 1 second to clean worker threads nicely
+        cleanupend = time.time() + 1
         for t in threads:
             t.interrupt()
         for t in threads:
-            # try to let the threads handle interruption, but don't wait
-            # indefintely. the thread could be in infinite loop, handling
-            # a very long task or in a deadlock situation
-            t.join(5)
+            remainingtime = cleanupend - time.time()
+            t.join(remainingtime)
             if t.is_alive():
-                raise error.Abort(_('failed to join worker thread'))
+                # pass over the workers joining failure. it is more
+                # important to surface the inital exception than the
+                # fact that one of workers may be processing a large
+                # task and does not get to handle the interruption.
+                ui.warn(_("failed to kill worker threads while "
+                          "handling an exception\n"))
+                return
 
     workers = _numworkers(ui)
     resultqueue = util.queue()
@@ -264,25 +272,19 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-
-    while len(threads) > 0:
-        while not resultqueue.empty():
-            yield resultqueue.get()
-        threads[0].join(0.05)
-        finishedthreads = [_t for _t in threads if not _t.is_alive()]
-        for t in finishedthreads:
-            if t.exception is not None:
-                try:
-                    killworkers()
-                except Exception:
-                    # pass over the workers joining failure. it is more
-                    # important to surface the inital exception than the
-                    # fact that one of workers may be processing a large
-                    # task and does not get to handle the interruption.
-                    ui.warn(_("failed to kill worker threads while handling "
-                              "an exception"))
-                raise t.exception
-            threads.remove(t)
+    try:
+        while len(threads) > 0:
+            while not resultqueue.empty():
+                yield resultqueue.get()
+            threads[0].join(0.05)
+            finishedthreads = [_t for _t in threads if not _t.is_alive()]
+            for t in finishedthreads:
+                if t.exception is not None:
+                    raise t.exception
+                threads.remove(t)
+    except Exception: # re-raises
+        trykillworkers()
+        raise
     while not resultqueue.empty():
         yield resultqueue.get()