mercurial/worker.py
branchstable
changeset 49366 288de6f5d724
parent 49309 d54b213c4380
child 49544 3556f0392808
equal deleted inserted replaced
49364:e8ea403b1c46 49366:288de6f5d724
     3 # Copyright 2013 Facebook, Inc.
     3 # Copyright 2013 Facebook, Inc.
     4 #
     4 #
     5 # This software may be used and distributed according to the terms of the
     5 # This software may be used and distributed according to the terms of the
     6 # GNU General Public License version 2 or any later version.
     6 # GNU General Public License version 2 or any later version.
     7 
     7 
     8 from __future__ import absolute_import
     8 
     9 
       
    10 import errno
       
    11 import os
     9 import os
       
    10 import pickle
       
    11 import selectors
    12 import signal
    12 import signal
    13 import sys
    13 import sys
    14 import threading
    14 import threading
    15 import time
    15 import time
    16 
       
    17 try:
       
    18     import selectors
       
    19 
       
    20     selectors.BaseSelector
       
    21 except ImportError:
       
    22     from .thirdparty import selectors2 as selectors
       
    23 
    16 
    24 from .i18n import _
    17 from .i18n import _
    25 from . import (
    18 from . import (
    26     encoding,
    19     encoding,
    27     error,
    20     error,
    28     pycompat,
    21     pycompat,
    29     scmutil,
    22     scmutil,
    30     util,
       
    31 )
    23 )
    32 
    24 
    33 
    25 
    34 def countcpus():
    26 def countcpus():
    35     '''try to count the number of CPUs on the system'''
    27     '''try to count the number of CPUs on the system'''
    63         except ValueError:
    55         except ValueError:
    64             raise error.Abort(_(b'number of cpus must be an integer'))
    56             raise error.Abort(_(b'number of cpus must be an integer'))
    65     return min(max(countcpus(), 4), 32)
    57     return min(max(countcpus(), 4), 32)
    66 
    58 
    67 
    59 
    68 if pycompat.ispy3:
    60 def ismainthread():
    69 
    61     return threading.current_thread() == threading.main_thread()
    70     def ismainthread():
    62 
    71         return threading.current_thread() == threading.main_thread()
    63 
    72 
    64 class _blockingreader:
    73     class _blockingreader(object):
    65     """Wrap unbuffered stream such that pickle.load() works with it.
    74         def __init__(self, wrapped):
    66 
    75             self._wrapped = wrapped
    67     pickle.load() expects that calls to read() and readinto() read as many
    76 
    68     bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
    77         # Do NOT implement readinto() by making it delegate to
    69     pickle.load() raises an EOFError.
    78         # _wrapped.readinto(), since that is unbuffered. The unpickler is fine
    70     """
    79         # with just read() and readline(), so we don't need to implement it.
    71 
    80 
    72     def __init__(self, wrapped):
    81         if (3, 8, 0) <= sys.version_info[:3] < (3, 8, 2):
    73         self._wrapped = wrapped
    82 
    74 
    83             # This is required for python 3.8, prior to 3.8.2.  See issue6444.
    75     def readline(self):
    84             def readinto(self, b):
    76         return self._wrapped.readline()
    85                 pos = 0
    77 
    86                 size = len(b)
    78     def readinto(self, buf):
    87 
    79         pos = 0
    88                 while pos < size:
    80         size = len(buf)
    89                     ret = self._wrapped.readinto(b[pos:])
    81 
    90                     if not ret:
    82         with memoryview(buf) as view:
    91                         break
       
    92                     pos += ret
       
    93 
       
    94                 return pos
       
    95 
       
    96         def readline(self):
       
    97             return self._wrapped.readline()
       
    98 
       
    99         # issue multiple reads until size is fulfilled
       
   100         def read(self, size=-1):
       
   101             if size < 0:
       
   102                 return self._wrapped.readall()
       
   103 
       
   104             buf = bytearray(size)
       
   105             view = memoryview(buf)
       
   106             pos = 0
       
   107 
       
   108             while pos < size:
    83             while pos < size:
   109                 ret = self._wrapped.readinto(view[pos:])
    84                 with view[pos:] as subview:
       
    85                     ret = self._wrapped.readinto(subview)
   110                 if not ret:
    86                 if not ret:
   111                     break
    87                     break
   112                 pos += ret
    88                 pos += ret
   113 
    89 
   114             del view
    90         return pos
   115             del buf[pos:]
    91 
   116             return bytes(buf)
    92     # issue multiple reads until size is fulfilled (or EOF is encountered)
   117 
    93     def read(self, size=-1):
   118 
    94         if size < 0:
   119 else:
    95             return self._wrapped.readall()
   120 
    96 
   121     def ismainthread():
    97         buf = bytearray(size)
   122         # pytype: disable=module-attr
    98         n_read = self.readinto(buf)
   123         return isinstance(threading.current_thread(), threading._MainThread)
    99         del buf[n_read:]
   124         # pytype: enable=module-attr
   100         return bytes(buf)
   125 
       
   126     def _blockingreader(wrapped):
       
   127         return wrapped
       
   128 
   101 
   129 
   102 
   130 if pycompat.isposix or pycompat.iswindows:
   103 if pycompat.isposix or pycompat.iswindows:
   131     _STARTUP_COST = 0.01
   104     _STARTUP_COST = 0.01
   132     # The Windows worker is thread based. If tasks are CPU bound, threads
   105     # The Windows worker is thread based. If tasks are CPU bound, threads
   201         signal.signal(signal.SIGCHLD, oldchldhandler)
   174         signal.signal(signal.SIGCHLD, oldchldhandler)
   202         # if one worker bails, there's no good reason to wait for the rest
   175         # if one worker bails, there's no good reason to wait for the rest
   203         for p in pids:
   176         for p in pids:
   204             try:
   177             try:
   205                 os.kill(p, signal.SIGTERM)
   178                 os.kill(p, signal.SIGTERM)
   206             except OSError as err:
   179             except ProcessLookupError:
   207                 if err.errno != errno.ESRCH:
   180                 pass
   208                     raise
       
   209 
   181 
   210     def waitforworkers(blocking=True):
   182     def waitforworkers(blocking=True):
   211         for pid in pids.copy():
   183         for pid in pids.copy():
   212             p = st = 0
   184             p = st = 0
   213             while True:
   185             try:
   214                 try:
   186                 p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
   215                     p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
   187             except ChildProcessError:
   216                     break
   188                 # child would already be reaped, but pids yet been
   217                 except OSError as e:
   189                 # updated (maybe interrupted just after waitpid)
   218                     if e.errno == errno.EINTR:
   190                 pids.discard(pid)
   219                         continue
       
   220                     elif e.errno == errno.ECHILD:
       
   221                         # child would already be reaped, but pids yet been
       
   222                         # updated (maybe interrupted just after waitpid)
       
   223                         pids.discard(pid)
       
   224                         break
       
   225                     else:
       
   226                         raise
       
   227             if not p:
   191             if not p:
   228                 # skip subsequent steps, because child process should
   192                 # skip subsequent steps, because child process should
   229                 # be still running in this case
   193                 # be still running in this case
   230                 continue
   194                 continue
   231             pids.discard(p)
   195             pids.discard(p)
   268                 def workerfunc():
   232                 def workerfunc():
   269                     for r, w in pipes[:-1]:
   233                     for r, w in pipes[:-1]:
   270                         os.close(r)
   234                         os.close(r)
   271                         os.close(w)
   235                         os.close(w)
   272                     os.close(rfd)
   236                     os.close(rfd)
   273                     for result in func(*(staticargs + (pargs,))):
   237                     with os.fdopen(wfd, 'wb') as wf:
   274                         os.write(wfd, util.pickle.dumps(result))
   238                         for result in func(*(staticargs + (pargs,))):
       
   239                             pickle.dump(result, wf)
       
   240                             wf.flush()
   275                     return 0
   241                     return 0
   276 
   242 
   277                 ret = scmutil.callcatch(ui, workerfunc)
   243                 ret = scmutil.callcatch(ui, workerfunc)
   278         except:  # parent re-raises, child never returns
   244         except:  # parent re-raises, child never returns
   279             if os.getpid() == parentpid:
   245             if os.getpid() == parentpid:
   291                     os._exit(ret & 255)
   257                     os._exit(ret & 255)
   292         pids.add(pid)
   258         pids.add(pid)
   293     selector = selectors.DefaultSelector()
   259     selector = selectors.DefaultSelector()
   294     for rfd, wfd in pipes:
   260     for rfd, wfd in pipes:
   295         os.close(wfd)
   261         os.close(wfd)
       
   262         # The stream has to be unbuffered. Otherwise, if all data is read from
       
   263         # the raw file into the buffer, the selector thinks that the FD is not
       
   264         # ready to read while pickle.load() could read from the buffer. This
       
   265         # would delay the processing of readable items.
   296         selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
   266         selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
   297 
   267 
   298     def cleanup():
   268     def cleanup():
   299         signal.signal(signal.SIGINT, oldhandler)
   269         signal.signal(signal.SIGINT, oldhandler)
   300         waitforworkers()
   270         waitforworkers()
   305     try:
   275     try:
   306         openpipes = len(pipes)
   276         openpipes = len(pipes)
   307         while openpipes > 0:
   277         while openpipes > 0:
   308             for key, events in selector.select():
   278             for key, events in selector.select():
   309                 try:
   279                 try:
   310                     res = util.pickle.load(_blockingreader(key.fileobj))
   280                     # The pytype error likely goes away on a modern version of
       
   281                     # pytype having a modern typeshed snapshot.
       
   282                     # pytype: disable=wrong-arg-types
       
   283                     res = pickle.load(_blockingreader(key.fileobj))
       
   284                     # pytype: enable=wrong-arg-types
   311                     if hasretval and res[0]:
   285                     if hasretval and res[0]:
   312                         retval.update(res[1])
   286                         retval.update(res[1])
   313                     else:
   287                     else:
   314                         yield res
   288                         yield res
   315                 except EOFError:
   289                 except EOFError:
   316                     selector.unregister(key.fileobj)
   290                     selector.unregister(key.fileobj)
       
   291                     # pytype: disable=attribute-error
   317                     key.fileobj.close()
   292                     key.fileobj.close()
       
   293                     # pytype: enable=attribute-error
   318                     openpipes -= 1
   294                     openpipes -= 1
   319                 except IOError as e:
       
   320                     if e.errno == errno.EINTR:
       
   321                         continue
       
   322                     raise
       
   323     except:  # re-raises
   295     except:  # re-raises
   324         killworkers()
   296         killworkers()
   325         cleanup()
   297         cleanup()
   326         raise
   298         raise
   327     status = cleanup()
   299     status = cleanup()