# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+       +---------+
|          |  => | Work Ids |       |        |     | Call Q    |       | Process |
|          |     +----------+       |        |     +-----------+       |  Pool   |
|          |     | ...      |       |        |     | ...       |       +---------+
|          |     | 6        |    => |        |  => | 5, call() |    => |         |
|          |     | 7        |       |        |     | ...       |       |         |
| Process  |     | ...      |       | Local  |     +-----------+       | Process |
|  Pool    |     +----------+       | Worker |                         |  #1..n  |
| Executor |                        | Thread |                         |         |
|          |     +------------+     |        |     +-----------+       |         |
|          | <=> | Work Items | <=> |        |  <= | Result Q  |    <= |         |
|          |     +------------+     |        |     +-----------+       |         |
|          |     | 6: call()  |     |        |     | ...       |       |         |
|          |     |    future  |     |        |     | 4, result |       |         |
|          |     |    ...     |     |        |     | 3, except |       |         |
+----------+     +------------+     +--------+     +-----------+       +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  _WorkItem from the "Work Items" dict: if the work item has been cancelled
  then it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
# This import is required to load the multiprocessing.connection submodule
# so that it can be accessed later as `mp.connection`
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception
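
# The data flow described above can be observed with a minimal usage sketch
# like the following (illustrative only, not part of the implementation; it
# assumes the snippet runs under an ``if __name__ == "__main__":`` guard so
# that worker processes can safely re-import the caller's module):
#
#     from concurrent.futures import ProcessPoolExecutor
#
#     with ProcessPoolExecutor(max_workers=2) as executor:
#         # submit() creates a _WorkItem, queues its id on "Work Ids" and
#         # wakes up the executor manager thread.
#         future = executor.submit(pow, 2, 10)
#         # The manager thread repackages the item as a _CallItem, a worker
#         # process evaluates it and sends back a _ResultItem, which
#         # resolves the future.
#         assert future.result() == 1024
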

_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False


class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._lock = threading.Lock()
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # Please note that we do not take the self._lock when
        # calling clear() (to avoid deadlocking) so this method can
        # only be called safely from the same thread as all calls to
        # clear() even if you hold the lock. Otherwise we
        # might try to read from the closed pipe.
        with self._lock:
            if not self._closed:
                self._closed = True
                self._writer.close()
                self._reader.close()

    def wakeup(self):
        with self._lock:
            if not self._closed:
                self._writer.send_bytes(b"")

    def clear(self):
        if self._closed:
            raise RuntimeError('operation on closed _ThreadWakeup')
        while self._reader.poll():
            self._reader.recv_bytes()


def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue that sets the exception on the future linked to the job."""
    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)
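
# Illustrative sketch of what _SafeQueue._on_queue_feeder_error() above means
# for callers (not executed here; a ``lambda`` is used only as a convenient
# example of a callable that cannot be pickled by the default pickler):
#
#     with ProcessPoolExecutor() as executor:
#         future = executor.submit(lambda: None)
#         future.result()   # raises the pickling error (e.g. PicklingError)
#                           # hit by the queue feeder thread; the error is
#                           # set on the future instead of hanging the caller,
#                           # and its __cause__ is a _RemoteTraceback carrying
#                           # the feeder thread's traceback.
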

def _process_chunk(fn, chunk):
    """Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.
    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        max_tasks: The maximum number of calls to evaluate before exiting,
            or None for no limit
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return
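
# Illustrative sketch of the worker lifecycle implemented by _process_worker()
# above (not executed here; ``setup`` stands for a hypothetical, importable,
# module-level initializer function):
#
#     executor = ProcessPoolExecutor(max_workers=1,
#                                    initializer=setup,
#                                    initargs=("config",),
#                                    max_tasks_per_child=10)
#     # Each worker first runs setup("config").  If that raises, the worker
#     # exits and the parent marks the pool broken.  Otherwise the worker
#     # evaluates _CallItems until it has run 10 of them, then reports its
#     # pid via _ResultItem.exit_pid and exits so a fresh worker replaces it.
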

class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       mp_util_debug=mp.util.debug):
            mp_util_debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A dict mapping pids to the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            # gh-109047: During Python finalization, self.call_queue.put()
            # can fail with RuntimeError if it needs to start the queue's
            # feeder thread.
            try:
                self.add_call_item_to_queue()
            except BaseException as exc:
                cause = format_exception(exc)
                self.terminate_broken(cause)
                return

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _CallItems built from pending _WorkItems.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue
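
    # Cancellation window, illustrated (a sketch, not executed here, assuming
    # an existing ``executor`` and ``import time``): once
    # add_call_item_to_queue() has moved a work item into the call queue,
    # Future.cancel() can no longer succeed for it.
    #
    #     f1 = executor.submit(time.sleep, 10)   # may already be running
    #     f2 = executor.submit(time.sleep, 10)   # still only in "Work Items"
    #     f2.cancel()   # True only if f2 has not yet been moved to the call
    #                   # queue; add_call_item_to_queue() then simply drops it.
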

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal to be sent. The wake up signals come either from new tasks
        # being submitted, from the executor being shutdown/gc-ed, or from
        # the shutdown of the python interpreter.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as exc:
                cause = format_exception(exc)

        elif wakeup_reader in ready:
            is_broken = False

        self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process a received _ResultItem by marking the corresponding
        # future as completed.
        work_item = self.pending_work_items.pop(result_item.work_id, None)
        # work_item can be None if another process terminated (see above)
        if work_item is not None:
            if result_item.exception:
                work_item.future.set_exception(result_item.exception)
            else:
                work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def _terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_id, work_item in self.pending_work_items.items():
            try:
                work_item.future.set_exception(bpe)
            except _base.InvalidStateError:
                # set_exception() fails if the future is cancelled: ignore it.
                # Trying to check if the future is cancelled before calling
                # set_exception() would leave a race condition if the future
                # is cancelled between the check and set_exception().
                pass
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        self.call_queue._terminate_broken()

        # clean up resources
        self._join_executor_internals(broken=True)
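
    # The readiness check in wait_result_broken_or_wakeup() above relies on
    # multiprocessing.connection.wait() accepting both Connection objects and
    # process sentinels.  A standalone sketch of that pattern (illustrative
    # only; ``mp_context`` and ``some_target`` are hypothetical):
    #
    #     reader, writer = mp.Pipe(duplex=False)
    #     worker = mp_context.Process(target=some_target)
    #     worker.start()
    #     ready = mp.connection.wait([reader, worker.sentinel])
    #     # `ready` contains `reader` if data arrived and/or the sentinel if
    #     # the worker exited; a sentinel firing with no readable result is
    #     # how the manager detects a broken pool.
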

    def terminate_broken(self, cause):
        with self.shutdown_lock:
            self._terminate_broken(cause)

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and, if requested, cancel the
        # remaining tasks as early as possible (i.e. while the executor has
        # not been gc-ed yet).
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for i in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        with self.shutdown_lock:
            self._join_executor_internals()

    def _join_executor_internals(self, broken=False):
        # If broken, call_queue was closed and so can no longer be used.
        if not broken:
            self.shutdown_workers()

        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        self.thread_wakeup.close()

        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            if broken:
                p.terminate()
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())
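
# Illustrative sketch of the shutdown handshake driven by
# _ExecutorManagerThread.shutdown_workers() above (not executed here):
#
#     executor = ProcessPoolExecutor(max_workers=2)
#     executor.submit(abs, -1).result()
#     executor.shutdown(wait=True)
#     # shutdown_workers() puts one None sentinel per live worker on the call
#     # queue; each worker returns after reading its sentinel (see
#     # _process_worker) and _join_executor_internals() then joins them.
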

_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indeterminate limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """
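
# Illustrative sketch of how BrokenProcessPool surfaces to callers (not
# executed here; ``os._exit`` stands in for any abrupt worker death):
#
#     with ProcessPoolExecutor(max_workers=1) as executor:
#         future = executor.submit(os._exit, 1)   # the worker dies abruptly
#         try:
#             future.result()
#         except BrokenProcessPool:
#             # _terminate_broken() failed all pending futures, and any
#             # further submit() call on this executor raises
#             # BrokenProcessPool as well.
#             ...
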

class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers created
                using the multiprocessing.get_context('start method') API. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.
            max_tasks_per_child: The maximum number of tasks a worker process
                can complete before it will exit and be replaced with a fresh
                worker process. The default of None means worker processes will
                live as long as the executor. Requires a non-'fork' mp_context
                start method. When given, we default to using 'spawn' if no
                mp_context is supplied.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                  max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")
            else:
                mp_context = mp.get_context()
        self._mp_context = mp_context

        # https://github.com/python/cpython/issues/90622
        self._safe_to_dynamically_spawn_children = (
            self._mp_context.get_start_method(allow_none=False) != "fork")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child must be an integer")
            elif max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child must be >= 1")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                # https://github.com/python/cpython/issues/90622
                raise ValueError("max_tasks_per_child is incompatible with"
                                 " the 'fork' multiprocessing start method;"
                                 " supply a different mp_context.")
        self._max_tasks_per_child = max_tasks_per_child

        # Management thread
        self._executor_manager_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._idle_worker_semaphore = threading.Semaphore(0)
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}
        self._cancel_pending_futures = False

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of executor_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send wakeup signals to the executor_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        #
        # Care must be taken to only call clear and close from the
        # executor_manager_thread, since _ThreadWakeup.clear() is not protected
        # by a lock.
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()
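
    # Illustrative constructor configurations (sketches, not executed here;
    # ``init_worker`` is a hypothetical module-level initializer):
    #
    #     # Default: one worker per processor, platform default start method.
    #     executor = ProcessPoolExecutor()
    #
    #     # Explicit 'spawn' context, a per-worker initializer and recycling
    #     # of each worker after 100 tasks (requires a non-'fork' context).
    #     executor = ProcessPoolExecutor(
    #         max_workers=4,
    #         mp_context=mp.get_context("spawn"),
    #         initializer=init_worker,
    #         max_tasks_per_child=100,
    #     )
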

    def _start_executor_manager_thread(self):
        if self._executor_manager_thread is None:
            # Start the processes so that their sentinels are known.
            if not self._safe_to_dynamically_spawn_children:  # ie, using fork.
                self._launch_processes()
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup

    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.
        if self._idle_worker_semaphore.acquire(blocking=False):
            return

        process_count = len(self._processes)
        if process_count < self._max_workers:
            # Assertion disabled as this codepath is also used to replace a
            # worker that unexpectedly dies, even when using the 'fork' start
            # method. That means there is still a potential deadlock bug. If a
            # 'fork' mp_context worker dies, we'll be forking a new one when
            # we know a thread is running (self._executor_manager_thread).
            #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
            self._spawn_process()

    def _launch_processes(self):
        # https://github.com/python/cpython/issues/90622
        assert not self._executor_manager_thread, (
                'Processes cannot be fork()ed after the thread has started, '
                'deadlock in the child processes could result.')
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()

    def _spawn_process(self):
        p = self._mp_context.Process(
            target=_process_worker,
            args=(self._call_queue,
                  self._result_queue,
                  self._initializer,
                  self._initargs,
                  self._max_tasks_per_child))
        p.start()
        self._processes[p.pid] = p

    def submit(self, fn, /, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._executor_manager_thread_wakeup.wakeup()

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()
            self._start_executor_manager_thread()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__
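
    # Illustrative behaviour of submit() above (a sketch, not executed here):
    #
    #     executor = ProcessPoolExecutor()
    #     future = executor.submit(divmod, 7, 3)   # returns a Future
    #     assert future.result() == (2, 1)
    #     executor.shutdown()
    #     executor.submit(divmod, 7, 3)   # raises RuntimeError: cannot
    #                                     # schedule new futures after shutdown
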

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items will be sent one at a time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              itertools.batched(zip(*iterables), chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._cancel_pending_futures = cancel_futures
            self._shutdown_thread = True
            if self._executor_manager_thread_wakeup is not None:
                # Wake up queue management thread
                self._executor_manager_thread_wakeup.wakeup()

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._executor_manager_thread = None
        self._call_queue = None
        if self._result_queue is not None and wait:
            self._result_queue.close()
        self._result_queue = None
        self._processes = None
        self._executor_manager_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__
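
# Illustrative sketch of map() with chunking (not executed here): with
# chunksize=3, the zipped argument tuples are grouped by itertools.batched()
# and each group is evaluated by a single _process_chunk() call in one worker.
#
#     with ProcessPoolExecutor() as executor:
#         squares = list(executor.map(pow, range(6), [2] * 6, chunksize=3))
#         # The workers receive two chunks, ((0, 2), (1, 2), (2, 2)) and
#         # ((3, 2), (4, 2), (5, 2)); results come back in order:
#         assert squares == [0, 1, 4, 9, 16, 25]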