1# Copyright 2009 Brian Quinlan. All Rights Reserved. 2# Licensed to PSF under a Contributor Agreement. 3 4"""Implements ProcessPoolExecutor. 5 6The following diagram and text describe the data-flow through the system: 7 8|======================= In-process =====================|== Out-of-process ==| 9 10+----------+ +----------+ +--------+ +-----------+ +---------+ 11| | => | Work Ids | | | | Call Q | | Process | 12| | +----------+ | | +-----------+ | Pool | 13| | | ... | | | | ... | +---------+ 14| | | 6 | => | | => | 5, call() | => | | 15| | | 7 | | | | ... | | | 16| Process | | ... | | Local | +-----------+ | Process | 17| Pool | +----------+ | Worker | | #1..n | 18| Executor | | Thread | | | 19| | +----------- + | | +-----------+ | | 20| | <=> | Work Items | <=> | | <= | Result Q | <= | | 21| | +------------+ | | +-----------+ | | 22| | | 6: call() | | | | ... | | | 23| | | future | | | | 4, result | | | 24| | | ... | | | | 3, except | | | 25+----------+ +------------+ +--------+ +-----------+ +---------+ 26 27Executor.submit() called: 28- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict 29- adds the id of the _WorkItem to the "Work Ids" queue 30 31Local worker thread: 32- reads work ids from the "Work Ids" queue and looks up the corresponding 33 WorkItem from the "Work Items" dict: if the work item has been cancelled then 34 it is simply removed from the dict, otherwise it is repackaged as a 35 _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" 36 until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because 37 calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). 
38- reads _ResultItems from "Result Q", updates the future stored in the 39 "Work Items" dict and deletes the dict entry 40 41Process #1..n: 42- reads _CallItems from "Call Q", executes the calls, and puts the resulting 43 _ResultItems in "Result Q" 44""" 45 46__author__ = 'Brian Quinlan (brian@sweetapp.com)' 47 48import atexit 49import os 50from concurrent.futures import _base 51import queue 52from queue import Full 53import multiprocessing as mp 54import multiprocessing.connection 55from multiprocessing.queues import Queue 56import threading 57import weakref 58from functools import partial 59import itertools 60import sys 61import traceback 62 63# Workers are created as daemon threads and processes. This is done to allow the 64# interpreter to exit when there are still idle processes in a 65# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, 66# allowing workers to die with the interpreter has two undesirable properties: 67# - The workers would still be running during interpreter shutdown, 68# meaning that they would fail in unpredictable ways. 69# - The workers could be killed while evaluating a work item, which could 70# be bad if the callable being evaluated has external side-effects e.g. 71# writing to a file. 72# 73# To work around this problem, an exit handler is installed which tells the 74# workers to exit when their work queues are empty and then waits until the 75# threads/processes finish. 
76 77_threads_wakeups = weakref.WeakKeyDictionary() 78_global_shutdown = False 79 80 81class _ThreadWakeup: 82 def __init__(self): 83 self._reader, self._writer = mp.Pipe(duplex=False) 84 85 def close(self): 86 self._writer.close() 87 self._reader.close() 88 89 def wakeup(self): 90 self._writer.send_bytes(b"") 91 92 def clear(self): 93 while self._reader.poll(): 94 self._reader.recv_bytes() 95 96 97def _python_exit(): 98 global _global_shutdown 99 _global_shutdown = True 100 items = list(_threads_wakeups.items()) 101 for _, thread_wakeup in items: 102 thread_wakeup.wakeup() 103 for t, _ in items: 104 t.join() 105 106# Controls how many more calls than processes will be queued in the call queue. 107# A smaller number will mean that processes spend more time idle waiting for 108# work while a larger number will make Future.cancel() succeed less frequently 109# (Futures in the call queue cannot be cancelled). 110EXTRA_QUEUED_CALLS = 1 111 112 113# On Windows, WaitForMultipleObjects is used to wait for processes to finish. 114# It can wait on, at most, 63 objects. 
# There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback


class _RemoteTraceback(Exception):
    """Exception whose str() is a pre-formatted (remote) traceback text."""

    def __init__(self, tb):
        self.tb = tb

    def __str__(self):
        return self.tb


