changeset 48836 e020dbbc41e5
parent 48835 a0da5075bca3
child 48837 12cba4886e90
equal deleted inserted replaced
48835:a0da5075bca3 48836:e020dbbc41e5
     1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
     2 # Licensed to PSF under a Contributor Agreement.
     4 """Implements ProcessPoolExecutor.
     6 The following diagram and text describe the data-flow through the system:
     8 |======================= In-process =====================|== Out-of-process ==|
    10 +----------+     +----------+       +--------+     +-----------+    +---------+
    11 |          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
    12 |          |     +----------+       |        |     +-----------+    |         |
    13 |          |     | ...      |       |        |     | ...       |    |         |
    14 |          |     | 6        |       |        |     | 5, call() |    |         |
    15 |          |     | 7        |       |        |     | ...       |    |         |
    16 | Process  |     | ...      |       | Local  |     +-----------+    | Process |
    17 |  Pool    |     +----------+       | Worker |                      |  #1..n  |
    18 | Executor |                        | Thread |                      |         |
    19 |          |     +------------+     |        |     +-----------+    |         |
    20 |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
    21 |          |     +------------+     |        |     +-----------+    |         |
    22 |          |     | 6: call()  |     |        |     | ...       |    |         |
    23 |          |     |    future  |     |        |     | 4, result |    |         |
    24 |          |     | ...        |     |        |     | 3, except |    |         |
    25 +----------+     +------------+     +--------+     +-----------+    +---------+
    27 Executor.submit() called:
    28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
    29 - adds the id of the _WorkItem to the "Work Ids" queue
    31 Local worker thread:
    32 - reads work ids from the "Work Ids" queue and looks up the corresponding
    33   WorkItem from the "Work Items" dict: if the work item has been cancelled then
    34   it is simply removed from the dict, otherwise it is repackaged as a
    35   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
    36   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
    37   calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
    38 - reads _ResultItems from "Result Q", updates the future stored in the
    39   "Work Items" dict and deletes the dict entry
    41 Process #1..n:
    42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting
    43   _ResultItems in "Result Q"
    44 """
    46 from __future__ import absolute_import
    48 import atexit
    49 from . import _base
    50 import Queue as queue
    51 import multiprocessing
    52 import threading
    53 import weakref
    54 import sys
    56 __author__ = 'Brian Quinlan ('
    58 # Workers are created as daemon threads and processes. This is done to allow the
    59 # interpreter to exit when there are still idle processes in a
    60 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
    61 # allowing workers to die with the interpreter has two undesirable properties:
    62 #   - The workers would still be running during interpreter shutdown,
    63 #     meaning that they would fail in unpredictable ways.
    64 #   - The workers could be killed while evaluating a work item, which could
    65 #     be bad if the callable being evaluated has external side-effects e.g.
    66 #     writing to a file.
    67 #
    68 # To work around this problem, an exit handler is installed which tells the
    69 # workers to exit when their work queues are empty and then waits until the
    70 # threads/processes finish.
    72 _threads_queues = weakref.WeakKeyDictionary()
    73 _shutdown = False
    75 def _python_exit():
    76     global _shutdown
    77     _shutdown = True
    78     items = list(_threads_queues.items()) if _threads_queues else ()
    79     for t, q in items:
    80         q.put(None)
    81     for t, q in items:
    82         t.join(sys.maxint)
    84 # Controls how many more calls than processes will be queued in the call queue.
    85 # A smaller number will mean that processes spend more time idle waiting for
    86 # work while a larger number will make Future.cancel() succeed less frequently
    87 # (Futures in the call queue cannot be cancelled).
    90 class _WorkItem(object):
    91     def __init__(self, future, fn, args, kwargs):
    92         self.future = future
    93         self.fn = fn
    94         self.args = args
    95         self.kwargs = kwargs
    97 class _ResultItem(object):
    98     def __init__(self, work_id, exception=None, result=None):
    99         self.work_id = work_id
   100         self.exception = exception
   101         self.result = result
   103 class _CallItem(object):
   104     def __init__(self, work_id, fn, args, kwargs):
   105         self.work_id = work_id
   106         self.fn = fn
   107         self.args = args
   108         self.kwargs = kwargs
   110 def _process_worker(call_queue, result_queue):
   111     """Evaluates calls from call_queue and places the results in result_queue.
   113     This worker is run in a separate process.
   115     Args:
   116         call_queue: A multiprocessing.Queue of _CallItems that will be read and
   117             evaluated by the worker.
   118         result_queue: A multiprocessing.Queue of _ResultItems that will written
   119             to by the worker.
   120         shutdown: A multiprocessing.Event that will be set as a signal to the
   121             worker that it should exit when call_queue is empty.
   122     """
   123     while True:
   124         call_item = call_queue.get(block=True)
   125         if call_item is None:
   126             # Wake up queue management thread
   127             result_queue.put(None)
   128             return
   129         try:
   130             r = call_item.fn(*call_item.args, **call_item.kwargs)
   131         except:
   132             e = sys.exc_info()[1]
   133             result_queue.put(_ResultItem(call_item.work_id,
   134                                          exception=e))
   135         else:
   136             result_queue.put(_ResultItem(call_item.work_id,
   137                                          result=r))
   139 def _add_call_item_to_queue(pending_work_items,
   140                             work_ids,
   141                             call_queue):
   142     """Fills call_queue with _WorkItems from pending_work_items.
   144     This function never blocks.
   146     Args:
   147         pending_work_items: A dict mapping work ids to _WorkItems e.g.
   148             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
   149         work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
   150             are consumed and the corresponding _WorkItems from
   151             pending_work_items are transformed into _CallItems and put in
   152             call_queue.
   153         call_queue: A multiprocessing.Queue that will be filled with _CallItems
   154             derived from _WorkItems.
   155     """
   156     while True:
   157         if call_queue.full():
   158             return
   159         try:
   160             work_id = work_ids.get(block=False)
   161         except queue.Empty:
   162             return
   163         else:
   164             work_item = pending_work_items[work_id]
   166             if work_item.future.set_running_or_notify_cancel():
   167                 call_queue.put(_CallItem(work_id,
   168                                          work_item.fn,
   169                                          work_item.args,
   170                                          work_item.kwargs),
   171                                block=True)
   172             else:
   173                 del pending_work_items[work_id]
   174                 continue
   176 def _queue_management_worker(executor_reference,
   177                              processes,
   178                              pending_work_items,
   179                              work_ids_queue,
   180                              call_queue,
   181                              result_queue):
   182     """Manages the communication between this process and the worker processes.
   184     This function is run in a local thread.
   186     Args:
   187         executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
   188             this thread. Used to determine if the ProcessPoolExecutor has been
   189             garbage collected and that this function can exit.
   190         process: A list of the multiprocessing.Process instances used as
   191             workers.
   192         pending_work_items: A dict mapping work ids to _WorkItems e.g.
   193             {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
   194         work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
   195         call_queue: A multiprocessing.Queue that will be filled with _CallItems
   196             derived from _WorkItems for processing by the process workers.
   197         result_queue: A multiprocessing.Queue of _ResultItems generated by the
   198             process workers.
   199     """
   200     nb_shutdown_processes = [0]
   201     def shutdown_one_process():
   202         """Tell a worker to terminate, which will in turn wake us again"""
   203         call_queue.put(None)
   204         nb_shutdown_processes[0] += 1
   205     while True:
   206         _add_call_item_to_queue(pending_work_items,
   207                                 work_ids_queue,
   208                                 call_queue)
   210         result_item = result_queue.get(block=True)
   211         if result_item is not None:
   212             work_item = pending_work_items[result_item.work_id]
   213             del pending_work_items[result_item.work_id]
   215             if result_item.exception:
   216                 work_item.future.set_exception(result_item.exception)
   217             else:
   218                 work_item.future.set_result(result_item.result)
   219             # Delete references to object. See issue16284
   220             del work_item
   221         # Check whether we should start shutting down.
   222         executor = executor_reference()
   223         # No more work items can be added if:
   224         #   - The interpreter is shutting down OR
   225         #   - The executor that owns this worker has been collected OR
   226         #   - The executor that owns this worker has been shutdown.
   227         if _shutdown or executor is None or executor._shutdown_thread:
   228             # Since no new work items can be added, it is safe to shutdown
   229             # this thread if there are no pending work items.
   230             if not pending_work_items:
   231                 while nb_shutdown_processes[0] < len(processes):
   232                     shutdown_one_process()
   233                 # If .join() is not called on the created processes then
   234                 # some multiprocessing.Queue methods may deadlock on Mac OS
   235                 # X.
   236                 for p in processes:
   237                     p.join()
   238                 call_queue.close()
   239                 return
   240         del executor
   242 _system_limits_checked = False
   243 _system_limited = None
   244 def _check_system_limits():
   245     global _system_limits_checked, _system_limited
   246     if _system_limits_checked:
   247         if _system_limited:
   248             raise NotImplementedError(_system_limited)
   249     _system_limits_checked = True
   250     try:
   251         import os
   252         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
   253     except (AttributeError, ValueError):
   254         # sysconf not available or setting not available
   255         return
   256     if nsems_max == -1:
   257         # indetermine limit, assume that limit is determined
   258         # by available memory only
   259         return
   260     if nsems_max >= 256:
   261         # minimum number of semaphores available
   262         # according to POSIX
   263         return
   264     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
   265     raise NotImplementedError(_system_limited)
   268 class ProcessPoolExecutor(_base.Executor):
   269     def __init__(self, max_workers=None):
   270         """Initializes a new ProcessPoolExecutor instance.
   272         Args:
   273             max_workers: The maximum number of processes that can be used to
   274                 execute the given calls. If None or not given then as many
   275                 worker processes will be created as the machine has processors.
   276         """
   277         _check_system_limits()
   279         if max_workers is None:
   280             self._max_workers = multiprocessing.cpu_count()
   281         else:
   282             if max_workers <= 0:
   283                 raise ValueError("max_workers must be greater than 0")
   285             self._max_workers = max_workers
   287         # Make the call queue slightly larger than the number of processes to
   288         # prevent the worker processes from idling. But don't make it too big
   289         # because futures in the call queue cannot be cancelled.
   290         self._call_queue = multiprocessing.Queue(self._max_workers +
   291                                                  EXTRA_QUEUED_CALLS)
   292         self._result_queue = multiprocessing.Queue()
   293         self._work_ids = queue.Queue()
   294         self._queue_management_thread = None
   295         self._processes = set()
   297         # Shutdown is a two-step process.
   298         self._shutdown_thread = False
   299         self._shutdown_lock = threading.Lock()
   300         self._queue_count = 0
   301         self._pending_work_items = {}
   303     def _start_queue_management_thread(self):
   304         # When the executor gets lost, the weakref callback will wake up
   305         # the queue management thread.
   306         def weakref_cb(_, q=self._result_queue):
   307             q.put(None)
   308         if self._queue_management_thread is None:
   309             self._queue_management_thread = threading.Thread(
   310                     target=_queue_management_worker,
   311                     args=(weakref.ref(self, weakref_cb),
   312                           self._processes,
   313                           self._pending_work_items,
   314                           self._work_ids,
   315                           self._call_queue,
   316                           self._result_queue))
   317             self._queue_management_thread.daemon = True
   318             self._queue_management_thread.start()
   319             _threads_queues[self._queue_management_thread] = self._result_queue
   321     def _adjust_process_count(self):
   322         for _ in range(len(self._processes), self._max_workers):
   323             p = multiprocessing.Process(
   324                     target=_process_worker,
   325                     args=(self._call_queue,
   326                           self._result_queue))
   327             p.start()
   328             self._processes.add(p)
   330     def submit(self, fn, *args, **kwargs):
   331         with self._shutdown_lock:
   332             if self._shutdown_thread:
   333                 raise RuntimeError('cannot schedule new futures after shutdown')
   335             f = _base.Future()
   336             w = _WorkItem(f, fn, args, kwargs)
   338             self._pending_work_items[self._queue_count] = w
   339             self._work_ids.put(self._queue_count)
   340             self._queue_count += 1
   341             # Wake up queue management thread
   342             self._result_queue.put(None)
   344             self._start_queue_management_thread()
   345             self._adjust_process_count()
   346             return f
   347     submit.__doc__ = _base.Executor.submit.__doc__
   349     def shutdown(self, wait=True):
   350         with self._shutdown_lock:
   351             self._shutdown_thread = True
   352         if self._queue_management_thread:
   353             # Wake up queue management thread
   354             self._result_queue.put(None)
   355             if wait:
   356                 self._queue_management_thread.join(sys.maxint)
   357         # To reduce the risk of openning too many files, remove references to
   358         # objects that use file descriptors.
   359         self._queue_management_thread = None
   360         self._call_queue = None
   361         self._result_queue = None
   362         self._processes = None
   363     shutdown.__doc__ = _base.Executor.shutdown.__doc__
   365 atexit.register(_python_exit)