# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem in the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
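
A minimal usage sketch of the public API (illustrative; the __main__ guard
matters because 'spawn' workers re-import the main module):

    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=2) as executor:
            future = executor.submit(pow, 2, 10)
            assert future.result() == 1024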
44"""
45
46__author__ = 'Brian Quinlan (brian@sweetapp.com)'
47
48import os
49from concurrent.futures import _base
50import queue
51import multiprocessing as mp
52# This import is required to load the multiprocessing.connection submodule
53# so that it can be accessed later as `mp.connection`
54import multiprocessing.connection
55from multiprocessing.queues import Queue
56import threading
57import weakref
58from functools import partial
59import itertools
60import sys
61from traceback import format_exception


_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._lock = threading.Lock()
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # Note that clear() is deliberately not protected by self._lock (to
        # avoid deadlocking), so close() may only be called safely from the
        # same thread that makes all of the calls to clear(); holding the
        # lock does not help. Otherwise clear() might try to read from a
        # pipe that has already been closed here.
        with self._lock:
            if not self._closed:
                self._closed = True
                self._writer.close()
                self._reader.close()

    def wakeup(self):
        with self._lock:
            if not self._closed:
                self._writer.send_bytes(b"")

    def clear(self):
        if self._closed:
            raise RuntimeError('operation on closed _ThreadWakeup')
        while self._reader.poll():
            self._reader.recv_bytes()
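
# A minimal sketch of the self-pipe wakeup pattern used by _ThreadWakeup
# (illustrative, not executed here): mp.connection.wait() returns as soon
# as any of the given connections becomes readable, so writing one byte to
# the pipe acts as a cross-thread wakeup.
#
#     r, w = mp.Pipe(duplex=False)
#     # waiting thread:  mp.connection.wait([r, *worker_sentinels])
#     # waking thread:   w.send_bytes(b"")              # makes wait() return
#     # waiting thread:  while r.poll(): r.recv_bytes() # drain the pipe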


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
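
# For example: with max_workers=4, ProcessPoolExecutor.__init__ below sizes
# the call queue at 4 + EXTRA_QUEUED_CALLS = 5 entries.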


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc
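
# Illustrative sketch of the round trip above (not executed here): an
# exception raised in a worker re-raises in the parent with the remote
# traceback text attached as its __cause__:
#
#     future = executor.submit(operator.truediv, 1, 0)
#     try:
#         future.result()
#     except ZeroDivisionError as e:
#         assert isinstance(e.__cause__, _RemoteTraceback)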

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Queue subclass that routes feeder errors to the future of the failing job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _process_chunk(fn, chunk):
    """Process a chunk of an iterable passed to map().

    Runs the function passed to map() on each tuple of arguments in the
    chunk.

    This function is run in a separate process.
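
    For example, _process_chunk(pow, [(2, 3), (4, 2)]) returns [8, 16].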
    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        max_tasks: The maximum number of calls to execute before exiting,
            or None for no limit.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return
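
# Protocol sketch for a single worker (illustrative, not executed here): the
# manager thread enqueues _CallItems, and a None sentinel asks the worker to
# exit and report its pid on the result queue.
#
#     call_queue.put(_CallItem(1, pow, (2, 3), {}))  # -> _ResultItem(1, result=8)
#     call_queue.put(None)                           # -> os.getpid(), then exit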


class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref to the executor will be owned by the
            manager, along with references to internal objects used to
            introspect the state of the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       mp_util_debug=mp.util.debug):
            mp_util_debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A dict mapping pids to the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            # gh-109047: During Python finalization, self.call_queue.put()
            # can fail with RuntimeError because it may need to spawn the
            # queue's feeder thread, and threads cannot be created while the
            # interpreter is shutting down.
            try:
                self.add_call_item_to_queue()
            except BaseException as exc:
                cause = format_exception(exc)
                self.terminate_broken(cause)
                return

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fill call_queue with _CallItems built from the _WorkItems in
        # pending_work_items. This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake-up
        # signal to be sent. The wake-up signals come either from new tasks
        # being submitted, from the executor being shut down/gc-ed, or from
        # the shutdown of the Python interpreter.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as exc:
                cause = format_exception(exc)

        elif wakeup_reader in ready:
            is_broken = False

        self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item: a _ResultItem sent back by a
        # worker. (A worker that exits after reaching max_tasks_per_child
        # reports this via the item's exit_pid attribute; see run().)

        # Mark the future attached to this work item as completed.
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        # work_item can be None if another process terminated (see above)
        if work_item is not None:
            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shut down.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def _terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor to become broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            try:
                work_item.future.set_exception(bpe)
            except _base.InvalidStateError:
                # set_exception() fails if the future is cancelled: ignore it.
                # Trying to check if the future is cancelled before calling
                # set_exception() would leave a race condition if the future is
                # cancelled between the check and set_exception().
                pass
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        self.call_queue._terminate_broken()

        # clean up resources
        self._join_executor_internals(broken=True)

    def terminate_broken(self, cause):
        with self.shutdown_lock:
            self._terminate_broken(cause)

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and, if requested, cancel the
        # remaining tasks as early as possible (i.e. while the executor has
        # not been gc-ed yet).
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        with self.shutdown_lock:
            self._join_executor_internals()

    def _join_executor_internals(self, broken=False):
        # If broken, call_queue was closed and so can no longer be used.
        if not broken:
            self.shutdown_workers()

        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        self.thread_wakeup.close()

        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            if broken:
                p.terminate()
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit: assume that the limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
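
    For example, list(_chain_from_iterable_of_lists([[1, 2], [3]])) returns
    [1, 2, 3], destructively consuming each inner list along the way.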
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers created
                using the multiprocessing.get_context('start method') API. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker processes will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.
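
        A construction sketch (illustrative):

            import multiprocessing as mp
            executor = ProcessPoolExecutor(
                max_workers=4,
                mp_context=mp.get_context('spawn'),
                max_tasks_per_child=16)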
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
                self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # Care must be taken to only call clear and close from the
        # executor_manager_thread, since _ThreadWakeup.clear() is not protected
        # by a lock.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # i.e. using fork
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the iterables will be sent one at
                a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
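
        For example (illustrative):

            with ProcessPoolExecutor() as executor:
                for v in executor.map(pow, [1, 2, 3], [2, 2, 2]):
                    print(v)   # prints 1, 4, 9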
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              itertools.batched(zip(*iterables), chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__