class _ExceptionWithTraceback:
    """Picklable pairing of an exception with its formatted traceback.

    When unpickled (through _rebuild_exc) it turns back into the original
    exception, with a _RemoteTraceback holding the text as its __cause__.
    """

    def __init__(self, exc, tb):
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb

    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)


def _rebuild_exc(exc, tb):
    """Attach *tb* to *exc* as a _RemoteTraceback cause and return *exc*."""
    exc.__cause__ = _RemoteTraceback(tb)
    return exc


class _WorkItem(object):
    """A submitted call plus the Future that will receive its outcome."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _ResultItem(object):
    """Outcome of one work item, sent by a worker through the result queue."""

    def __init__(self, work_id, exception=None, result=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result


class _CallItem(object):
    """The part of a _WorkItem that is shipped to a worker (no Future)."""

    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Call queue that reports send failures to the matching future.

    If the queue's feeder fails on a _CallItem, the work item is removed
    from *pending_work_items* and the error is set on its future.  Any other
    object falls back to the base Queue handler.
    """

    def __init__(self, max_size=0, *, ctx, pending_work_items):
        self.pending_work_items = pending_work_items
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            # work_item can be None if another process terminated. In this case,
            # the queue_manager_thread fails all work_items with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None):
    """Safely send back the given result or exception.

    If putting the _ResultItem fails (e.g. the result cannot be pickled),
    that failure is wrapped and sent back for the same work_id instead.
    """
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception))
    except BaseException as e:
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Sentinel: report our pid so the queue management thread can
            # tell a clean exit from a crash, then stop.
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item


def _add_call_item_to_queue(pending_work_items,
                            work_ids,
                            call_queue):
    """Fills call_queue with _WorkItems from pending_work_items.

    This function never blocks.

    Args:
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
            are consumed and the corresponding _WorkItems from
            pending_work_items are transformed into _CallItems and put in
            call_queue.
        call_queue: A multiprocessing.Queue that will be filled with _CallItems
            derived from _WorkItems.
    """
    while True:
        if call_queue.full():
            return
        try:
            work_id = work_ids.get(block=False)
        except queue.Empty:
            return

        work_item = pending_work_items[work_id]
        # Moves the future to RUNNING; returns False if it was cancelled.
        if work_item.future.set_running_or_notify_cancel():
            call_queue.put(_CallItem(work_id,
                                     work_item.fn,
                                     work_item.args,
                                     work_item.kwargs),
                           block=True)
        else:
            del pending_work_items[work_id]


