1:mod:`concurrent.futures` --- Launching parallel tasks 2====================================================== 3 4.. module:: concurrent.futures 5 :synopsis: Execute computations concurrently using threads or processes. 6 7.. versionadded:: 3.2 8 9**Source code:** :source:`Lib/concurrent/futures/thread.py` 10and :source:`Lib/concurrent/futures/process.py` 11 12-------------- 13 14The :mod:`concurrent.futures` module provides a high-level interface for 15asynchronously executing callables. 16 17The asynchronous execution can be performed with threads, using 18:class:`ThreadPoolExecutor`, or separate processes, using 19:class:`ProcessPoolExecutor`. Both implement the same interface, which is 20defined by the abstract :class:`Executor` class. 21 22 23Executor Objects 24---------------- 25 26.. class:: Executor 27 28 An abstract class that provides methods to execute calls asynchronously. It 29 should not be used directly, but through its concrete subclasses. 30 31 .. method:: submit(fn, *args, **kwargs) 32 33 Schedules the callable, *fn*, to be executed as ``fn(*args **kwargs)`` 34 and returns a :class:`Future` object representing the execution of the 35 callable. :: 36 37 with ThreadPoolExecutor(max_workers=1) as executor: 38 future = executor.submit(pow, 323, 1235) 39 print(future.result()) 40 41 .. method:: map(func, *iterables, timeout=None, chunksize=1) 42 43 Similar to :func:`map(func, *iterables) <map>` except: 44 45 * the *iterables* are collected immediately rather than lazily; 46 47 * *func* is executed asynchronously and several calls to 48 *func* may be made concurrently. 49 50 The returned iterator raises a :exc:`concurrent.futures.TimeoutError` 51 if :meth:`~iterator.__next__` is called and the result isn't available 52 after *timeout* seconds from the original call to :meth:`Executor.map`. 53 *timeout* can be an int or a float. If *timeout* is not specified or 54 ``None``, there is no limit to the wait time. 55 56 If a *func* call raises an exception, then that exception will be 57 raised when its value is retrieved from the iterator. 58 59 When using :class:`ProcessPoolExecutor`, this method chops *iterables* 60 into a number of chunks which it submits to the pool as separate 61 tasks. The (approximate) size of these chunks can be specified by 62 setting *chunksize* to a positive integer. For very long iterables, 63 using a large value for *chunksize* can significantly improve 64 performance compared to the default size of 1. With 65 :class:`ThreadPoolExecutor`, *chunksize* has no effect. 66 67 .. versionchanged:: 3.5 68 Added the *chunksize* argument. 69 70 .. method:: shutdown(wait=True) 71 72 Signal the executor that it should free any resources that it is using 73 when the currently pending futures are done executing. Calls to 74 :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will 75 raise :exc:`RuntimeError`. 76 77 If *wait* is ``True`` then this method will not return until all the 78 pending futures are done executing and the resources associated with the 79 executor have been freed. If *wait* is ``False`` then this method will 80 return immediately and the resources associated with the executor will be 81 freed when all pending futures are done executing. Regardless of the 82 value of *wait*, the entire Python program will not exit until all 83 pending futures are done executing. 84 85 You can avoid having to call this method explicitly if you use the 86 :keyword:`with` statement, which will shutdown the :class:`Executor` 87 (waiting as if :meth:`Executor.shutdown` were called with *wait* set to 88 ``True``):: 89 90 import shutil 91 with ThreadPoolExecutor(max_workers=4) as e: 92 e.submit(shutil.copy, 'src1.txt', 'dest1.txt') 93 e.submit(shutil.copy, 'src2.txt', 'dest2.txt') 94 e.submit(shutil.copy, 'src3.txt', 'dest3.txt') 95 e.submit(shutil.copy, 'src4.txt', 'dest4.txt') 96 97 98ThreadPoolExecutor 99------------------ 100 101:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of 102threads to execute calls asynchronously. 103 104Deadlocks can occur when the callable associated with a :class:`Future` waits on 105the results of another :class:`Future`. For example:: 106 107 import time 108 def wait_on_b(): 109 time.sleep(5) 110 print(b.result()) # b will never complete because it is waiting on a. 111 return 5 112 113 def wait_on_a(): 114 time.sleep(5) 115 print(a.result()) # a will never complete because it is waiting on b. 116 return 6 117 118 119 executor = ThreadPoolExecutor(max_workers=2) 120 a = executor.submit(wait_on_b) 121 b = executor.submit(wait_on_a) 122 123And:: 124 125 def wait_on_future(): 126 f = executor.submit(pow, 5, 2) 127 # This will never complete because there is only one worker thread and 128 # it is executing this function. 129 print(f.result()) 130 131 executor = ThreadPoolExecutor(max_workers=1) 132 executor.submit(wait_on_future) 133 134 135.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()) 136 137 An :class:`Executor` subclass that uses a pool of at most *max_workers* 138 threads to execute calls asynchronously. 139 140 *initializer* is an optional callable that is called at the start of 141 each worker thread; *initargs* is a tuple of arguments passed to the 142 initializer. Should *initializer* raise an exception, all currently 143 pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`, 144 as well as any attempt to submit more jobs to the pool. 145 146 .. versionchanged:: 3.5 147 If *max_workers* is ``None`` or 148 not given, it will default to the number of processors on the machine, 149 multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often 150 used to overlap I/O instead of CPU work and the number of workers 151 should be higher than the number of workers 152 for :class:`ProcessPoolExecutor`. 153 154 .. versionadded:: 3.6 155 The *thread_name_prefix* argument was added to allow users to 156 control the :class:`threading.Thread` names for worker threads created by 157 the pool for easier debugging. 158 159 .. versionchanged:: 3.7 160 Added the *initializer* and *initargs* arguments. 161 162 163.. _threadpoolexecutor-example: 164 165ThreadPoolExecutor Example 166~~~~~~~~~~~~~~~~~~~~~~~~~~ 167:: 168 169 import concurrent.futures 170 import urllib.request 171 172 URLS = ['http://www.foxnews.com/', 173 'http://www.cnn.com/', 174 'http://europe.wsj.com/', 175 'http://www.bbc.co.uk/', 176 'http://some-made-up-domain.com/'] 177 178 # Retrieve a single page and report the URL and contents 179 def load_url(url, timeout): 180 with urllib.request.urlopen(url, timeout=timeout) as conn: 181 return conn.read() 182 183 # We can use a with statement to ensure threads are cleaned up promptly 184 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: 185 # Start the load operations and mark each future with its URL 186 future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} 187 for future in concurrent.futures.as_completed(future_to_url): 188 url = future_to_url[future] 189 try: 190 data = future.result() 191 except Exception as exc: 192 print('%r generated an exception: %s' % (url, exc)) 193 else: 194 print('%r page is %d bytes' % (url, len(data))) 195 196 197ProcessPoolExecutor 198------------------- 199 200The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that 201uses a pool of processes to execute calls asynchronously. 202:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which 203allows it to side-step the :term:`Global Interpreter Lock` but also means that 204only picklable objects can be executed and returned. 205 206The ``__main__`` module must be importable by worker subprocesses. This means 207that :class:`ProcessPoolExecutor` will not work in the interactive interpreter. 208 209Calling :class:`Executor` or :class:`Future` methods from a callable submitted 210to a :class:`ProcessPoolExecutor` will result in deadlock. 211 212.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=()) 213 214 An :class:`Executor` subclass that executes calls asynchronously using a pool 215 of at most *max_workers* processes. If *max_workers* is ``None`` or not 216 given, it will default to the number of processors on the machine. 217 If *max_workers* is lower or equal to ``0``, then a :exc:`ValueError` 218 will be raised. 219 *mp_context* can be a multiprocessing context or None. It will be used to 220 launch the workers. If *mp_context* is ``None`` or not given, the default 221 multiprocessing context is used. 222 223 *initializer* is an optional callable that is called at the start of 224 each worker process; *initargs* is a tuple of arguments passed to the 225 initializer. Should *initializer* raise an exception, all currently 226 pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`, 227 as well any attempt to submit more jobs to the pool. 228 229 .. versionchanged:: 3.3 230 When one of the worker processes terminates abruptly, a 231 :exc:`BrokenProcessPool` error is now raised. Previously, behaviour 232 was undefined but operations on the executor or its futures would often 233 freeze or deadlock. 234 235 .. versionchanged:: 3.7 236 The *mp_context* argument was added to allow users to control the 237 start_method for worker processes created by the pool. 238 239 Added the *initializer* and *initargs* arguments. 240 241 242.. _processpoolexecutor-example: 243 244ProcessPoolExecutor Example 245~~~~~~~~~~~~~~~~~~~~~~~~~~~ 246:: 247 248 import concurrent.futures 249 import math 250 251 PRIMES = [ 252 112272535095293, 253 112582705942171, 254 112272535095293, 255 115280095190773, 256 115797848077099, 257 1099726899285419] 258 259 def is_prime(n): 260 if n % 2 == 0: 261 return False 262 263 sqrt_n = int(math.floor(math.sqrt(n))) 264 for i in range(3, sqrt_n + 1, 2): 265 if n % i == 0: 266 return False 267 return True 268 269 def main(): 270 with concurrent.futures.ProcessPoolExecutor() as executor: 271 for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): 272 print('%d is prime: %s' % (number, prime)) 273 274 if __name__ == '__main__': 275 main() 276 277 278Future Objects 279-------------- 280 281The :class:`Future` class encapsulates the asynchronous execution of a callable. 282:class:`Future` instances are created by :meth:`Executor.submit`. 283 284.. class:: Future 285 286 Encapsulates the asynchronous execution of a callable. :class:`Future` 287 instances are created by :meth:`Executor.submit` and should not be created 288 directly except for testing. 289 290 .. method:: cancel() 291 292 Attempt to cancel the call. If the call is currently being executed and 293 cannot be cancelled then the method will return ``False``, otherwise the 294 call will be cancelled and the method will return ``True``. 295 296 .. method:: cancelled() 297 298 Return ``True`` if the call was successfully cancelled. 299 300 .. method:: running() 301 302 Return ``True`` if the call is currently being executed and cannot be 303 cancelled. 304 305 .. method:: done() 306 307 Return ``True`` if the call was successfully cancelled or finished 308 running. 309 310 .. method:: result(timeout=None) 311 312 Return the value returned by the call. If the call hasn't yet completed 313 then this method will wait up to *timeout* seconds. If the call hasn't 314 completed in *timeout* seconds, then a 315 :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be 316 an int or float. If *timeout* is not specified or ``None``, there is no 317 limit to the wait time. 318 319 If the future is cancelled before completing then :exc:`.CancelledError` 320 will be raised. 321 322 If the call raised, this method will raise the same exception. 323 324 .. method:: exception(timeout=None) 325 326 Return the exception raised by the call. If the call hasn't yet 327 completed then this method will wait up to *timeout* seconds. If the 328 call hasn't completed in *timeout* seconds, then a 329 :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be 330 an int or float. If *timeout* is not specified or ``None``, there is no 331 limit to the wait time. 332 333 If the future is cancelled before completing then :exc:`.CancelledError` 334 will be raised. 335 336 If the call completed without raising, ``None`` is returned. 337 338 .. method:: add_done_callback(fn) 339 340 Attaches the callable *fn* to the future. *fn* will be called, with the 341 future as its only argument, when the future is cancelled or finishes 342 running. 343 344 Added callables are called in the order that they were added and are 345 always called in a thread belonging to the process that added them. If 346 the callable raises an :exc:`Exception` subclass, it will be logged and 347 ignored. If the callable raises a :exc:`BaseException` subclass, the 348 behavior is undefined. 349 350 If the future has already completed or been cancelled, *fn* will be 351 called immediately. 352 353 The following :class:`Future` methods are meant for use in unit tests and 354 :class:`Executor` implementations. 355 356 .. method:: set_running_or_notify_cancel() 357 358 This method should only be called by :class:`Executor` implementations 359 before executing the work associated with the :class:`Future` and by unit 360 tests. 361 362 If the method returns ``False`` then the :class:`Future` was cancelled, 363 i.e. :meth:`Future.cancel` was called and returned `True`. Any threads 364 waiting on the :class:`Future` completing (i.e. through 365 :func:`as_completed` or :func:`wait`) will be woken up. 366 367 If the method returns ``True`` then the :class:`Future` was not cancelled 368 and has been put in the running state, i.e. calls to 369 :meth:`Future.running` will return `True`. 370 371 This method can only be called once and cannot be called after 372 :meth:`Future.set_result` or :meth:`Future.set_exception` have been 373 called. 374 375 .. method:: set_result(result) 376 377 Sets the result of the work associated with the :class:`Future` to 378 *result*. 379 380 This method should only be used by :class:`Executor` implementations and 381 unit tests. 382 383 .. method:: set_exception(exception) 384 385 Sets the result of the work associated with the :class:`Future` to the 386 :class:`Exception` *exception*. 387 388 This method should only be used by :class:`Executor` implementations and 389 unit tests. 390 391 392Module Functions 393---------------- 394 395.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED) 396 397 Wait for the :class:`Future` instances (possibly created by different 398 :class:`Executor` instances) given by *fs* to complete. Returns a named 399 2-tuple of sets. The first set, named ``done``, contains the futures that 400 completed (finished or were cancelled) before the wait completed. The second 401 set, named ``not_done``, contains uncompleted futures. 402 403 *timeout* can be used to control the maximum number of seconds to wait before 404 returning. *timeout* can be an int or float. If *timeout* is not specified 405 or ``None``, there is no limit to the wait time. 406 407 *return_when* indicates when this function should return. It must be one of 408 the following constants: 409 410 .. tabularcolumns:: |l|L| 411 412 +-----------------------------+----------------------------------------+ 413 | Constant | Description | 414 +=============================+========================================+ 415 | :const:`FIRST_COMPLETED` | The function will return when any | 416 | | future finishes or is cancelled. | 417 +-----------------------------+----------------------------------------+ 418 | :const:`FIRST_EXCEPTION` | The function will return when any | 419 | | future finishes by raising an | 420 | | exception. If no future raises an | 421 | | exception then it is equivalent to | 422 | | :const:`ALL_COMPLETED`. | 423 +-----------------------------+----------------------------------------+ 424 | :const:`ALL_COMPLETED` | The function will return when all | 425 | | futures finish or are cancelled. | 426 +-----------------------------+----------------------------------------+ 427 428.. function:: as_completed(fs, timeout=None) 429 430 Returns an iterator over the :class:`Future` instances (possibly created by 431 different :class:`Executor` instances) given by *fs* that yields futures as 432 they complete (finished or were cancelled). Any futures given by *fs* that 433 are duplicated will be returned once. Any futures that completed before 434 :func:`as_completed` is called will be yielded first. The returned iterator 435 raises a :exc:`concurrent.futures.TimeoutError` if :meth:`~iterator.__next__` 436 is called and the result isn't available after *timeout* seconds from the 437 original call to :func:`as_completed`. *timeout* can be an int or float. If 438 *timeout* is not specified or ``None``, there is no limit to the wait time. 439 440 441.. seealso:: 442 443 :pep:`3148` -- futures - execute computations asynchronously 444 The proposal which described this feature for inclusion in the Python 445 standard library. 446 447 448Exception classes 449----------------- 450 451.. currentmodule:: concurrent.futures 452 453.. exception:: CancelledError 454 455 Raised when a future is cancelled. 456 457.. exception:: TimeoutError 458 459 Raised when a future operation exceeds the given timeout. 460 461.. exception:: BrokenExecutor 462 463 Derived from :exc:`RuntimeError`, this exception class is raised 464 when an executor is broken for some reason, and cannot be used 465 to submit or execute new tasks. 466 467 .. versionadded:: 3.7 468 469.. currentmodule:: concurrent.futures.thread 470 471.. exception:: BrokenThreadPool 472 473 Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception 474 class is raised when one of the workers of a :class:`ThreadPoolExecutor` 475 has failed initializing. 476 477 .. versionadded:: 3.7 478 479.. currentmodule:: concurrent.futures.process 480 481.. exception:: BrokenProcessPool 482 483 Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly 484 :exc:`RuntimeError`), this exception class is raised when one of the 485 workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean 486 fashion (for example, if it was killed from the outside). 487 488 .. versionadded:: 3.3 489