• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4"""Implements ProcessPoolExecutor.
5
6The following diagram and text describe the data-flow through the system:
7
8|======================= In-process =====================|== Out-of-process ==|
9
10+----------+     +----------+       +--------+     +-----------+    +---------+
11|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
12|          |     +----------+       |        |     +-----------+    |  Pool   |
13|          |     | ...      |       |        |     | ...       |    +---------+
14|          |     | 6        |    => |        |  => | 5, call() | => |         |
15|          |     | 7        |       |        |     | ...       |    |         |
16| Process  |     | ...      |       | Local  |     +-----------+    | Process |
17|  Pool    |     +----------+       | Worker |                      |  #1..n  |
18| Executor |                        | Thread |                      |         |
19|          |     +------------+     |        |     +-----------+    |         |
20|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
21|          |     +------------+     |        |     +-----------+    |         |
22|          |     | 6: call()  |     |        |     | ...       |    |         |
23|          |     |    future  |     |        |     | 4, result |    |         |
24|          |     | ...        |     |        |     | 3, except |    |         |
25+----------+     +------------+     +--------+     +-----------+    +---------+
26
27Executor.submit() called:
28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
29- adds the id of the _WorkItem to the "Work Ids" queue
30
31Local worker thread:
32- reads work ids from the "Work Ids" queue and looks up the corresponding
33  WorkItem from the "Work Items" dict: if the work item has been cancelled then
34  it is simply removed from the dict, otherwise it is repackaged as a
35  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
36  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
37  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
38- reads _ResultItems from "Result Q", updates the future stored in the
39  "Work Items" dict and deletes the dict entry
40
41Process #1..n:
42- reads _CallItems from "Call Q", executes the calls, and puts the resulting
43  _ResultItems in "Result Q"
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import os
49from concurrent.futures import _base
50import queue
51import multiprocessing as mp
52import multiprocessing.connection
53from multiprocessing.queues import Queue
54import threading
55import weakref
56from functools import partial
57import itertools
58import sys
59import traceback
60
61
# Maps each started executor manager thread to the _ThreadWakeup that wakes
# it; _python_exit() walks this mapping. Keys are held weakly.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by _python_exit(); checked by ProcessPoolExecutor.submit() and
# by the manager thread's is_shutting_down().
_global_shutdown = False
64
65
class _ThreadWakeup:
    """One-way pipe used to interrupt a thread that waits on the read end.

    Every operation silently does nothing once close() has been called.
    """

    def __init__(self):
        self._closed = False
        # duplex=False gives a (read-only, write-only) connection pair.
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        """Close both ends of the pipe; later calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        """Send an empty message so that the read end becomes ready."""
        if self._closed:
            return
        self._writer.send_bytes(b"")

    def clear(self):
        """Discard every message currently waiting on the read end."""
        if self._closed:
            return
        while self._reader.poll():
            self._reader.recv_bytes()
85
86
def _python_exit():
    """Flag interpreter shutdown, then wake and join every manager thread."""
    global _global_shutdown
    _global_shutdown = True
    # Snapshot the weak mapping once so both passes see the same entries.
    registered = list(_threads_wakeups.items())
    for entry in registered:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        entry[1].wakeup()
    # Join only after every thread has been signalled.
    for entry in registered:
        entry[0].join()
96
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
# ProcessPoolExecutor.__init__ sizes the call queue as
# max_workers + EXTRA_QUEUED_CALLS.
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
# ProcessPoolExecutor.__init__ applies this cap only when
# sys.platform == 'win32'.
_MAX_WINDOWS_WORKERS = 63 - 2
115
116# Hack to embed stringification of remote traceback in local traceback
117
class _RemoteTraceback(Exception):
    """Exception whose string form is a pre-formatted traceback text."""

    def __init__(self, tb):
        # tb is already a string; it is stored as-is and returned by __str__.
        self.tb = tb

    def __str__(self):
        return self.tb
