--- a/mercurial/worker.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/worker.py Sun Oct 06 09:45:02 2019 -0400
@@ -16,6 +16,7 @@
try:
import selectors
+
selectors.BaseSelector
except ImportError:
from .thirdparty import selectors2 as selectors
@@ -29,6 +30,7 @@
util,
)
+
def countcpus():
'''try to count the number of CPUs on the system'''
@@ -50,6 +52,7 @@
return 1
+
def _numworkers(ui):
s = ui.config('worker', 'numcpus')
if s:
@@ -61,6 +64,7 @@
raise error.Abort(_('number of cpus must be an integer'))
return min(max(countcpus(), 4), 32)
+
if pycompat.isposix or pycompat.iswindows:
_STARTUP_COST = 0.01
# The Windows worker is thread based. If tasks are CPU bound, threads
@@ -71,6 +75,7 @@
_STARTUP_COST = 1e30
_DISALLOW_THREAD_UNSAFE = False
+
def worthwhile(ui, costperop, nops, threadsafe=True):
'''try to determine whether the benefit of multiple processes can
outweigh the cost of starting them'''
@@ -83,8 +88,10 @@
benefit = linear - (_STARTUP_COST * workers + linear / workers)
return benefit >= 0.15
-def worker(ui, costperarg, func, staticargs, args, hasretval=False,
- threadsafe=True):
+
+def worker(
+ ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
+):
'''run a function, possibly in parallel in multiple worker
processes.
@@ -113,11 +120,13 @@
return _platformworker(ui, func, staticargs, args, hasretval)
return func(*staticargs + (args,))
+
def _posixworker(ui, func, staticargs, args, hasretval):
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
pids, problem = set(), [0]
+
def killworkers():
# unregister SIGCHLD handler as all children will be killed. This
# function shouldn't be interrupted by another SIGCHLD; otherwise pids
@@ -130,6 +139,7 @@
except OSError as err:
if err.errno != errno.ESRCH:
raise
+
def waitforworkers(blocking=True):
for pid in pids.copy():
p = st = 0
@@ -155,10 +165,12 @@
st = _exitstatus(st)
if st and not problem[0]:
problem[0] = st
+
def sigchldhandler(signum, frame):
waitforworkers(blocking=False)
if problem[0]:
killworkers()
+
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
ui.flush()
parentpid = os.getpid()
@@ -196,7 +208,7 @@
return 0
ret = scmutil.callcatch(ui, workerfunc)
- except: # parent re-raises, child never returns
+ except: # parent re-raises, child never returns
if os.getpid() == parentpid:
raise
exctype = sys.exc_info()[0]
@@ -206,7 +218,7 @@
if os.getpid() != parentpid:
try:
ui.flush()
- except: # never returns, no re-raises
+ except: # never returns, no re-raises
pass
finally:
os._exit(ret & 255)
@@ -215,12 +227,14 @@
for rfd, wfd in pipes:
os.close(wfd)
selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
+
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
waitforworkers()
signal.signal(signal.SIGCHLD, oldchldhandler)
selector.close()
return problem[0]
+
try:
openpipes = len(pipes)
while openpipes > 0:
@@ -239,7 +253,7 @@
if e.errno == errno.EINTR:
continue
raise
- except: # re-raises
+ except: # re-raises
killworkers()
cleanup()
raise
@@ -251,6 +265,7 @@
if hasretval:
yield True, retval
+
def _posixexitstatus(code):
'''convert a posix exit status into the same form returned by
os.spawnv
@@ -259,12 +274,14 @@
if os.WIFEXITED(code):
return os.WEXITSTATUS(code)
elif os.WIFSIGNALED(code):
- return -os.WTERMSIG(code)
+ return -(os.WTERMSIG(code))
+
def _windowsworker(ui, func, staticargs, args, hasretval):
class Worker(threading.Thread):
- def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
- **kwargs):
+ def __init__(
+ self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
+ ):
threading.Thread.__init__(self, *args, **kwargs)
self._taskqueue = taskqueue
self._resultqueue = resultqueue
@@ -298,6 +315,7 @@
raise
threads = []
+
def trykillworkers():
# Allow up to 1 second to clean worker threads nicely
cleanupend = time.time() + 1
@@ -311,8 +329,12 @@
# important to surface the inital exception than the
# fact that one of workers may be processing a large
# task and does not get to handle the interruption.
- ui.warn(_("failed to kill worker threads while "
- "handling an exception\n"))
+ ui.warn(
+ _(
+ "failed to kill worker threads while "
+ "handling an exception\n"
+ )
+ )
return
workers = _numworkers(ui)
@@ -341,7 +363,7 @@
if t.exception is not None:
raise t.exception
threads.remove(t)
- except (Exception, KeyboardInterrupt): # re-raises
+ except (Exception, KeyboardInterrupt): # re-raises
trykillworkers()
raise
while not resultqueue.empty():
@@ -353,12 +375,14 @@
if hasretval:
yield True, retval
+
if pycompat.iswindows:
_platformworker = _windowsworker
else:
_platformworker = _posixworker
_exitstatus = _posixexitstatus
+
def partition(lst, nslices):
'''partition a list into N slices of roughly equal size