workers: handling exceptions in windows workers
authorWojciech Lis <wlis@fb.com>
Mon, 20 Nov 2017 10:27:41 -0800
changeset 35428 71427ff1dff8
parent 35427 02b36e860e0b
child 35431 471918fa7f46
workers: handling exceptions in windows workers This adds handling of exceptions from worker threads and resurfaces them as if the function ran without workers. If any of the threads throws, the main thread kills all running threads giving them 5 sec to handle the interruption and raises the first exception received. We don't have to join threads if is_alive() is false Test Plan: Ran multiple updates/enable/disable sparse profile and things worked well Ran test on CentOS- all tests passing on @ passed here Added a forged exception into the worker code and got it properly resurfaced and the rest of workers killed: P58642088 PS C:\open\<repo>> ..\facebook-hg-rpms\build\hg\hg.exe --config extensions.fsmonitor=! sparse --enable-profile <profile> updating [==> ] 1300/39166 1m57sException in thread Thread-3: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception Exception in thread Thread-2: Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 244, in run raise e Exception: Forged exception <...> Traceback (most recent call last): File "C:\open\facebook-hg-rpms\build\hg\hgexe.py", line 41, in <module> dispatch.run() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 85, in run status = (dispatch(req) or 0) & 255 File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 173, in dispatch ret = _runcatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 324, in _runcatch return _callcatch(ui, _runcatchfunc) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 332, in _callcatch return scmutil.callcatch(ui, func) File "C:\open\facebook-hg-rpms\build\hg\mercurial\scmutil.py", line 154, in callcatch return func() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 314, in _runcatchfunc return _dispatch(req) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 951, in _dispatch cmdpats, cmdoptions) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 415, in runcommand return orig(lui, repo, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\undo.py", line 118, in _runcommandwrapper result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hgext\journal.py", line 84, in runcommand return orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 268, in _tracksparseprofiles res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\perftweaks.py", line 256, in _trackdirstatesizes res = runcommand(lui, repo, *args) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\copytrace.py", line 144, in _runcommand return orig(lui, repo, cmd, fullargs, ui, *args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbamend\hiddenoverride.py", line 119, in runcommand result = orig(lui, repo, cmd, fullargs, *args) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 712, in runcommand ret = _runcommand(ui, options, cmd, d) File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 959, in _runcommand return cmdfunc() File "C:\open\facebook-hg-rpms\build\hg\mercurial\dispatch.py", line 948, in <lambda> d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) File "C:\open\facebook-hg-rpms\build\hg\mercurial\util.py", line 1183, in check return func(*args, **kwargs) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 860, in sparse disableprofile=disableprofile, force=force) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 949, in _config len, _refresh(ui, repo, oldstatus, oldsparsematch, force)) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\hgext3rd\fbsparse.py", line 1116, in _refresh mergemod.applyupdates(repo, typeactions, repo[None], repo['.'], False) File "C:\open\facebook-hg-rpms\build\hg\hg-python\lib\site-packages\remotefilelog\__init__.py", line 311, in applyupdates return orig(repo, actions, wctx, mctx, overwrite, labels=labels) File "C:\open\facebook-hg-rpms\build\hg\mercurial\merge.py", line 1464, in applyupdates for i, item in prog: File "C:\open\facebook-hg-rpms\build\hg\mercurial\worker.py", line 286, in _windowsworker raise t.exception Exception: Forged exception PS C:\open\ovrsource> Differential Revision: https://phab.mercurial-scm.org/D1459
mercurial/worker.py
--- a/mercurial/worker.py	Mon Nov 20 10:25:29 2017 -0800
+++ b/mercurial/worker.py	Mon Nov 20 10:27:41 2017 -0800
@@ -214,18 +214,45 @@
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True
 
         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                            # threading doesn't provide a native way to
+                            # interrupt execution. handle it manually at every
+                            # iteration.
+                            if self._interrupted:
+                                return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefintely. the thread could be in infinite loop, handling
+            # a very long task or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))
 
     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # pass over the workers joining failure. it is more
+                    # important to surface the inital exception than the
+                    # fact that one of workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()