:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

   .. method:: submit(fn, /, *args, **kwargs)

      Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
      and returns a :class:`Future` object representing the execution of the
      callable. ::

         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=1) as executor:
             future = executor.submit(pow, 323, 1235)
             print(future.result())

   .. method:: map(func, *iterables, timeout=None, chunksize=1)

      Similar to :func:`map(func, *iterables) <map>` except:

      * the *iterables* are collected immediately rather than lazily;

      * *func* is executed asynchronously and several calls to
        *func* may be made concurrently.

      The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
      if :meth:`~iterator.__next__` is called and the result isn't available
      after *timeout* seconds from the original call to :meth:`Executor.map`.
      *timeout* can be an int or a float.  If *timeout* is not specified or
      ``None``, there is no limit to the wait time.

      If a *func* call raises an exception, then that exception will be
      raised when its value is retrieved from the iterator.

      When using :class:`ProcessPoolExecutor`, this method chops *iterables*
      into a number of chunks which it submits to the pool as separate
      tasks.  The (approximate) size of these chunks can be specified by
      setting *chunksize* to a positive integer.  For very long iterables,
      using a large value for *chunksize* can significantly improve
      performance compared to the default size of 1.  With
      :class:`ThreadPoolExecutor`, *chunksize* has no effect.

      .. versionchanged:: 3.5
         Added the *chunksize* argument.

   .. method:: shutdown(wait=True, *, cancel_futures=False)

      Signal the executor that it should free any resources that it is using
      when the currently pending futures are done executing.  Calls to
      :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
      raise :exc:`RuntimeError`.

      If *wait* is ``True`` then this method will not return until all the
      pending futures are done executing and the resources associated with the
      executor have been freed.  If *wait* is ``False`` then this method will
      return immediately and the resources associated with the executor will be
      freed when all pending futures are done executing.  Regardless of the
      value of *wait*, the entire Python program will not exit until all
      pending futures are done executing.

      If *cancel_futures* is ``True``, this method will cancel all pending
      futures that the executor has not started running.  Any futures that
      are completed or running won't be cancelled, regardless of the value
      of *cancel_futures*.

      If both *cancel_futures* and *wait* are ``True``, all futures that the
      executor has started running will be completed prior to this method
      returning.  The remaining futures are cancelled.

      You can avoid having to call this method explicitly if you use the
      :keyword:`with` statement, which will shut down the :class:`Executor`
      (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
      ``True``)::

         import shutil
         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=4) as e:
             e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
             e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
             e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
             e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

      .. versionchanged:: 3.9
         Added *cancel_futures*.


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   from concurrent.futures import ThreadPoolExecutor

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or
      not given, it will default to the number of processors on the machine,
      multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often
      used to overlap I/O instead of CPU work and the number of workers
      should be higher than the number of workers
      for :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.

   .. versionchanged:: 3.8
      Default value of *max_workers* is changed to ``min(32, os.cpu_count() + 4)``.
      This default value preserves at least 5 workers for I/O bound tasks.
      It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL.
      And it avoids using very large resources implicitly on many-core machines.

      ThreadPoolExecutor now reuses idle worker threads before starting
      *max_workers* worker threads too.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and report the URL and contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock
<global interpreter lock>` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses.  This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   On Windows, *max_workers* must be less than or equal to ``61``.  If it is not
   then :exc:`ValueError` will be raised.  If *max_workers* is ``None``, then
   the default chosen will be at most ``61``, even if more processors are
   available.
   *mp_context* can be a multiprocessing context or ``None``.  It will be used to
   launch the workers.  If *mp_context* is ``None`` or not given, the default
   multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start_method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

   .. method:: cancel()

      Attempt to cancel the call.  If the call is currently being executed or
      finished running and cannot be cancelled then the method will return
      ``False``, otherwise the call will be cancelled and the method will
      return ``True``.

   .. method:: cancelled()

      Return ``True`` if the call was successfully cancelled.

   .. method:: running()

      Return ``True`` if the call is currently being executed and cannot be
      cancelled.

   .. method:: done()

      Return ``True`` if the call was successfully cancelled or finished
      running.

   .. method:: result(timeout=None)

      Return the value returned by the call.
      If the call hasn't yet completed
      then this method will wait up to *timeout* seconds.  If the call hasn't
      completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call raised an exception, this method will raise the same exception.

   .. method:: exception(timeout=None)

      Return the exception raised by the call.  If the call hasn't yet
      completed then this method will wait up to *timeout* seconds.  If the
      call hasn't completed in *timeout* seconds, then a
      :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call completed without raising, ``None`` is returned.

   .. method:: add_done_callback(fn)

      Attaches the callable *fn* to the future.  *fn* will be called, with the
      future as its only argument, when the future is cancelled or finishes
      running.

      Added callables are called in the order that they were added and are
      always called in a thread belonging to the process that added them.  If
      the callable raises an :exc:`Exception` subclass, it will be logged and
      ignored.  If the callable raises a :exc:`BaseException` subclass, the
      behavior is undefined.

      If the future has already completed or been cancelled, *fn* will be
      called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

   .. method:: set_running_or_notify_cancel()

      This method should only be called by :class:`Executor` implementations
      before executing the work associated with the :class:`Future` and by unit
      tests.

      If the method returns ``False`` then the :class:`Future` was cancelled,
      i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
      waiting on the :class:`Future` completing (i.e. through
      :func:`as_completed` or :func:`wait`) will be woken up.

      If the method returns ``True`` then the :class:`Future` was not cancelled
      and has been put in the running state, i.e. calls to
      :meth:`Future.running` will return ``True``.

      This method can only be called once and cannot be called after
      :meth:`Future.set_result` or :meth:`Future.set_exception` have been
      called.

   .. method:: set_result(result)

      Sets the result of the work associated with the :class:`Future` to
      *result*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

   .. method:: set_exception(exception)

      Sets the result of the work associated with the :class:`Future` to the
      :class:`Exception` *exception*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Duplicate futures
   given to *fs* are removed and will be returned only once.  Returns a named
   2-tuple of sets.
   The first set, named ``done``, contains the futures that
   completed (finished or cancelled futures) before the wait completed.  The
   second set, named ``not_done``, contains the futures that did not complete
   (pending or running futures).

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures).  Any futures given by *fs* that
   are duplicated will be returned once.  Any futures that completed before
   :func:`as_completed` is called will be yielded first.
   The returned iterator
   raises a :exc:`concurrent.futures.TimeoutError` if :meth:`~iterator.__next__`
   is called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`.  *timeout* can be an int or float.  If
   *timeout* is not specified or ``None``, there is no limit to the wait time.


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. exception:: InvalidStateError

   Raised when an operation is performed on a future that is not allowed
   in the current state.

   .. versionadded:: 3.8

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a :class:`ThreadPoolExecutor`
   has failed initializing.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3

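
As a minimal sketch of how some of these exceptions surface in practice, the
following example triggers three of them with a single-worker thread pool.  The
names ``observe_exceptions`` and ``release`` are hypothetical and chosen only
for illustration, and the 0.05 second timeout is an arbitrary assumption.  The
last step relies on the behaviour of ``Future.set_result`` described above for
Python 3.8 and later.

```python
import concurrent.futures
import threading


def observe_exceptions():
    """Return the names of the exceptions raised in three situations."""
    seen = []
    release = threading.Event()  # hypothetical gate that keeps the worker busy

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        running = executor.submit(release.wait)  # occupies the only worker
        queued = executor.submit(pow, 2, 10)     # stays pending behind it

        # result() gives up after the timeout because the call cannot finish
        # until the event is set.
        try:
            running.result(timeout=0.05)
        except concurrent.futures.TimeoutError as exc:
            seen.append(type(exc).__name__)

        # A future that has not started can be cancelled; result() then raises.
        if queued.cancel():
            try:
                queued.result()
            except concurrent.futures.CancelledError as exc:
                seen.append(type(exc).__name__)

        release.set()  # let the worker finish so the executor can shut down

    # Setting a result on a future that is already done is rejected.
    try:
        running.set_result(None)
    except concurrent.futures.InvalidStateError as exc:
        seen.append(type(exc).__name__)

    return seen


if __name__ == '__main__':
    print(observe_exceptions())
```

The event is set before the ``with`` block exits because leaving the block
waits for running futures to finish; without it the shutdown would block
forever on the first task.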