1 # Copyright 2009 Brian Quinlan. All Rights Reserved. |
|
2 # Licensed to PSF under a Contributor Agreement. |
|
3 |
|
4 """Implements ThreadPoolExecutor.""" |
|
5 |
|
6 from __future__ import absolute_import |
|
7 |
|
8 import atexit |
|
9 from . import _base |
|
10 import itertools |
|
11 import Queue as queue |
|
12 import threading |
|
13 import weakref |
|
14 import sys |
|
15 |
|
try:
    from multiprocessing import cpu_count
except ImportError:
    # Some platforms ship without the multiprocessing module (e.g. certain
    # embedded or restricted builds); provide a stub with the same name so
    # callers can still ask for a CPU count.
    def cpu_count():
        """Fallback when multiprocessing is unavailable: the CPU count is
        unknown, so return None and let callers pick a sensible default."""
        return None
|
22 |
|
23 __author__ = 'Brian Quinlan (brian@sweetapp.com)' |
|
24 |
|
25 # Workers are created as daemon threads. This is done to allow the interpreter |
|
26 # to exit when there are still idle threads in a ThreadPoolExecutor's thread |
|
27 # pool (i.e. shutdown() was not called). However, allowing workers to die with |
|
28 # the interpreter has two undesirable properties: |
|
29 # - The workers would still be running during interpretor shutdown, |
|
30 # meaning that they would fail in unpredictable ways. |
|
31 # - The workers could be killed while evaluating a work item, which could |
|
32 # be bad if the callable being evaluated has external side-effects e.g. |
|
33 # writing to a file. |
|
34 # |
|
35 # To work around this problem, an exit handler is installed which tells the |
|
36 # workers to exit when their work queues are empty and then waits until the |
|
37 # threads finish. |
|
38 |
|
# Maps each live worker thread to its work queue; weak keys let dead
# threads drop out of the mapping automatically.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    """Atexit hook: tell every live worker to exit, then wait for it."""
    global _shutdown
    _shutdown = True
    if _threads_queues:
        live = list(_threads_queues.items())
    else:
        live = ()
    # Wake every worker with a None sentinel first, so that no worker is
    # still blocked on an empty queue when we start joining below.
    for _, work_queue in live:
        work_queue.put(None)
    for thread, _ in live:
        thread.join(sys.maxint)

atexit.register(_python_exit)
|
52 |
|
class _WorkItem(object):
    """A unit of work: a callable plus the Future that receives its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future    # Future to complete with fn's result/exception
        self.fn = fn            # callable to invoke
        self.args = args        # positional arguments for fn
        self.kwargs = kwargs    # keyword arguments for fn

    def run(self):
        """Execute the callable and store its result (or exception) in the
        associated future.  Does nothing if the future was cancelled."""
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except:
            # Bare except on purpose: on Python 2 it also captures old-style
            # (non-Exception) raises and BaseException subclasses.
            e, tb = sys.exc_info()[1:]
            self.future.set_exception_info(e, tb)
            # The stored traceback references this frame, and this frame
            # references self (and thereby fn/args/kwargs and the future),
            # forming a reference cycle.  Clear the local so the work item
            # can be freed promptly instead of waiting for the cyclic GC
            # (same fix as CPython's concurrent.futures.thread).
            self = None
        else:
            self.future.set_result(result)
|
71 |
|
def _worker(executor_reference, work_queue):
    """Main loop for a pool thread: pop items off *work_queue* and run them.

    A None sentinel on the queue makes the worker re-check whether it should
    exit; before exiting it re-posts the sentinel so sibling workers wake too.
    Only a weak reference to the owning executor is held, so the executor can
    be garbage collected while workers are blocked on the queue.
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is None:
                executor = executor_reference()
                # Exit if:
                #   - The interpreter is shutting down OR
                #   - The executor that owns the worker has been collected OR
                #   - The executor that owns the worker has been shutdown.
                if _shutdown or executor is None or executor._shutdown:
                    # Notice other workers
                    work_queue.put(None)
                    return
                del executor
            else:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
    except:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
|
93 |
|
94 |
|
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted callables in a pool of daemon threads."""

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
        """
        if max_workers is None:
            # ThreadPoolExecutor is typically used to overlap I/O rather than
            # CPU-bound work, so allow several threads per (known) core.
            max_workers = (cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        if thread_name_prefix:
            self._thread_name_prefix = thread_name_prefix
        else:
            self._thread_name_prefix = ("ThreadPoolExecutor-%d"
                                        % self._counter())

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            self._work_queue.put(_WorkItem(future, fn, args, kwargs))
            self._adjust_thread_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # When the executor gets garbage collected, the weakref callback
        # wakes the worker threads so they can notice and exit.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        pool_size = len(self._threads)
        if pool_size >= self._max_workers:
            return
        name = '%s_%d' % (self._thread_name_prefix or self, pool_size)
        worker_thread = threading.Thread(
            name=name, target=_worker,
            args=(weakref.ref(self, weakref_cb), self._work_queue))
        worker_thread.daemon = True
        worker_thread.start()
        self._threads.add(worker_thread)
        _threads_queues[worker_thread] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Post one sentinel; exiting workers re-post it for the others.
            self._work_queue.put(None)
        if wait:
            for worker_thread in self._threads:
                worker_thread.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
|