# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import atexit
import os
from concurrent.futures import _base
import queue
from queue import Full
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
#   - The workers would still be running during interpreter shutdown,
#     meaning that they would fail in unpredictable ways.
#   - The workers could be killed while evaluating a work item, which could
#     be bad if the callable being evaluated has external side-effects e.g.
#     writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        self._writer.send_bytes(b"")

    def clear(self):
        while self._reader.poll():
            self._reader.recv_bytes()

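# Usage sketch for _ThreadWakeup (inferred from how the rest of this module
# uses it; the class is internal, not public API): the queue management thread
# waits on the pipe's read end via mp.connection.wait(), and any other thread
# makes that wait return early by calling wakeup():
#
#     wakeup = _ThreadWakeup()
#     wakeup.wakeup()               # writer side: send one empty message
#     assert wakeup._reader.poll()  # reader side is now ready
#     wakeup.clear()                # drain all pending wake-up messages
#     wakeup.close()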

def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

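# Round-trip sketch for the traceback hack above (worked out from __reduce__
# and _rebuild_exc; illustrative, not a test):
#
#     import pickle
#     try:
#         1 / 0
#     except ZeroDivisionError as e:
#         wrapped = _ExceptionWithTraceback(e, e.__traceback__)
#     exc = pickle.loads(pickle.dumps(wrapped))  # unpickling calls _rebuild_exc
#     assert isinstance(exc, ZeroDivisionError)
#     assert isinstance(exc.__cause__, _RemoteTraceback)
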
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe queue that sets the exception on the future linked to a job when
    the queue feeder thread fails to serialize that job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this
            # case, the queue_manager_thread fails all work_items with
            # BrokenProcessPool.
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk

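# Behaviour sketch for _get_chunks (values worked out from the definition,
# not taken from a test suite):
#
#     >>> list(_get_chunks([1, 2, 3], [4, 5, 6], chunksize=2))
#     [((1, 4), (2, 5)), ((3, 6),)]
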
def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]

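# For example (worked out from the definition): in a worker process,
# _process_chunk(pow, ((2, 3), (2, 4))) returns [8, 16]; map() later
# flattens these per-chunk lists via _chain_from_iterable_of_lists().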

def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

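# Shutdown protocol sketch (summarizing the loop above; not executable as
# written, since the queues live in the executor): the parent asks a worker to
# exit by enqueueing None as a sentinel, and the worker acknowledges by putting
# its pid on result_queue so the queue management thread knows which process
# to join:
#
#     call_queue.put_nowait(None)  # parent: request one worker to exit
#     pid = result_queue.get()     # parent: the exiting worker's pid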

def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _CallItems derived from the _WorkItems in
    pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return
        else:
            work_item = pending_work_items[work_id]

            if work_item.future.set_running_or_notify_cancel():
                call_queue.put(_CallItem(work_id,
                                         work_item.fn,
                                         work_item.args,
                                         work_item.kwargs),
                               block=True)
            else:
                del pending_work_items[work_id]
                continue


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping pids to the ctx.Process instances used as
            workers.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake-up signal
        # to be sent. The wake-up signals come either from new tasks being
        # submitted, from the executor being shut down/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # result_queue.get()) and be able to send a sentinel again.
                pass
        executor = None

_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit; assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)

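# The limit can be inspected directly; the output below is illustrative only
# and varies by platform (-1 on many Linux systems, meaning undetermined):
#
#     >>> import os
#     >>> os.sysconf("SC_SEM_NSEMS_MAX")
#     -1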

def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()

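# Behaviour sketch (worked out from the definition; note how the inner lists
# are consumed in place, so the generator never keeps a yielded object alive):
#
#     >>> data = [[1, 2], [3]]
#     >>> list(_chain_from_iterable_of_lists(iter(data)))
#     [1, 2, 3]
#     >>> data
#     [[], []]
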

class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

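    # Construction sketch mirroring the docstring above (illustrative comment;
    # the 'spawn' context and the print-based initializer are example choices,
    # not recommendations):
    #
    #     import multiprocessing as mp
    #     executor = ProcessPoolExecutor(
    #         max_workers=4,
    #         mp_context=mp.get_context('spawn'),
    #         initializer=print, initargs=('worker ready',))
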
    def _start_queue_management_thread(self):
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(*args, **kwargs):
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    submit.__text_signature__ = _base.Executor.submit.__text_signature__
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

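    # Usage sketch (illustrative comment): a larger chunksize trades
    # cancellation granularity for less inter-process traffic, since each
    # _CallItem then carries a whole chunk through "Call Q":
    #
    #     with ProcessPoolExecutor() as executor:
    #         squares = list(executor.map(pow, range(8), [2] * 8, chunksize=4))
    #         # squares == [0, 1, 4, 9, 16, 25, 36, 49]
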
    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        if self._queue_management_thread:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                self._queue_management_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            self._call_queue.close()
            if wait:
                self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)