# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |    => |        |  => | Call Q    | => |         |
|          |     +----------+       |        |     +-----------+    |         |
|          |     | ...      |       |        |     | ...       |    |         |
|          |     | 6        |       |        |     | 5, call() |    |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem in the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until the "Call Q" is full. NOTE: the size of the "Call Q" is kept small
  because calls placed in the "Call Q" can no longer be cancelled with
  Future.cancel().
- reads _ResultItems from the "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from the "Call Q", executes the calls, and puts the
  resulting _ResultItems in the "Result Q"
"""

import atexit
from concurrent.futures import _base
import Queue as queue
import multiprocessing
import threading
import weakref
import sys

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Workers are created as daemon threads and processes. This is done to allow
# the interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False

def _python_exit():
    global _shutdown
    _shutdown = True
    items = list(_threads_queues.items()) if _threads_queues else ()
    for t, q in items:
        q.put(None)
    for t, q in items:
        t.join(sys.maxint)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1

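# The three classes below are simple containers passed between the executor,
# the queue management thread and the worker processes:
#   _WorkItem   - a submitted call plus the Future that tracks it (in-process)
#   _CallItem   - the (work_id, fn, args, kwargs) bundle pickled over to workers
#   _ResultItem - the result or exception sent back, keyed by work_id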
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

def _process_worker(call_queue, result_queue):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A multiprocessing.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A multiprocessing.Queue of _ResultItems that will be
            written to by the worker.

    The worker exits when it reads a None sentinel from call_queue.
    """
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(None)
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except:
            e = sys.exc_info()[1]
            result_queue.put(_ResultItem(call_item.work_id,
                                         exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id,
                                         result=r))

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _CallItems built from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue

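# Throughout this machinery a None placed on a queue acts as a sentinel: a None
# on result_queue only wakes the queue management thread (submit(), shutdown(),
# the executor's weakref callback and exiting workers all use it), while a None
# on call_queue tells one worker process to exit.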
def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected, in which case this function can exit.
        processes: A set of the multiprocessing.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A multiprocessing.Queue of _ResultItems generated by the
            process workers.
    """
    nb_shutdown_processes = [0]
    def shutdown_one_process():
        """Tell a worker to terminate, which will in turn wake us again."""
        call_queue.put(None)
        nb_shutdown_processes[0] += 1
    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        result_item = result_queue.get(block=True)
        if result_item is not None:
            work_item = pending_work_items[result_item.work_id]
            del pending_work_items[result_item.work_id]

            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)
            # Delete references to object. See issue16284
            del work_item
        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if _shutdown or executor is None or executor._shutdown_thread:
            # Since no new work items can be added, it is safe to shutdown
            # this thread if there are no pending work items.
            if not pending_work_items:
                while nb_shutdown_processes[0] < len(processes):
                    shutdown_one_process()
                # If .join() is not called on the created processes then
                # some multiprocessing.Queue methods may deadlock on Mac OS
                # X.
                for p in processes:
                    p.join()
                call_queue.close()
                return
        del executor

_system_limits_checked = False
_system_limited = None
def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import os
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
    raise NotImplementedError(_system_limited)


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = multiprocessing.cpu_count()
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")

            self._max_workers = max_workers

        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        self._call_queue = multiprocessing.Queue(self._max_workers +
                                                 EXTRA_QUEUED_CALLS)
        self._result_queue = multiprocessing.Queue()
        self._work_ids = queue.Queue()
        self._queue_management_thread = None
        self._processes = set()

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._queue_count = 0
        self._pending_work_items = {}

    def _start_queue_management_thread(self):
        # When the executor gets lost, the weakref callback will wake up
        # the queue management thread.
        def weakref_cb(_, q=self._result_queue):
            q.put(None)
        if self._queue_management_thread is None:
            self._queue_management_thread = threading.Thread(
                    target=_queue_management_worker,
                    args=(weakref.ref(self, weakref_cb),
                          self._processes,
                          self._pending_work_items,
                          self._work_ids,
                          self._call_queue,
                          self._result_queue))
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_queues[self._queue_management_thread] = self._result_queue

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = multiprocessing.Process(
                    target=_process_worker,
                    args=(self._call_queue,
                          self._result_queue))
            p.start()
            self._processes.add(p)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            self._adjust_process_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._result_queue.put(None)
            if wait:
                self._queue_management_thread.join(sys.maxint)
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        self._call_queue = None
        self._result_queue = None
        self._processes = None
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)

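# A minimal smoke test of the executor defined above; purely illustrative and
# not part of the public package.  _square is a hypothetical helper defined at
# module level so that worker processes can unpickle it; the __main__ guard
# keeps the demo from running on import and is required when multiprocessing
# spawns fresh interpreters (e.g. on Windows).
def _square(x):
    # Trivial CPU-bound callable used only by the demo below.
    return x * x

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    futures = [executor.submit(_square, i) for i in range(5)]
    print([f.result() for f in futures])   # prints [0, 1, 4, 9, 16]
    executor.shutdown(wait=True)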