mercurial/thirdparty/concurrent/futures/thread.py
changeset 48836 e020dbbc41e5
parent 48835 a0da5075bca3
child 48837 12cba4886e90
equal deleted inserted replaced
48835:a0da5075bca3 48836:e020dbbc41e5
     1 # Copyright 2009 Brian Quinlan. All Rights Reserved.
       
     2 # Licensed to PSF under a Contributor Agreement.
       
     3 
       
     4 """Implements ThreadPoolExecutor."""
       
     5 
       
     6 from __future__ import absolute_import
       
     7 
       
     8 import atexit
       
     9 from . import _base
       
    10 import itertools
       
    11 import Queue as queue
       
    12 import threading
       
    13 import weakref
       
    14 import sys
       
    15 
       
    16 try:
       
    17     from multiprocessing import cpu_count
       
    18 except ImportError:
       
    19     # some platforms don't have multiprocessing
       
    20     def cpu_count():
       
    21         return None
       
    22 
       
    23 __author__ = 'Brian Quinlan (brian@sweetapp.com)'
       
    24 
       
    25 # Workers are created as daemon threads. This is done to allow the interpreter
       
    26 # to exit when there are still idle threads in a ThreadPoolExecutor's thread
       
    27 # pool (i.e. shutdown() was not called). However, allowing workers to die with
       
    28 # the interpreter has two undesirable properties:
       
    29 #   - The workers would still be running during interpretor shutdown,
       
    30 #     meaning that they would fail in unpredictable ways.
       
    31 #   - The workers could be killed while evaluating a work item, which could
       
    32 #     be bad if the callable being evaluated has external side-effects e.g.
       
    33 #     writing to a file.
       
    34 #
       
    35 # To work around this problem, an exit handler is installed which tells the
       
    36 # workers to exit when their work queues are empty and then waits until the
       
    37 # threads finish.
       
    38 
       
    39 _threads_queues = weakref.WeakKeyDictionary()
       
    40 _shutdown = False
       
    41 
       
    42 def _python_exit():
       
    43     global _shutdown
       
    44     _shutdown = True
       
    45     items = list(_threads_queues.items()) if _threads_queues else ()
       
    46     for t, q in items:
       
    47         q.put(None)
       
    48     for t, q in items:
       
    49         t.join(sys.maxint)
       
    50 
       
    51 atexit.register(_python_exit)
       
    52 
       
    53 class _WorkItem(object):
       
    54     def __init__(self, future, fn, args, kwargs):
       
    55         self.future = future
       
    56         self.fn = fn
       
    57         self.args = args
       
    58         self.kwargs = kwargs
       
    59 
       
    60     def run(self):
       
    61         if not self.future.set_running_or_notify_cancel():
       
    62             return
       
    63 
       
    64         try:
       
    65             result = self.fn(*self.args, **self.kwargs)
       
    66         except:
       
    67             e, tb = sys.exc_info()[1:]
       
    68             self.future.set_exception_info(e, tb)
       
    69         else:
       
    70             self.future.set_result(result)
       
    71 
       
    72 def _worker(executor_reference, work_queue):
       
    73     try:
       
    74         while True:
       
    75             work_item = work_queue.get(block=True)
       
    76             if work_item is not None:
       
    77                 work_item.run()
       
    78                 # Delete references to object. See issue16284
       
    79                 del work_item
       
    80                 continue
       
    81             executor = executor_reference()
       
    82             # Exit if:
       
    83             #   - The interpreter is shutting down OR
       
    84             #   - The executor that owns the worker has been collected OR
       
    85             #   - The executor that owns the worker has been shutdown.
       
    86             if _shutdown or executor is None or executor._shutdown:
       
    87                 # Notice other workers
       
    88                 work_queue.put(None)
       
    89                 return
       
    90             del executor
       
    91     except:
       
    92         _base.LOGGER.critical('Exception in worker', exc_info=True)
       
    93 
       
    94 
       
    95 class ThreadPoolExecutor(_base.Executor):
       
    96 
       
    97     # Used to assign unique thread names when thread_name_prefix is not supplied.
       
    98     _counter = itertools.count().next
       
    99 
       
   100     def __init__(self, max_workers=None, thread_name_prefix=''):
       
   101         """Initializes a new ThreadPoolExecutor instance.
       
   102 
       
   103         Args:
       
   104             max_workers: The maximum number of threads that can be used to
       
   105                 execute the given calls.
       
   106             thread_name_prefix: An optional name prefix to give our threads.
       
   107         """
       
   108         if max_workers is None:
       
   109             # Use this number because ThreadPoolExecutor is often
       
   110             # used to overlap I/O instead of CPU work.
       
   111             max_workers = (cpu_count() or 1) * 5
       
   112         if max_workers <= 0:
       
   113             raise ValueError("max_workers must be greater than 0")
       
   114 
       
   115         self._max_workers = max_workers
       
   116         self._work_queue = queue.Queue()
       
   117         self._threads = set()
       
   118         self._shutdown = False
       
   119         self._shutdown_lock = threading.Lock()
       
   120         self._thread_name_prefix = (thread_name_prefix or
       
   121                                     ("ThreadPoolExecutor-%d" % self._counter()))
       
   122 
       
   123     def submit(self, fn, *args, **kwargs):
       
   124         with self._shutdown_lock:
       
   125             if self._shutdown:
       
   126                 raise RuntimeError('cannot schedule new futures after shutdown')
       
   127 
       
   128             f = _base.Future()
       
   129             w = _WorkItem(f, fn, args, kwargs)
       
   130 
       
   131             self._work_queue.put(w)
       
   132             self._adjust_thread_count()
       
   133             return f
       
   134     submit.__doc__ = _base.Executor.submit.__doc__
       
   135 
       
   136     def _adjust_thread_count(self):
       
   137         # When the executor gets lost, the weakref callback will wake up
       
   138         # the worker threads.
       
   139         def weakref_cb(_, q=self._work_queue):
       
   140             q.put(None)
       
   141         # TODO(bquinlan): Should avoid creating new threads if there are more
       
   142         # idle threads than items in the work queue.
       
   143         num_threads = len(self._threads)
       
   144         if num_threads < self._max_workers:
       
   145             thread_name = '%s_%d' % (self._thread_name_prefix or self,
       
   146                                      num_threads)
       
   147             t = threading.Thread(name=thread_name, target=_worker,
       
   148                                  args=(weakref.ref(self, weakref_cb),
       
   149                                        self._work_queue))
       
   150             t.daemon = True
       
   151             t.start()
       
   152             self._threads.add(t)
       
   153             _threads_queues[t] = self._work_queue
       
   154 
       
   155     def shutdown(self, wait=True):
       
   156         with self._shutdown_lock:
       
   157             self._shutdown = True
       
   158             self._work_queue.put(None)
       
   159         if wait:
       
   160             for t in self._threads:
       
   161                 t.join(sys.maxint)
       
   162     shutdown.__doc__ = _base.Executor.shutdown.__doc__