mercurial/thirdparty/concurrent/futures/process.py
changeset 48836 e020dbbc41e5
parent 48835 a0da5075bca3
child 48837 12cba4886e90
equal deleted inserted replaced
48835:a0da5075bca3 48836:e020dbbc41e5
     1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
       
     2 # Licensed to PSF under a Contributor Agreement.
       
     3 
       
     4 """Implements ProcessPoolExecutor.
       
     5 
       
     6 The following diagram and text describe the data-flow through the system:
       
     7 
       
     8 |======================= In-process =====================|== Out-of-process ==|
       
     9 
       
    10 +----------+     +----------+       +--------+     +-----------+    +---------+
       
    11 |          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
       
    12 |          |     +----------+       |        |     +-----------+    |         |
       
    13 |          |     | ...      |       |        |     | ...       |    |         |
       
    14 |          |     | 6        |       |        |     | 5, call() |    |         |
       
    15 |          |     | 7        |       |        |     | ...       |    |         |
       
    16 | Process  |     | ...      |       | Local  |     +-----------+    | Process |
       
    17 |  Pool    |     +----------+       | Worker |                      |  #1..n  |
       
    18 | Executor |                        | Thread |                      |         |
       
    19 |          |     +----------- +     |        |     +-----------+    |         |
       
    20 |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
       
    21 |          |     +------------+     |        |     +-----------+    |         |
       
    22 |          |     | 6: call()  |     |        |     | ...       |    |         |
       
    23 |          |     |    future  |     |        |     | 4, result |    |         |
       
    24 |          |     | ...        |     |        |     | 3, except |    |         |
       
    25 +----------+     +------------+     +--------+     +-----------+    +---------+
       
    26 
       
    27 Executor.submit() called:
       
    28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
       
    29 - adds the id of the _WorkItem to the "Work Ids" queue
       
    30 
       
    31 Local worker thread:
       
    32 - reads work ids from the "Work Ids" queue and looks up the corresponding
       
    33   WorkItem from the "Work Items" dict: if the work item has been cancelled then
       
    34   it is simply removed from the dict, otherwise it is repackaged as a
       
    35   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
       
    36   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
       
    37   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
       
    38 - reads _ResultItems from "Result Q", updates the future stored in the
       
    39   "Work Items" dict and deletes the dict entry
       
    40 
       
    41 Process #1..n:
       
    42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
       
     43   _ResultItems in "Result Q"
       
    44 """
       
    45 
       
    46 from __future__ import absolute_import
       
    47 
       
    48 import atexit
       
    49 from . import _base
       
    50 import Queue as queue
       
    51 import multiprocessing
       
    52 import threading
       
    53 import weakref
       
    54 import sys
       
    55 
       
    56 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
       
    57 
       
    58 # Workers are created as daemon threads and processes. This is done to allow the
       
    59 # interpreter to exit when there are still idle processes in a
       
    60 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
       
    61 # allowing workers to die with the interpreter has two undesirable properties:
       
    62 #   - The workers would still be running during interpretor shutdown,
       
    63 #     meaning that they would fail in unpredictable ways.
       
    64 #   - The workers could be killed while evaluating a work item, which could
       
    65 #     be bad if the callable being evaluated has external side-effects e.g.
       
    66 #     writing to a file.
       
    67 #
       
    68 # To work around this problem, an exit handler is installed which tells the
       
    69 # workers to exit when their work queues are empty and then waits until the
       
    70 # threads/processes finish.
       
    71 
       
    72 _threads_queues = weakref.WeakKeyDictionary()
       
    73 _shutdown = False
       
    74 
       
    75 def _python_exit():
       
    76     global _shutdown
       
    77     _shutdown = True
       
    78     items = list(_threads_queues.items()) if _threads_queues else ()
       
    79     for t, q in items:
       
    80         q.put(None)
       
    81     for t, q in items:
       
    82         t.join(sys.maxint)
       
    83 
       
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor sizes its call queue as max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1
       
    89 
       
    90 class _WorkItem(object):
       
    91     def __init__(self, future, fn, args, kwargs):
       
    92         self.future = future
       
    93         self.fn = fn
       
    94         self.args = args
       
    95         self.kwargs = kwargs
       
    96 
       
    97 class _ResultItem(object):
       
    98     def __init__(self, work_id, exception=None, result=None):
       
    99         self.work_id = work_id
       
   100         self.exception = exception
       
   101         self.result = result
       
   102 
       
   103 class _CallItem(object):
       
   104     def __init__(self, work_id, fn, args, kwargs):
       
   105         self.work_id = work_id
       
   106         self.fn = fn
       
   107         self.args = args
       
   108         self.kwargs = kwargs
       
   109 
       
   110 def _process_worker(call_queue, result_queue):
       
   111     """Evaluates calls from call_queue and places the results in result_queue.
       
   112 
       
   113     This worker is run in a separate process.
       
   114 
       
   115     Args:
       
   116         call_queue: A multiprocessing.Queue of _CallItems that will be read and
       
   117             evaluated by the worker.
       
   118         result_queue: A multiprocessing.Queue of _ResultItems that will written
       
   119             to by the worker.
       
   120         shutdown: A multiprocessing.Event that will be set as a signal to the
       
   121             worker that it should exit when call_queue is empty.
       
   122     """
       
   123     while True:
       
   124         call_item = call_queue.get(block=True)
       
   125         if call_item is None:
       
   126             # Wake up queue management thread
       
   127             result_queue.put(None)
       
   128             return
       
   129         try:
       
   130             r = call_item.fn(*call_item.args, **call_item.kwargs)
       
   131         except:
       
   132             e = sys.exc_info()[1]
       
   133             result_queue.put(_ResultItem(call_item.work_id,
       
   134                                          exception=e))
       
   135         else:
       
   136             result_queue.put(_ResultItem(call_item.work_id,
       
   137                                          result=r))
       
   138 
       
   139 def _add_call_item_to_queue(pending_work_items,
       
   140                             work_ids,
       
   141                             call_queue):
       
   142     """Fills call_queue with _WorkItems from pending_work_items.
       
   143 
       
   144     This function never blocks.
       
   145 
       
   146     Args:
       
   147         pending_work_items: A dict mapping work ids to _WorkItems e.g.
       
   148             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
       
   149         work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
       
   150             are consumed and the corresponding _WorkItems from
       
   151             pending_work_items are transformed into _CallItems and put in
       
   152             call_queue.
       
   153         call_queue: A multiprocessing.Queue that will be filled with _CallItems
       
   154             derived from _WorkItems.
       
   155     """
       
   156     while True:
       
   157         if call_queue.full():
       
   158             return
       
   159         try:
       
   160             work_id = work_ids.get(block=False)
       
   161         except queue.Empty:
       
   162             return
       
   163         else:
       
   164             work_item = pending_work_items[work_id]
       
   165 
       
   166             if work_item.future.set_running_or_notify_cancel():
       
   167                 call_queue.put(_CallItem(work_id,
       
   168                                          work_item.fn,
       
   169                                          work_item.args,
       
   170                                          work_item.kwargs),
       
   171                                block=True)
       
   172             else:
       
   173                 del pending_work_items[work_id]
       
   174                 continue
       
   175 
       
   176 def _queue_management_worker(executor_reference,
       
   177                              processes,
       
   178                              pending_work_items,
       
   179                              work_ids_queue,
       
   180                              call_queue,
       
   181                              result_queue):
       
   182     """Manages the communication between this process and the worker processes.
       
   183 
       
   184     This function is run in a local thread.
       
   185 
       
   186     Args:
       
   187         executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
       
   188             this thread. Used to determine if the ProcessPoolExecutor has been
       
   189             garbage collected and that this function can exit.
       
   190         process: A list of the multiprocessing.Process instances used as
       
   191             workers.
       
   192         pending_work_items: A dict mapping work ids to _WorkItems e.g.
       
   193             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
       
   194         work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
       
   195         call_queue: A multiprocessing.Queue that will be filled with _CallItems
       
   196             derived from _WorkItems for processing by the process workers.
       
   197         result_queue: A multiprocessing.Queue of _ResultItems generated by the
       
   198             process workers.
       
   199     """
       
   200     nb_shutdown_processes = [0]
       
   201     def shutdown_one_process():
       
   202         """Tell a worker to terminate, which will in turn wake us again"""
       
   203         call_queue.put(None)
       
   204         nb_shutdown_processes[0] += 1
       
   205     while True:
       
   206         _add_call_item_to_queue(pending_work_items,
       
   207                                 work_ids_queue,
       
   208                                 call_queue)
       
   209 
       
   210         result_item = result_queue.get(block=True)
       
   211         if result_item is not None:
       
   212             work_item = pending_work_items[result_item.work_id]
       
   213             del pending_work_items[result_item.work_id]
       
   214 
       
   215             if result_item.exception:
       
   216                 work_item.future.set_exception(result_item.exception)
       
   217             else:
       
   218                 work_item.future.set_result(result_item.result)
       
   219             # Delete references to object. See issue16284
       
   220             del work_item
       
   221         # Check whether we should start shutting down.
       
   222         executor = executor_reference()
       
   223         # No more work items can be added if:
       
   224         #   - The interpreter is shutting down OR
       
   225         #   - The executor that owns this worker has been collected OR
       
   226         #   - The executor that owns this worker has been shutdown.
       
   227         if _shutdown or executor is None or executor._shutdown_thread:
       
   228             # Since no new work items can be added, it is safe to shutdown
       
   229             # this thread if there are no pending work items.
       
   230             if not pending_work_items:
       
   231                 while nb_shutdown_processes[0] < len(processes):
       
   232                     shutdown_one_process()
       
   233                 # If .join() is not called on the created processes then
       
   234                 # some multiprocessing.Queue methods may deadlock on Mac OS
       
   235                 # X.
       
   236                 for p in processes:
       
   237                     p.join()
       
   238                 call_queue.close()
       
   239                 return
       
   240         del executor
       
   241 
       
   242 _system_limits_checked = False
       
   243 _system_limited = None
       
   244 def _check_system_limits():
       
   245     global _system_limits_checked, _system_limited
       
   246     if _system_limits_checked:
       
   247         if _system_limited:
       
   248             raise NotImplementedError(_system_limited)
       
   249     _system_limits_checked = True
       
   250     try:
       
   251         import os
       
   252         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
       
   253     except (AttributeError, ValueError):
       
   254         # sysconf not available or setting not available
       
   255         return
       
   256     if nsems_max == -1:
       
   257         # indetermine limit, assume that limit is determined
       
   258         # by available memory only
       
   259         return
       
   260     if nsems_max >= 256:
       
   261         # minimum number of semaphores available
       
   262         # according to POSIX
       
   263         return
       
   264     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
       
   265     raise NotImplementedError(_system_limited)
       
   266 
       
   267 
       
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes.

    Calls travel to the worker processes through multiprocessing queues, and
    their outcomes are routed back to the callers' futures by a single queue
    management thread running _queue_management_worker.
    """

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            NotImplementedError: if _check_system_limits() finds that the
                system provides too few semaphores.
            ValueError: if max_workers is given and is not greater than 0.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        # Started lazily by the first submit().
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process: shutdown() sets _shutdown_thread
        # under _shutdown_lock, then wakes (and optionally joins) the queue
        # management thread. submit() checks the flag under the same lock.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Id given to the next submitted work item (key in _pending_work_items).
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the queue management thread unless it is already running."""
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            # Register with the atexit hook so it can wake and join us.
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until the pool holds _max_workers of them."""
        # NOTE: the processes are not marked as daemons here.
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    # submit() and shutdown() take their docstrings from _base.Executor (see
    # the __doc__ assignments below), so they are documented with comments.
    def submit(self, fn, *args, **kwargs):
        # Everything happens under _shutdown_lock so that a concurrent
        # shutdown() cannot slip in between the check and the enqueue.
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                # Large timeout instead of none; presumably to keep the wait
                # interruptible on Python 2.
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
       
   364 
       
# Run _python_exit() at interpreter exit: it sets _shutdown, wakes every
# registered queue management thread and joins it.
atexit.register(_python_exit)