123
class _ExceptionWithTraceback:
    """Pairs an exception with the formatted text of its traceback.

    Reducing (pickling) an instance yields a call to _rebuild_exc with the
    exception and the formatted text.
    """

    def __init__(self, exc, tb):
        formatted = ''.join(traceback.format_exception(type(exc), exc, tb))
        self.exc = exc
        # The text is wrapped in triple quotes for display.
        self.tb = '\n"""\n%s"""' % formatted

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)
132
def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* set as its cause."""
    remote_cause = _RemoteTraceback(tb)
    exc.__cause__ = remote_cause
    return exc
136
class _WorkItem:
    """A submitted call together with the future that will hold its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs
143
class _ResultItem:
    """Outcome of one call: its work id plus an exception or a result."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception, self.result = exception, result
149
class _CallItem:
    """A call identified by its work id: the callable and its arguments."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id, self.fn = work_id, fn
        self.args, self.kwargs = args, kwargs
156
157
class _SafeQueue(Queue):
    """Queue that reports a feeder error on the future of the failed job."""

    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        # State consulted by _on_queue_feeder_error().
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if not isinstance(obj, _CallItem):
            # Anything that is not a call item gets the base-class handling.
            super()._on_queue_feeder_error(e, obj)
            return

        # Attach the formatted traceback of the error as its cause.
        formatted = ''.join(
            traceback.format_exception(type(e), e, e.__traceback__))
        e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(formatted))

        failed_item = self.pending_work_items.pop(obj.work_id, None)
        with self.shutdown_lock:
            self.thread_wakeup.wakeup()
        # failed_item can be None if another process terminated. In this
        # case, the executor_manager_thread fails all work_items
        # with BrokenProcessPool
        if failed_item is not None:
            failed_item.future.set_exception(e)
181
182
def _get_chunks(*iterables, chunksize):
    """Yield tuples of up to *chunksize* items taken from zip(*iterables)."""
    zipped = zip(*iterables)

    def next_chunk():
        return tuple(itertools.islice(zipped, chunksize))

    # The two-argument form of iter() keeps calling next_chunk() and stops
    # as soon as it returns the empty tuple.
    yield from iter(next_chunk, ())
191
192
def _process_chunk(fn, chunk):
    """Apply *fn* to every argument tuple in *chunk* and return the results.

    Each element of *chunk* is unpacked into positional arguments for *fn*;
    the results are returned as a list in the same order.

    This function is run in a separate process.
    """
    return list(itertools.starmap(fn, chunk))
