1 # Copyright 2009 Brian Quinlan. All Rights Reserved. |
|
2 # Licensed to PSF under a Contributor Agreement. |
|
3 |
|
4 """Implements ProcessPoolExecutor. |
|
5 |
|
The following diagram and text describe the data-flow through the system:
|
7 |
|
8 |======================= In-process =====================|== Out-of-process ==| |
|
9 |
|
10 +----------+ +----------+ +--------+ +-----------+ +---------+ |
|
11 | | => | Work Ids | => | | => | Call Q | => | | |
|
12 | | +----------+ | | +-----------+ | | |
|
13 | | | ... | | | | ... | | | |
|
14 | | | 6 | | | | 5, call() | | | |
|
15 | | | 7 | | | | ... | | | |
|
16 | Process | | ... | | Local | +-----------+ | Process | |
|
17 | Pool | +----------+ | Worker | | #1..n | |
|
18 | Executor | | Thread | | | |
|
19 | | +----------- + | | +-----------+ | | |
|
20 | | <=> | Work Items | <=> | | <= | Result Q | <= | | |
|
21 | | +------------+ | | +-----------+ | | |
|
22 | | | 6: call() | | | | ... | | | |
|
23 | | | future | | | | 4, result | | | |
|
24 | | | ... | | | | 3, except | | | |
|
25 +----------+ +------------+ +--------+ +-----------+ +---------+ |
|
26 |
|
27 Executor.submit() called: |
|
28 - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict |
|
29 - adds the id of the _WorkItem to the "Work Ids" queue |
|
30 |
|
31 Local worker thread: |
|
32 - reads work ids from the "Work Ids" queue and looks up the corresponding |
|
33 WorkItem from the "Work Items" dict: if the work item has been cancelled then |
|
34 it is simply removed from the dict, otherwise it is repackaged as a |
|
35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" |
|
36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because |
|
37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). |
|
38 - reads _ResultItems from "Result Q", updates the future stored in the |
|
39 "Work Items" dict and deletes the dict entry |
|
40 |
|
41 Process #1..n: |
|
42 - reads _CallItems from "Call Q", executes the calls, and puts the resulting |
|
  _ResultItems in "Result Q"
|
44 """ |
|
45 |
|
46 from __future__ import absolute_import |
|
47 |
|
48 import atexit |
|
49 from . import _base |
|
50 import Queue as queue |
|
51 import multiprocessing |
|
52 import threading |
|
53 import weakref |
|
54 import sys |
|
55 |
|
56 __author__ = 'Brian Quinlan (brian@sweetapp.com)' |
|
57 |
|
58 # Workers are created as daemon threads and processes. This is done to allow the |
|
59 # interpreter to exit when there are still idle processes in a |
|
60 # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, |
|
61 # allowing workers to die with the interpreter has two undesirable properties: |
|
#   - The workers would still be running during interpreter shutdown,
|
63 # meaning that they would fail in unpredictable ways. |
|
64 # - The workers could be killed while evaluating a work item, which could |
|
65 # be bad if the callable being evaluated has external side-effects e.g. |
|
66 # writing to a file. |
|
67 # |
|
68 # To work around this problem, an exit handler is installed which tells the |
|
69 # workers to exit when their work queues are empty and then waits until the |
|
70 # threads/processes finish. |
|
71 |
|
# Maps each queue management thread to the result queue it blocks on. The keys
# are weak, so an entry disappears once its thread object is no longer
# referenced anywhere else.
_threads_queues = weakref.WeakKeyDictionary()
# Flipped to True by _python_exit(); polled by the queue management threads.
_shutdown = False


def _python_exit():
    """Exit handler: wake every queue management thread, then wait for it.

    Sets the module-level _shutdown flag, posts a None wake-up item on the
    result queue of every registered queue management thread and finally
    joins each of those threads.
    """
    global _shutdown
    _shutdown = True
    if _threads_queues:
        thread_queue_pairs = list(_threads_queues.items())
    else:
        thread_queue_pairs = ()
    # First pass: wake all threads before waiting on any single one of them.
    for _, result_queue in thread_queue_pairs:
        result_queue.put(None)
    # Second pass: wait for each thread, using the largest int as timeout.
    for thread, _ in thread_queue_pairs:
        thread.join(sys.maxint)
