mercurial/worker.py
changeset 35427 02b36e860e0b
parent 34646 238abf65a8ad
child 35428 71427ff1dff8
--- a/mercurial/worker.py	Mon Dec 11 16:51:13 2017 -0800
+++ b/mercurial/worker.py	Mon Nov 20 10:25:29 2017 -0800
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus