1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4"""Implements ProcessPoolExecutor. 5 6The following diagram and text describe the data-flow through the system: 7 8|======================= In-process =====================|== Out-of-process ==| 9 10+----------+ +----------+ +--------+ +-----------+ +---------+ 11| | => | Work Ids | | | | Call Q | | Process | 12| | +----------+ | | +-----------+ | Pool | 13| | | ... | | | | ... | +---------+ 14| | | 6 | => | | => | 5, call() | => | | 15| | | 7 | | | | ... | | | 16| Process | | ... | | Local | +-----------+ | Process | 17| Pool | +----------+ | Worker | | #1..n | 18| Executor | | Thread | | | 19| | +----------- + | | +-----------+ | | 20| | <=> | Work Items | <=> | | <= | Result Q | <= | | 21| | +------------+ | | +-----------+ | | 22| | | 6: call() | | | | ... | | | 23| | | future | | | | 4, result | | | 24| | | ... | | | | 3, except | | | 25+----------+ +------------+ +--------+ +-----------+ +---------+ 26 27Executor.submit() called: 28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 29- adds the id of the _WorkItem to the "Work Ids" queue 30 31Local worker thread: 32- reads work ids from the "Work Ids" queue and looks up the corresponding 33 WorkItem from the "Work Items" dict: if the work item has been cancelled then 34 it is simply removed from the dict, otherwise it is repackaged as a 35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 38- reads _ResultItems from "Result Q", updates the future stored in the 39 "Work Items" dict and deletes the dict entry 40 41Process #1..n: 42- reads _CallItems from "Call Q", executes the calls, and puts the resulting 43 _ResultItems in "Result Q" 44""" 45 46__author__ = 'Brian Quinlan (brian@sweetapp.com)' 47 48import os 49from concurrent.futures import _base 50import queue 51import multiprocessing as mp 52import multiprocessing.connection 53from multiprocessing.queues import Queue 54import threading 55import weakref 56from functools import partial 57import itertools 58import sys 59import traceback 60 61 62_threads_wakeups = weakref.WeakKeyDictionary() 63_global_shutdown = False 64 65 66class _ThreadWakeup: 67 def __init__(self): 68 self._closed = False 69 self._reader, self._writer = mp.Pipe(duplex=False) 70 71 def close(self): 72 if not self._closed: 73 self._closed = True 74 self._writer.close() 75 self._reader.close() 76 77 def wakeup(self): 78 if not self._closed: 79 self._writer.send_bytes(b"") 80 81 def clear(self): 82 if not self._closed: 83 while self._reader.poll(): 84 self._reader.recv_bytes() 85 86 87def _python_exit(): 88 global _global_shutdown 89 _global_shutdown = True 90 items = list(_threads_wakeups.items()) 91 for _, thread_wakeup in items: 92 # call not protected by ProcessPoolExecutor._shutdown_lock 93 thread_wakeup.wakeup() 94 for t, _ in items: 95 t.join() 96 97# Register for `_python_exit()` to be called just before joining all 98# non-daemon threads. This is used instead of `atexit.register()` for 99# compatibility with subinterpreters, which no longer support daemon threads. 100# See bpo-39812 for context. 101threading._register_atexit(_python_exit) 102 103# Controls how many more calls than processes will be queued in the call queue. 104# A smaller number will mean that processes spend more time idle waiting for 105# work while a larger number will make Future.cancel() succeed less frequently 106# (Futures in the call queue cannot be cancelled). 107EXTRA_QUEUED_CALLS = 1 108 109 110# On Windows, WaitForMultipleObjects is used to wait for processes to finish. 111# It can wait on, at most, 63 objects. There is an overhead of two objects: 112# - the result queue reader 113# - the thread wakeup reader 114_MAX_WINDOWS_WORKERS = 63 - 2 115 116# Hack to embed stringification of remote traceback in local traceback 117 118class _RemoteTraceback(Exception): 119 def __init__(self, tb): 120 self.tb = tb 121 def __str__(self): 122 return self.tb 123 124class _ExceptionWithTraceback: 125 def __init__(self, exc, tb): 126 tb = traceback.format_exception(type(exc), exc, tb) 127 tb = ''.join(tb) 128 self.exc = exc 129 self.tb = '\n"""\n%s"""' % tb 130 def __reduce__(self): 131 return _rebuild_exc, (self.exc, self.tb) 132 133def _rebuild_exc(exc, tb): 134 exc.__cause__ = _RemoteTraceback(tb) 135 return exc 136 137class _WorkItem(object): 138 def __init__(self, future, fn, args, kwargs): 139 self.future = future 140 self.fn = fn 141 self.args = args 142 self.kwargs = kwargs 143 144class _ResultItem(object): 145 def __init__(self, work_id, exception=None, result=None): 146 self.work_id = work_id 147 self.exception = exception 148 self.result = result 149 150class _CallItem(object): 151 def __init__(self, work_id, fn, args, kwargs): 152 self.work_id = work_id 153 self.fn = fn 154 self.args = args 155 self.kwargs = kwargs 156 157 158class _SafeQueue(Queue): 159 """Safe Queue set exception to the future object linked to a job""" 160 def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, 161 thread_wakeup): 162 self.pending_work_items = pending_work_items 163 self.shutdown_lock = shutdown_lock 164 self.thread_wakeup = thread_wakeup 165 super().__init__(max_size, ctx=ctx) 166 167 def _on_queue_feeder_error(self, e, obj): 168 if isinstance(obj, _CallItem): 169 tb = traceback.format_exception(type(e), e, e.__traceback__) 170 e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) 171 work_item = self.pending_work_items.pop(obj.work_id, None) 172 with self.shutdown_lock: 173 self.thread_wakeup.wakeup() 174 # work_item can be None if another process terminated. In this 175 # case, the executor_manager_thread fails all work_items 176 # with BrokenProcessPool 177 if work_item is not None: 178 work_item.future.set_exception(e) 179 else: 180 super()._on_queue_feeder_error(e, obj) 181 182 183def _get_chunks(*iterables, chunksize): 184 """ Iterates over zip()ed iterables in chunks. """ 185 it = zip(*iterables) 186 while True: 187 chunk = tuple(itertools.islice(it, chunksize)) 188 if not chunk: 189 return 190 yield chunk 191 192 193def _process_chunk(fn, chunk): 194 """ Processes a chunk of an iterable passed to map. 195 196 Runs the function passed to map() on a chunk of the 197 iterable passed to map. 198 199 This function is run in a separate process. 200 201 """ 202 return [fn(*args) for args in chunk] 203 204 205def _sendback_result(result_queue, work_id, result=None, exception=None): 206 """Safely send back the given result or exception""" 207 try: 208 result_queue.put(_ResultItem(work_id, result=result, 209 exception=exception)) 210 except BaseException as e: 211 exc = _ExceptionWithTraceback(e, e.__traceback__) 212 result_queue.put(_ResultItem(work_id, exception=exc)) 213 214 215def _process_worker(call_queue, result_queue, initializer, initargs): 216 """Evaluates calls from call_queue and places the results in result_queue. 217 218 This worker is run in a separate process. 219 220 Args: 221 call_queue: A ctx.Queue of _CallItems that will be read and 222 evaluated by the worker. 223 result_queue: A ctx.Queue of _ResultItems that will written 224 to by the worker. 225 initializer: A callable initializer, or None 226 initargs: A tuple of args for the initializer 227 """ 228 if initializer is not None: 229 try: 230 initializer(*initargs) 231 except BaseException: 232 _base.LOGGER.critical('Exception in initializer:', exc_info=True) 233 # The parent will notice that the process stopped and 234 # mark the pool broken 235 return 236 while True: 237 call_item = call_queue.get(block=True) 238 if call_item is None: 239 # Wake up queue management thread 240 result_queue.put(os.getpid()) 241 return 242 try: 243 r = call_item.fn(*call_item.args, **call_item.kwargs) 244 except BaseException as e: 245 exc = _ExceptionWithTraceback(e, e.__traceback__) 246 _sendback_result(result_queue, call_item.work_id, exception=exc) 247 else: 248 _sendback_result(result_queue, call_item.work_id, result=r) 249 del r 250 251 # Liberate the resource as soon as possible, to avoid holding onto 252 # open files or shared memory that is not needed anymore 253 del call_item 254 255 256class _ExecutorManagerThread(threading.Thread): 257 """Manages the communication between this process and the worker processes. 258 259 The manager is run in a local thread. 260 261 Args: 262 executor: A reference to the ProcessPoolExecutor that owns 263 this thread. A weakref will be own by the manager as well as 264 references to internal objects used to introspect the state of 265 the executor. 266 """ 267 268 def __init__(self, executor): 269 # Store references to necessary internals of the executor. 270 271 # A _ThreadWakeup to allow waking up the queue_manager_thread from the 272 # main Thread and avoid deadlocks caused by permanently locked queues. 273 self.thread_wakeup = executor._executor_manager_thread_wakeup 274 self.shutdown_lock = executor._shutdown_lock 275 276 # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used 277 # to determine if the ProcessPoolExecutor has been garbage collected 278 # and that the manager can exit. 279 # When the executor gets garbage collected, the weakref callback 280 # will wake up the queue management thread so that it can terminate 281 # if there is no pending work item. 282 def weakref_cb(_, 283 thread_wakeup=self.thread_wakeup, 284 shutdown_lock=self.shutdown_lock): 285 mp.util.debug('Executor collected: triggering callback for' 286 ' QueueManager wakeup') 287 with shutdown_lock: 288 thread_wakeup.wakeup() 289 290 self.executor_reference = weakref.ref(executor, weakref_cb) 291 292 # A list of the ctx.Process instances used as workers. 293 self.processes = executor._processes 294 295 # A ctx.Queue that will be filled with _CallItems derived from 296 # _WorkItems for processing by the process workers. 297 self.call_queue = executor._call_queue 298 299 # A ctx.SimpleQueue of _ResultItems generated by the process workers. 300 self.result_queue = executor._result_queue 301 302 # A queue.Queue of work ids e.g. Queue([5, 6, ...]). 303 self.work_ids_queue = executor._work_ids 304 305 # A dict mapping work ids to _WorkItems e.g. 306 # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} 307 self.pending_work_items = executor._pending_work_items 308 309 super().__init__() 310 311 def run(self): 312 # Main loop for the executor manager thread. 313 314 while True: 315 self.add_call_item_to_queue() 316 317 result_item, is_broken, cause = self.wait_result_broken_or_wakeup() 318 319 if is_broken: 320 self.terminate_broken(cause) 321 return 322 if result_item is not None: 323 self.process_result_item(result_item) 324 # Delete reference to result_item to avoid keeping references 325 # while waiting on new results. 326 del result_item 327 328 # attempt to increment idle process count 329 executor = self.executor_reference() 330 if executor is not None: 331 executor._idle_worker_semaphore.release() 332 del executor 333 334 if self.is_shutting_down(): 335 self.flag_executor_shutting_down() 336 337 # Since no new work items can be added, it is safe to shutdown 338 # this thread if there are no pending work items. 339 if not self.pending_work_items: 340 self.join_executor_internals() 341 return 342 343 def add_call_item_to_queue(self): 344 # Fills call_queue with _WorkItems from pending_work_items. 345 # This function never blocks. 346 while True: 347 if self.call_queue.full(): 348 return 349 try: 350 work_id = self.work_ids_queue.get(block=False) 351 except queue.Empty: 352 return 353 else: 354 work_item = self.pending_work_items[work_id] 355 356 if work_item.future.set_running_or_notify_cancel(): 357 self.call_queue.put(_CallItem(work_id, 358 work_item.fn, 359 work_item.args, 360 work_item.kwargs), 361 block=True) 362 else: 363 del self.pending_work_items[work_id] 364 continue 365 366 def wait_result_broken_or_wakeup(self): 367 # Wait for a result to be ready in the result_queue while checking 368 # that all worker processes are still running, or for a wake up 369 # signal send. The wake up signals come either from new tasks being 370 # submitted, from the executor being shutdown/gc-ed, or from the 371 # shutdown of the python interpreter. 372 result_reader = self.result_queue._reader 373 assert not self.thread_wakeup._closed 374 wakeup_reader = self.thread_wakeup._reader 375 readers = [result_reader, wakeup_reader] 376 worker_sentinels = [p.sentinel for p in list(self.processes.values())] 377 ready = mp.connection.wait(readers + worker_sentinels) 378 379 cause = None 380 is_broken = True 381 result_item = None 382 if result_reader in ready: 383 try: 384 result_item = result_reader.recv() 385 is_broken = False 386 except BaseException as e: 387 cause = traceback.format_exception(type(e), e, e.__traceback__) 388 389 elif wakeup_reader in ready: 390 is_broken = False 391 392 with self.shutdown_lock: 393 self.thread_wakeup.clear() 394 395 return result_item, is_broken, cause 396 397 def process_result_item(self, result_item): 398 # Process the received a result_item. This can be either the PID of a 399 # worker that exited gracefully or a _ResultItem 400 401 if isinstance(result_item, int): 402 # Clean shutdown of a worker using its PID 403 # (avoids marking the executor broken) 404 assert self.is_shutting_down() 405 p = self.processes.pop(result_item) 406 p.join() 407 if not self.processes: 408 self.join_executor_internals() 409 return 410 else: 411 # Received a _ResultItem so mark the future as completed. 412 work_item = self.pending_work_items.pop(result_item.work_id, None) 413 # work_item can be None if another process terminated (see above) 414 if work_item is not None: 415 if result_item.exception: 416 work_item.future.set_exception(result_item.exception) 417 else: 418 work_item.future.set_result(result_item.result) 419 420 def is_shutting_down(self): 421 # Check whether we should start shutting down the executor. 422 executor = self.executor_reference() 423 # No more work items can be added if: 424 # - The interpreter is shutting down OR 425 # - The executor that owns this worker has been collected OR 426 # - The executor that owns this worker has been shutdown. 427 return (_global_shutdown or executor is None 428 or executor._shutdown_thread) 429 430 def terminate_broken(self, cause): 431 # Terminate the executor because it is in a broken state. The cause 432 # argument can be used to display more information on the error that 433 # lead the executor into becoming broken. 434 435 # Mark the process pool broken so that submits fail right now. 436 executor = self.executor_reference() 437 if executor is not None: 438 executor._broken = ('A child process terminated ' 439 'abruptly, the process pool is not ' 440 'usable anymore') 441 executor._shutdown_thread = True 442 executor = None 443 444 # All pending tasks are to be marked failed with the following 445 # BrokenProcessPool error 446 bpe = BrokenProcessPool("A process in the process pool was " 447 "terminated abruptly while the future was " 448 "running or pending.") 449 if cause is not None: 450 bpe.__cause__ = _RemoteTraceback( 451 f"\n'''\n{''.join(cause)}'''") 452 453 # Mark pending tasks as failed. 454 for work_id, work_item in self.pending_work_items.items(): 455 work_item.future.set_exception(bpe) 456 # Delete references to object. See issue16284 457 del work_item 458 self.pending_work_items.clear() 459 460 # Terminate remaining workers forcibly: the queues or their 461 # locks may be in a dirty state and block forever. 462 for p in self.processes.values(): 463 p.terminate() 464 465 # clean up resources 466 self.join_executor_internals() 467 468 def flag_executor_shutting_down(self): 469 # Flag the executor as shutting down and cancel remaining tasks if 470 # requested as early as possible if it is not gc-ed yet. 471 executor = self.executor_reference() 472 if executor is not None: 473 executor._shutdown_thread = True 474 # Cancel pending work items if requested. 475 if executor._cancel_pending_futures: 476 # Cancel all pending futures and update pending_work_items 477 # to only have futures that are currently running. 478 new_pending_work_items = {} 479 for work_id, work_item in self.pending_work_items.items(): 480 if not work_item.future.cancel(): 481 new_pending_work_items[work_id] = work_item 482 self.pending_work_items = new_pending_work_items 483 # Drain work_ids_queue since we no longer need to 484 # add items to the call queue. 485 while True: 486 try: 487 self.work_ids_queue.get_nowait() 488 except queue.Empty: 489 break 490 # Make sure we do this only once to not waste time looping 491 # on running processes over and over. 492 executor._cancel_pending_futures = False 493 494 def shutdown_workers(self): 495 n_children_to_stop = self.get_n_children_alive() 496 n_sentinels_sent = 0 497 # Send the right number of sentinels, to make sure all children are 498 # properly terminated. 499 while (n_sentinels_sent < n_children_to_stop 500 and self.get_n_children_alive() > 0): 501 for i in range(n_children_to_stop - n_sentinels_sent): 502 try: 503 self.call_queue.put_nowait(None) 504 n_sentinels_sent += 1 505 except queue.Full: 506 break 507 508 def join_executor_internals(self): 509 self.shutdown_workers() 510 # Release the queue's resources as soon as possible. 511 self.call_queue.close() 512 self.call_queue.join_thread() 513 with self.shutdown_lock: 514 self.thread_wakeup.close() 515 # If .join() is not called on the created processes then 516 # some ctx.Queue methods may deadlock on Mac OS X. 517 for p in self.processes.values(): 518 p.join() 519 520 def get_n_children_alive(self): 521 # This is an upper bound on the number of children alive. 522 return sum(p.is_alive() for p in self.processes.values()) 523 524 525_system_limits_checked = False 526_system_limited = None 527 528 529def _check_system_limits(): 530 global _system_limits_checked, _system_limited 531 if _system_limits_checked: 532 if _system_limited: 533 raise NotImplementedError(_system_limited) 534 _system_limits_checked = True 535 try: 536 import multiprocessing.synchronize 537 except ImportError: 538 _system_limited = ( 539 "This Python build lacks multiprocessing.synchronize, usually due " 540 "to named semaphores being unavailable on this platform." 541 ) 542 raise NotImplementedError(_system_limited) 543 try: 544 nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") 545 except (AttributeError, ValueError): 546 # sysconf not available or setting not available 547 return 548 if nsems_max == -1: 549 # indetermined limit, assume that limit is determined 550 # by available memory only 551 return 552 if nsems_max >= 256: 553 # minimum number of semaphores available 554 # according to POSIX 555 return 556 _system_limited = ("system provides too few semaphores (%d" 557 " available, 256 necessary)" % nsems_max) 558 raise NotImplementedError(_system_limited) 559 560 561def _chain_from_iterable_of_lists(iterable): 562 """ 563 Specialized implementation of itertools.chain.from_iterable. 564 Each item in *iterable* should be a list. This function is 565 careful not to keep references to yielded objects. 566 """ 567 for element in iterable: 568 element.reverse() 569 while element: 570 yield element.pop() 571 572 573class BrokenProcessPool(_base.BrokenExecutor): 574 """ 575 Raised when a process in a ProcessPoolExecutor terminated abruptly 576 while a future was in the running state. 577 """ 578 579 580class ProcessPoolExecutor(_base.Executor): 581 def __init__(self, max_workers=None, mp_context=None, 582 initializer=None, initargs=()): 583 """Initializes a new ProcessPoolExecutor instance. 584 585 Args: 586 max_workers: The maximum number of processes that can be used to 587 execute the given calls. If None or not given then as many 588 worker processes will be created as the machine has processors. 589 mp_context: A multiprocessing context to launch the workers. This 590 object should provide SimpleQueue, Queue and Process. 591 initializer: A callable used to initialize worker processes. 592 initargs: A tuple of arguments to pass to the initializer. 593 """ 594 _check_system_limits() 595 596 if max_workers is None: 597 self._max_workers = os.cpu_count() or 1 598 if sys.platform == 'win32': 599 self._max_workers = min(_MAX_WINDOWS_WORKERS, 600 self._max_workers) 601 else: 602 if max_workers <= 0: 603 raise ValueError("max_workers must be greater than 0") 604 elif (sys.platform == 'win32' and 605 max_workers > _MAX_WINDOWS_WORKERS): 606 raise ValueError( 607 f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") 608 609 self._max_workers = max_workers 610 611 if mp_context is None: 612 mp_context = mp.get_context() 613 self._mp_context = mp_context 614 615 if initializer is not None and not callable(initializer): 616 raise TypeError("initializer must be a callable") 617 self._initializer = initializer 618 self._initargs = initargs 619 620 # Management thread 621 self._executor_manager_thread = None 622 623 # Map of pids to processes 624 self._processes = {} 625 626 # Shutdown is a two-step process. 627 self._shutdown_thread = False 628 self._shutdown_lock = threading.Lock() 629 self._idle_worker_semaphore = threading.Semaphore(0) 630 self._broken = False 631 self._queue_count = 0 632 self._pending_work_items = {} 633 self._cancel_pending_futures = False 634 635 # _ThreadWakeup is a communication channel used to interrupt the wait 636 # of the main loop of executor_manager_thread from another thread (e.g. 637 # when calling executor.submit or executor.shutdown). We do not use the 638 # _result_queue to send wakeup signals to the executor_manager_thread 639 # as it could result in a deadlock if a worker process dies with the 640 # _result_queue write lock still acquired. 641 # 642 # _shutdown_lock must be locked to access _ThreadWakeup. 643 self._executor_manager_thread_wakeup = _ThreadWakeup() 644 645 # Create communication channels for the executor 646 # Make the call queue slightly larger than the number of processes to 647 # prevent the worker processes from idling. But don't make it too big 648 # because futures in the call queue cannot be cancelled. 649 queue_size = self._max_workers + EXTRA_QUEUED_CALLS 650 self._call_queue = _SafeQueue( 651 max_size=queue_size, ctx=self._mp_context, 652 pending_work_items=self._pending_work_items, 653 shutdown_lock=self._shutdown_lock, 654 thread_wakeup=self._executor_manager_thread_wakeup) 655 # Killed worker processes can produce spurious "broken pipe" 656 # tracebacks in the queue's own worker thread. But we detect killed 657 # processes anyway, so silence the tracebacks. 658 self._call_queue._ignore_epipe = True 659 self._result_queue = mp_context.SimpleQueue() 660 self._work_ids = queue.Queue() 661 662 def _start_executor_manager_thread(self): 663 if self._executor_manager_thread is None: 664 # Start the processes so that their sentinels are known. 665 self._executor_manager_thread = _ExecutorManagerThread(self) 666 self._executor_manager_thread.start() 667 _threads_wakeups[self._executor_manager_thread] = \ 668 self._executor_manager_thread_wakeup 669 670 def _adjust_process_count(self): 671 # if there's an idle process, we don't need to spawn a new one. 672 if self._idle_worker_semaphore.acquire(blocking=False): 673 return 674 675 process_count = len(self._processes) 676 if process_count < self._max_workers: 677 p = self._mp_context.Process( 678 target=_process_worker, 679 args=(self._call_queue, 680 self._result_queue, 681 self._initializer, 682 self._initargs)) 683 p.start() 684 self._processes[p.pid] = p 685 686 def submit(self, fn, /, *args, **kwargs): 687 with self._shutdown_lock: 688 if self._broken: 689 raise BrokenProcessPool(self._broken) 690 if self._shutdown_thread: 691 raise RuntimeError('cannot schedule new futures after shutdown') 692 if _global_shutdown: 693 raise RuntimeError('cannot schedule new futures after ' 694 'interpreter shutdown') 695 696 f = _base.Future() 697 w = _WorkItem(f, fn, args, kwargs) 698 699 self._pending_work_items[self._queue_count] = w 700 self._work_ids.put(self._queue_count) 701 self._queue_count += 1 702 # Wake up queue management thread 703 self._executor_manager_thread_wakeup.wakeup() 704 705 self._adjust_process_count() 706 self._start_executor_manager_thread() 707 return f 708 submit.__doc__ = _base.Executor.submit.__doc__ 709 710 def map(self, fn, *iterables, timeout=None, chunksize=1): 711 """Returns an iterator equivalent to map(fn, iter). 712 713 Args: 714 fn: A callable that will take as many arguments as there are 715 passed iterables. 716 timeout: The maximum number of seconds to wait. If None, then there 717 is no limit on the wait time. 718 chunksize: If greater than one, the iterables will be chopped into 719 chunks of size chunksize and submitted to the process pool. 720 If set to one, the items in the list will be sent one at a time. 721 722 Returns: 723 An iterator equivalent to: map(func, *iterables) but the calls may 724 be evaluated out-of-order. 725 726 Raises: 727 TimeoutError: If the entire result iterator could not be generated 728 before the given timeout. 729 Exception: If fn(*args) raises for any values. 730 """ 731 if chunksize < 1: 732 raise ValueError("chunksize must be >= 1.") 733 734 results = super().map(partial(_process_chunk, fn), 735 _get_chunks(*iterables, chunksize=chunksize), 736 timeout=timeout) 737 return _chain_from_iterable_of_lists(results) 738 739 def shutdown(self, wait=True, *, cancel_futures=False): 740 with self._shutdown_lock: 741 self._cancel_pending_futures = cancel_futures 742 self._shutdown_thread = True 743 if self._executor_manager_thread_wakeup is not None: 744 # Wake up queue management thread 745 self._executor_manager_thread_wakeup.wakeup() 746 747 if self._executor_manager_thread is not None and wait: 748 self._executor_manager_thread.join() 749 # To reduce the risk of opening too many files, remove references to 750 # objects that use file descriptors. 751 self._executor_manager_thread = None 752 self._call_queue = None 753 if self._result_queue is not None and wait: 754 self._result_queue.close() 755 self._result_queue = None 756 self._processes = None 757 self._executor_manager_thread_wakeup = None 758 759 shutdown.__doc__ = _base.Executor.shutdown.__doc__ 760