|
83 |
|
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
|
89 |
|
class _WorkItem(object):
    """A submitted call together with the future that will carry its outcome.

    Attributes are plain storage: `future`, `fn`, `args` and `kwargs` are kept
    exactly as passed to the constructor.
    """

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
|
96 |
|
class _ResultItem(object):
    """Outcome of one evaluated call, identified by its work id.

    `exception` holds the exception raised by the call and `result` the value
    it returned; both default to None.
    """

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
|
102 |
|
class _CallItem(object):
    """A call ready to be evaluated: work id, callable and its arguments.

    Attributes are plain storage: `work_id`, `fn`, `args` and `kwargs` are
    kept exactly as passed to the constructor.
    """

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
|
109 |
|
def _process_worker(call_queue, result_queue):
    """Evaluate calls taken from call_queue and report them on result_queue.

    This worker is run in a separate process. It loops until it reads a None
    item from call_queue.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read
            and evaluated by the worker. A None item asks the worker to exit.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker, one per evaluated call.
    """
    while True:
        item = call_queue.get(block=True)
        if item is None:
            # Exit request: post a None so that the queue management thread
            # wakes up, then leave the loop for good.
            result_queue.put(None)
            return
        try:
            value = item.fn(*item.args, **item.kwargs)
        except:
            # Deliberately catches everything the call raises; the exception
            # object is shipped back instead of a result.
            outcome = _ResultItem(item.work_id, exception=sys.exc_info()[1])
        else:
            outcome = _ResultItem(item.work_id, result=value)
        result_queue.put(outcome)
|
138 |
|
def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Move runnable work from pending_work_items into call_queue.

    Stops as soon as call_queue reports that it is full or work_ids has no
    more ids to hand out, so it is not expected to block.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}. Entries whose future
            was cancelled are removed from it.
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while not call_queue.full():
        try:
            next_id = work_ids.get(block=False)
        except queue.Empty:
            return
        pending = pending_work_items[next_id]
        if not pending.future.set_running_or_notify_cancel():
            # Cancelled before it could start: forget it and try the next id.
            del pending_work_items[next_id]
            continue
        call_queue.put(
            _CallItem(next_id, pending.fn, pending.args, pending.kwargs),
            block=True)
|
175 |
|
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread. Each loop iteration tops up
    call_queue, blocks on result_queue, applies a received result to its
    future and then checks whether it is time to shut the workers down.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A collection of the multiprocessing.Process instances used
            as workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers. A None item is only a wake-up signal.
    """
    # Kept in a one-element list so that the nested function can update it.
    nb_shutdown_processes = [0]

    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again."""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id)

            # Compare against None rather than relying on truthiness: an
            # exception object may evaluate as false (e.g. it defines
            # __len__) and must still be reported as a failure.
            if result_item.exception is not None:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        # Drop the strong reference so this thread does not keep the executor
        # alive while it blocks on the result queue.
        del executor
|
241 |
|
# Cache for _check_system_limits(): whether the check has already produced a
# verdict, and the error message if the system was found to be too limited.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Verify that the system offers enough semaphores for a process pool.

    The verdict is cached in module globals, so the system is queried at most
    once; later calls return immediately or re-raise the recorded error.

    Raises:
        NotImplementedError: if os.sysconf("SC_SEM_NSEMS_MAX") reports fewer
            than 256 semaphores.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified as adequate: no need to query the system again.
        return
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        _system_limits_checked = True
        return
    # Only mark the check as done once a verdict exists, so that an unexpected
    # error raised above is retried by the next call.
    _system_limits_checked = True
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores "
                       "(%d available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
|
266 |
|
267 |
|
class ProcessPoolExecutor(_base.Executor):
    """Executor that evaluates submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.

        Raises:
            ValueError: If max_workers is zero or negative.
        """
        _check_system_limits()

        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        elif max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        call_queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = multiprocessing.Queue(call_queue_size)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Id handed to the next submitted work item.
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        """Start the queue management thread unless one already exists."""
        if self._queue_management_thread is not None:
            return

        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)

        manager = threading.Thread(
            target=_queue_management_worker,
            args=(weakref.ref(self, weakref_cb),
                  self._processes,
                  self._pending_work_items,
                  self._work_ids,
                  self._call_queue,
                  self._result_queue))
        self._queue_management_thread = manager
        manager.daemon = True
        manager.start()
        # Register the thread so that the exit handler can wake and join it.
        _threads_queues[manager] = self._result_queue

    def _adjust_process_count(self):
        """Start worker processes until the pool holds max_workers of them."""
        missing = self._max_workers - len(self._processes)
        for _ in range(missing):
            worker = multiprocessing.Process(
                target=_process_worker,
                args=(self._call_queue, self._result_queue))
            worker.start()
            self._processes.add(worker)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            future = _base.Future()
            work_id = self._queue_count

            self._pending_work_items[work_id] = _WorkItem(
                future, fn, args, kwargs)
            self._work_ids.put(work_id)
            self._queue_count = work_id + 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return future
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        manager = self._queue_management_thread
        if manager:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                manager.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
|
364 |
|
# Have the interpreter call _python_exit() on normal termination.
atexit.register(_python_exit)
|