203
204
def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Put the outcome of *work_id* on *result_queue*.

    If building or queueing the item raises, an item carrying that error
    (wrapped with its traceback) is queued instead.
    """
    try:
        result_queue.put(
            _ResultItem(work_id, exception=exception, result=result))
    except BaseException as e:
        wrapped = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=wrapped))
213
214
def _process_worker(call_queue, result_queue, initializer, initargs):
    """Execute calls read from call_queue and report them on result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker. A None item tells the worker to stop.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return

    while True:
        item = call_queue.get(block=True)
        if item is None:
            # Stop request: send our PID back, which also wakes up the
            # queue management thread.
            result_queue.put(os.getpid())
            return

        try:
            outcome = item.fn(*item.args, **item.kwargs)
        except BaseException as e:
            wrapped = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, item.work_id, exception=wrapped)
        else:
            _sendback_result(result_queue, item.work_id, result=outcome)
            del outcome

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del item
254
255
class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        # Lock taken in this class around the wakeup(), clear() and close()
        # calls made on thread_wakeup.
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        # The callback receives what it needs through default arguments and
        # does not reference self (presumably to avoid keeping extra
        # references alive -- TODO confirm).
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A list of the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                # terminate_broken() fails all pending futures and cleans up,
                # so the thread can stop right after it.
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)
                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                # attempt to increment idle process count
                executor = self.executor_reference()
                if executor is not None:
                    executor._idle_worker_semaphore.release()
                del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                # set_running_or_notify_cancel() returns a false value for a
                # future that was cancelled: such an item is dropped instead
                # of being queued.
                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal being sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from
        # the shutdown of the python interpreter.
        #
        # Returns a (result_item, is_broken, cause) tuple.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        # Assume the pool is broken unless one of the two readers is ready:
        # if only worker sentinels are ready, is_broken stays True.
        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                # A result that cannot be received leaves is_broken True; the
                # formatted traceback is returned as the cause.
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        with self.shutdown_lock:
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                # That was the last worker: release the remaining resources.
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            # cause is a list of formatted traceback lines.
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                # NOTE(review): this rebinds the attribute on the manager
                # only; the dict that was taken from the executor in
                # __init__ is not modified here.
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        # Ask the workers to stop by queueing one None per child that is
        # still alive.
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    # No free slot right now; the outer loop tries again
                    # while some child is still alive.
                    break

    def join_executor_internals(self):
        # Stop the workers, then release the call queue, the wakeup channel
        # and the worker processes.
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())
523
524
# Cached verdict of _check_system_limits(): the first flag records that the
# check already ran, the second holds the error message when the platform is
# unusable (None otherwise).
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the platform cannot support the pool.

    The verdict is computed on the first call and cached in the module
    globals; later calls reuse it without importing or querying anything.

    Raises:
        NotImplementedError: if multiprocessing.synchronize cannot be
            imported, or if os.sysconf("SC_SEM_NSEMS_MAX") reports a limit
            that is neither -1 nor at least 256.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found usable: do not repeat the import and
        # the sysconf query on every call.
        return
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)
559
560
def _chain_from_iterable_of_lists(iterable):
    """
    Yield the items of every list in *iterable*, in order.

    Specialized version of itertools.chain.from_iterable for lists. Each
    list is reversed in place and emptied item by item while it is consumed,
    so that yielded objects are no longer referenced by it.
    """
    for batch in iterable:
        # Reversing first makes pop() from the end return the items in
        # their original order.
        batch.reverse()
        take_next = batch.pop
        while batch:
            yield take_next()
571
572
class BrokenProcessPool(_base.BrokenExecutor):
    """
    Error reported when a ProcessPoolExecutor can no longer be used.

    Raised when a process in the pool terminated abruptly while a future
    was running or pending.
    """
578
579
class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            NotImplementedError: If _check_system_limits() rejects the
                platform.
            ValueError: If max_workers is not greater than 0, or exceeds
                _MAX_WINDOWS_WORKERS on win32.
            TypeError: If initializer is neither None nor callable.
        """
        _check_system_limits()

        if max_workers is None:
            # Default to the CPU count (1 if it cannot be determined),
            # capped on Windows.
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        # Released by the manager thread after each processed result item;
        # acquired without blocking in _adjust_process_count().
        self._idle_worker_semaphore = threading.Semaphore(0)
        # False, or a message string once the pool has been marked broken.
        self._broken = False
        # Next work id handed out by submit().
        self._queue_count = 0
        # Maps work ids to _WorkItems.
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # _shutdown_lock must be locked to access _ThreadWakeup.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        # Work ids waiting to be turned into call items by the manager thread.
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # First use: create and start the manager thread, then register
            # its wakeup channel so that _python_exit() can wake and join it.
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        # Otherwise start one more worker, as long as the pool is not full.
        process_count = len(self._processes)
        if process_count < self._max_workers:
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        # The docstring is copied from _base.Executor.submit below.
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            # Register the work item under a fresh id before queueing the id.
            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            ValueError: If chunksize is less than 1.
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        # Each submitted call processes one chunk and returns a list of
        # results; the lists are flattened back into a single stream.
        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        # The docstring is copied from _base.Executor.shutdown below.
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            # The wakeup channel is None once shutdown() has already run.
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        # The result queue is closed here only when wait is true.
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
760