def _queue_management_worker(executor_reference,
                             processes,
                             pending_work_items,
                             work_ids_queue,
                             call_queue,
                             result_queue,
                             thread_wakeup):
    """Manages the communication between this process and the worker processes.

    This function is run in a local thread.

    Args:
        executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
            this thread. Used to determine if the ProcessPoolExecutor has been
            garbage collected and that this function can exit.
        processes: A dict mapping worker pids to the ctx.Process instances
            used as workers. Entries are removed as workers exit cleanly.
        pending_work_items: A dict mapping work ids to _WorkItems e.g.
            {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        call_queue: A ctx.Queue that will be filled with _CallItems
            derived from _WorkItems for processing by the process workers.
        result_queue: A ctx.SimpleQueue of _ResultItems generated by the
            process workers; a worker also puts its pid (an int) there
            when it exits after receiving a None sentinel.
        thread_wakeup: A _ThreadWakeup to allow waking up the
            queue_manager_thread from the main Thread and avoid deadlocks
            caused by permanently locked queues.
    """
    executor = None

    def shutting_down():
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def shutdown_worker():
        # This is an upper bound on the number of children alive.
        n_children_alive = sum(p.is_alive() for p in processes.values())
        n_children_to_stop = n_children_alive
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except Full:
                    break
            n_children_alive = sum(p.is_alive() for p in processes.values())

        # Release the queue's resources as soon as possible.
        call_queue.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in processes.values():
            p.join()

    result_reader = result_queue._reader
    wakeup_reader = thread_wakeup._reader
    readers = [result_reader, wakeup_reader]

    while True:
        _add_call_item_to_queue(pending_work_items,
                                work_ids_queue,
                                call_queue)

        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal sent. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        worker_sentinels = [p.sentinel for p in processes.values()]
        ready = mp.connection.wait(readers + worker_sentinels)

        cause = None
        # Only a worker sentinel became ready (a process died), unless one of
        # the branches below proves otherwise.
        is_broken = True
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = traceback.format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False
            result_item = None
        thread_wakeup.clear()
        if is_broken:
            # Mark the process pool broken so that submits fail right now.
            executor = executor_reference()
            if executor is not None:
                executor._broken = ('A child process terminated '
                                    'abruptly, the process pool is not '
                                    'usable anymore')
                executor._shutdown_thread = True
                executor = None
            bpe = BrokenProcessPool("A process in the process pool was "
                                    "terminated abruptly while the future was "
                                    "running or pending.")
            if cause is not None:
                bpe.__cause__ = _RemoteTraceback(
                    f"\n'''\n{''.join(cause)}'''")
            # All futures in flight must be marked failed
            for work_id, work_item in pending_work_items.items():
                work_item.future.set_exception(bpe)
                # Delete references to object. See issue16284
                del work_item
            pending_work_items.clear()
            # Terminate remaining workers forcibly: the queues or their
            # locks may be in a dirty state and block forever.
            for p in processes.values():
                p.terminate()
            shutdown_worker()
            return
        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert shutting_down()
            p = processes.pop(result_item)
            p.join()
            if not processes:
                shutdown_worker()
                return
        elif result_item is not None:
            work_item = pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                # Compare with None explicitly: an exception object is not
                # guaranteed to be truthy.
                if result_item.exception is not None:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)
                # Delete references to object. See issue16284
                del work_item
            # Delete reference to result_item
            del result_item

        # Check whether we should start shutting down.
        executor = executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        if shutting_down():
            try:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown_thread = True
                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not pending_work_items:
                    shutdown_worker()
                    return
            except Full:
                # This is not a problem: we will eventually be woken up (in
                # mp.connection.wait()) and be able to send a sentinel again.
                pass
        # Do not keep the executor alive between iterations.
        executor = None


_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the OS provides too few semaphores.

    The check runs once; later calls re-raise the cached failure, if any,
    or return immediately.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already checked and found sufficient: no need to query again.
        return
    _system_limits_checked = True
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # indetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list.  This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        # Reverse then pop from the end: O(1) per item, and each yielded
        # object is dropped from the list as soon as it is produced.
        element.reverse()
        while element:
            yield element.pop()


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """


class ProcessPoolExecutor(_base.Executor):
    """Executor that runs submitted calls in a pool of worker processes."""

    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=()):
        """Initializes a new ProcessPoolExecutor instance.

        Args:
            max_workers: The maximum number of processes that can be used to
                execute the given calls. If None or not given then as many
                worker processes will be created as the machine has processors.
            mp_context: A multiprocessing context to launch the workers. This
                object should provide SimpleQueue, Queue and Process.
            initializer: A callable used to initialize worker processes.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            NotImplementedError: If the system provides too few semaphores.
            ValueError: If max_workers is <= 0, or on Windows exceeds
                _MAX_WINDOWS_WORKERS.
            TypeError: If initializer is not None and not callable.
        """
        _check_system_limits()

        if max_workers is None:
            self._max_workers = os.cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            elif (sys.platform == 'win32' and
                max_workers > _MAX_WINDOWS_WORKERS):
                raise ValueError(
                    f"max_workers must be <= {_MAX_WINDOWS_WORKERS}")

            self._max_workers = max_workers

        if mp_context is None:
            mp_context = mp.get_context()
        self._mp_context = mp_context

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        self._initializer = initializer
        self._initargs = initargs

        # Management thread
        self._queue_management_thread = None

        # Map of pids to processes
        self._processes = {}

        # Shutdown is a two-step process.
        self._shutdown_thread = False
        self._shutdown_lock = threading.Lock()
        self._broken = False
        self._queue_count = 0
        self._pending_work_items = {}

        # Create communication channels for the executor
        # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
        # because futures in the call queue cannot be cancelled.
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
        # processes anyway, so silence the tracebacks.
        self._call_queue._ignore_epipe = True
        self._result_queue = mp_context.SimpleQueue()
        self._work_ids = queue.Queue()

        # _ThreadWakeup is a communication channel used to interrupt the wait
        # of the main loop of queue_manager_thread from another thread (e.g.
        # when calling executor.submit or executor.shutdown). We do not use the
        # _result_queue to send the wakeup signal to the queue_manager_thread
        # as it could result in a deadlock if a worker process dies with the
        # _result_queue write lock still acquired.
        self._queue_management_thread_wakeup = _ThreadWakeup()

    def _start_queue_management_thread(self):
        """Start the worker processes and the management thread, once."""
        if self._queue_management_thread is None:
            # When the executor gets garbage collected, the weakref callback
            # will wake up the queue management thread so that it can terminate
            # if there is no pending work item.
            def weakref_cb(_,
                           thread_wakeup=self._queue_management_thread_wakeup):
                mp.util.debug('Executor collected: triggering callback for'
                              ' QueueManager wakeup')
                thread_wakeup.wakeup()
            # Start the processes so that their sentinels are known.
            self._adjust_process_count()
            self._queue_management_thread = threading.Thread(
                target=_queue_management_worker,
                args=(weakref.ref(self, weakref_cb),
                      self._processes,
                      self._pending_work_items,
                      self._work_ids,
                      self._call_queue,
                      self._result_queue,
                      self._queue_management_thread_wakeup),
                name="QueueManagerThread")
            self._queue_management_thread.daemon = True
            self._queue_management_thread.start()
            _threads_wakeups[self._queue_management_thread] = \
                self._queue_management_thread_wakeup

    def _adjust_process_count(self):
        """Spawn worker processes until there are _max_workers of them."""
        for _ in range(len(self._processes), self._max_workers):
            p = self._mp_context.Process(
                target=_process_worker,
                args=(self._call_queue,
                      self._result_queue,
                      self._initializer,
                      self._initargs))
            p.start()
            self._processes[p.pid] = p

    def submit(*args, **kwargs):
        # (self, fn, /, *args, **kwargs) emulated by hand so that passing
        # ``fn`` by keyword still works, with a DeprecationWarning.
        if len(args) >= 2:
            self, fn, *args = args
        elif not args:
            raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = kwargs.pop('fn')
            self, *args = args
            import warnings
            warnings.warn("Passing 'fn' as keyword argument is deprecated",
                          DeprecationWarning, stacklevel=2)
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args)-1))

        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool(self._broken)
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _global_shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()

            self._start_queue_management_thread()
            return f
    # Fall back gracefully if the paired _base no longer defines the attribute.
    submit.__text_signature__ = getattr(_base.Executor.submit,
                                        '__text_signature__', None)
    submit.__doc__ = _base.Executor.submit.__doc__

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: If greater than one, the iterables will be chopped into
                chunks of size chunksize and submitted to the process pool.
                If set to one, the items in the list will be sent one at a time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
            ValueError: If chunksize is less than 1.
        """
        if chunksize < 1:
            raise ValueError("chunksize must be >= 1.")

        results = super().map(partial(_process_chunk, fn),
                              _get_chunks(*iterables, chunksize=chunksize),
                              timeout=timeout)
        return _chain_from_iterable_of_lists(results)

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown_thread = True
        management_thread = self._queue_management_thread
        if management_thread is not None:
            # Wake up queue management thread
            self._queue_management_thread_wakeup.wakeup()
            if wait:
                management_thread.join()
        # The call queue and the wakeup pipe may only be closed here when no
        # management thread can still be using them: it was never started,
        # or it was just joined.  With wait=False the running thread closes
        # the call queue itself (see shutdown_worker); we only drop our
        # references.
        can_release = wait or management_thread is None
        # To reduce the risk of opening too many files, remove references to
        # objects that use file descriptors.
        self._queue_management_thread = None
        if self._call_queue is not None:
            if can_release:
                self._call_queue.close()
                if wait:
                    self._call_queue.join_thread()
            self._call_queue = None
        self._result_queue = None
        self._processes = None

        if self._queue_management_thread_wakeup:
            if can_release:
                self._queue_management_thread_wakeup.close()
            self._queue_management_thread_wakeup = None

    shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)