:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

    .. method:: submit(fn, /, *args, **kwargs)

       Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
       and returns a :class:`Future` object representing the execution of the
       callable. ::

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 323, 1235)
              print(future.result())

    .. method:: map(func, *iterables, timeout=None, chunksize=1)

       Similar to :func:`map(func, *iterables) <map>` except:

       * the *iterables* are collected immediately rather than lazily;

       * *func* is executed asynchronously and several calls to
         *func* may be made concurrently.

       The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
       if :meth:`~iterator.__next__` is called and the result isn't available
       after *timeout* seconds from the original call to :meth:`Executor.map`.
       *timeout* can be an int or a float.  If *timeout* is not specified or
       ``None``, there is no limit to the wait time.

       If a *func* call raises an exception, then that exception will be
       raised when its value is retrieved from the iterator.

       When using :class:`ProcessPoolExecutor`, this method chops *iterables*
       into a number of chunks which it submits to the pool as separate
       tasks.  The (approximate) size of these chunks can be specified by
       setting *chunksize* to a positive integer.  For very long iterables,
       using a large value for *chunksize* can significantly improve
       performance compared to the default size of 1.  With
       :class:`ThreadPoolExecutor`, *chunksize* has no effect.

       .. versionchanged:: 3.5
          Added the *chunksize* argument.

    .. method:: shutdown(wait=True, *, cancel_futures=False)

       Signal the executor that it should free any resources that it is using
       when the currently pending futures are done executing.  Calls to
       :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
       raise :exc:`RuntimeError`.

       If *wait* is ``True`` then this method will not return until all the
       pending futures are done executing and the resources associated with the
       executor have been freed.  If *wait* is ``False`` then this method will
       return immediately and the resources associated with the executor will be
       freed when all pending futures are done executing.  Regardless of the
       value of *wait*, the entire Python program will not exit until all
       pending futures are done executing.

       If *cancel_futures* is ``True``, this method will cancel all pending
       futures that the executor has not started running. Any futures that
       are completed or running won't be cancelled, regardless of the value
       of *cancel_futures*.

       If both *cancel_futures* and *wait* are ``True``, all futures that the
       executor has started running will be completed prior to this method
       returning. The remaining futures are cancelled.

       You can avoid having to call this method explicitly if you use the
       :keyword:`with` statement, which will shut down the :class:`Executor`
       (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
       ``True``)::

          import shutil
          with ThreadPoolExecutor(max_workers=4) as e:
              e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
              e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
              e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
              e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

       .. versionchanged:: 3.9
          Added *cancel_futures*.

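The effect of *cancel_futures* described for :meth:`Executor.shutdown` can be
observed with a minimal sketch such as the one below.  It assumes Python 3.9 or
later and a single worker thread; ``blocking_task``, ``started`` and ``release``
are hypothetical names introduced only for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is actually running
release = threading.Event()   # set to let the first task finish


def blocking_task():
    # Hypothetical task that keeps the only worker busy until released.
    started.set()
    release.wait()
    return "finished"


executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(blocking_task)
queued = [executor.submit(pow, 2, n) for n in range(3)]

started.wait()  # make sure the single worker has picked up blocking_task

# Return immediately and cancel every future that has not started running.
executor.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())                 # the running task still completes
print([f.cancelled() for f in queued])  # the queued tasks were cancelled
```

The two events only make the ordering deterministic; real code would usually
not need them.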

ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)

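One possible way to sidestep the single-worker deadlock above is to wait for
the inner result in the submitting thread and pass plain values to the workers.
The following is only a sketch of that idea, not the only remedy; ``add_one``
is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor


def add_one(value):
    # Receives a plain value, so it never blocks on another future.
    return value + 1


with ThreadPoolExecutor(max_workers=1) as executor:
    inner = executor.submit(pow, 5, 2)
    # Wait in the calling thread, not inside a worker thread.
    outer = executor.submit(add_one, inner.result())
    total = outer.result()

print(total)
```

Because no worker ever waits on another future, one worker thread is enough.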

.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or
      not given, it will default to the number of processors on the machine,
      multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often
      used to overlap I/O instead of CPU work and the number of workers
      should be higher than the number of workers
      for :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.

   .. versionchanged:: 3.8
      Default value of *max_workers* is changed to ``min(32, os.cpu_count() + 4)``.
      This default value preserves at least 5 workers for I/O bound tasks.
      It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL.
      And it avoids using very large resources implicitly on many-core machines.

      ThreadPoolExecutor now reuses idle worker threads before starting
      *max_workers* worker threads too.

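As an illustration of the *initializer*, *initargs* and *thread_name_prefix*
arguments described above, the sketch below stores a value in thread-local
storage once per worker thread and reports the worker's name.  The names
``worker_state``, ``init_worker`` and ``describe`` are assumptions made for this
example, as is the ``"fetcher"`` prefix.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

worker_state = threading.local()  # hypothetical per-thread storage


def init_worker(label):
    # Runs once in each worker thread before it executes any task.
    worker_state.label = label


def describe(_):
    # Report what the initializer stored and which thread ran the task.
    return worker_state.label, threading.current_thread().name


with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix="fetcher",
                        initializer=init_worker,
                        initargs=("ready",)) as executor:
    reports = list(executor.map(describe, range(4)))

for label, thread_name in reports:
    print(label, thread_name)
```

Which worker handles which task is not guaranteed, so the printed thread names
may vary between runs.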

.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and report the URL and contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))

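A thread pool can also be driven through :meth:`Executor.map`.  The sketch
below, which needs no network access, illustrates two points from the
description of that method: results come back in input order, and an exception
raised by *func* surfaces only when its value is retrieved from the iterator.
``reciprocal`` is a hypothetical function used for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def reciprocal(x):
    # Raises ZeroDivisionError for x == 0.
    return 1 / x


collected = []
error = None

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(reciprocal, [4, 2, 0, 1])
    try:
        for value in results:
            collected.append(value)
    except ZeroDivisionError as exc:
        # Raised when the third result is retrieved, not when it is computed.
        error = exc

print(collected)
print(repr(error))
```

Once the exception has propagated, the iterator is exhausted, so the result
for the last input is never retrieved.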

ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock
<global interpreter lock>` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   On Windows, *max_workers* must be less than or equal to ``61``. If it is not
   then :exc:`ValueError` will be raised. If *max_workers* is ``None``, then
   the default chosen will be at most ``61``, even if more processors are
   available.
   *mp_context* can be a multiprocessing context or ``None``. It will be used to
   launch the workers. If *mp_context* is ``None`` or not given, the default
   multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised.  Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start_method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

    .. method:: cancel()

       Attempt to cancel the call.  If the call is currently being executed or
       finished running and cannot be cancelled then the method will return
       ``False``, otherwise the call will be cancelled and the method will
       return ``True``.

    .. method:: cancelled()

       Return ``True`` if the call was successfully cancelled.

    .. method:: running()

       Return ``True`` if the call is currently being executed and cannot be
       cancelled.

    .. method:: done()

       Return ``True`` if the call was successfully cancelled or finished
       running.

    .. method:: result(timeout=None)

       Return the value returned by the call. If the call hasn't yet completed
       then this method will wait up to *timeout* seconds.  If the call hasn't
       completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call raised an exception, this method will raise the same exception.

    .. method:: exception(timeout=None)

       Return the exception raised by the call.  If the call hasn't yet
       completed then this method will wait up to *timeout* seconds.  If the
       call hasn't completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call completed without raising, ``None`` is returned.

    .. method:: add_done_callback(fn)

       Attaches the callable *fn* to the future.  *fn* will be called, with the
       future as its only argument, when the future is cancelled or finishes
       running.

       Added callables are called in the order that they were added and are
       always called in a thread belonging to the process that added them.  If
       the callable raises an :exc:`Exception` subclass, it will be logged and
       ignored.  If the callable raises a :exc:`BaseException` subclass, the
       behavior is undefined.

       If the future has already completed or been cancelled, *fn* will be
       called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

    .. method:: set_running_or_notify_cancel()

       This method should only be called by :class:`Executor` implementations
       before executing the work associated with the :class:`Future` and by unit
       tests.

       If the method returns ``False`` then the :class:`Future` was cancelled,
       i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
       waiting on the :class:`Future` completing (i.e. through
       :func:`as_completed` or :func:`wait`) will be woken up.

       If the method returns ``True`` then the :class:`Future` was not cancelled
       and has been put in the running state, i.e. calls to
       :meth:`Future.running` will return ``True``.

       This method can only be called once and cannot be called after
       :meth:`Future.set_result` or :meth:`Future.set_exception` have been
       called.

    .. method:: set_result(result)

       Sets the result of the work associated with the :class:`Future` to
       *result*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

       .. versionchanged:: 3.8
          This method raises
          :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
          already done.

    .. method:: set_exception(exception)

       Sets the result of the work associated with the :class:`Future` to the
       :class:`Exception` *exception*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

       .. versionchanged:: 3.8
          This method raises
          :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
          already done.

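The state transitions described for :class:`Future` can be traced by driving a
future by hand, much as a unit test or an :class:`Executor` implementation
would.  The following sketch assumes Python 3.8 or later (for
``InvalidStateError``); the ``events`` list and the variable names are
illustrative only.

```python
from concurrent.futures import Future, InvalidStateError

events = []

future = Future()  # created directly, as a unit test might do
future.add_done_callback(lambda f: events.append(("done", f.result())))

# An executor calls this just before it starts running the work.
started = future.set_running_or_notify_cancel()
events.append(("running", future.running()))

# A future that is already running cannot be cancelled.
events.append(("cancel", future.cancel()))

# Setting the result marks the future done and fires the callback.
future.set_result(42)

try:
    future.set_result(43)  # the future is already done
except InvalidStateError:
    events.append(("second set_result", "InvalidStateError"))

# A pending future, by contrast, can be cancelled before it starts.
pending = Future()
was_cancelled = pending.cancel()
may_run = pending.set_running_or_notify_cancel()

for event in events:
    print(event)
print(was_cancelled, may_run)
```

Application code would normally obtain futures from :meth:`Executor.submit`
rather than create them this way.
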

Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete. Duplicate futures
   given to *fs* are removed and will be returned only once. Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or cancelled futures) before the wait completed.  The
   second set, named ``not_done``, contains the futures that did not complete
   (pending or running futures).

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+

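To illustrate the ``done`` and ``not_done`` sets returned by :func:`wait`, the
sketch below holds one task back with a :class:`threading.Event` so that
``FIRST_COMPLETED`` returns while that task is still running.  The functions
``slow`` and ``fast`` and the ``release`` event are hypothetical.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

release = threading.Event()  # keeps the slow task from finishing early


def slow():
    release.wait()
    return "slow"


def fast():
    return "fast"


with ThreadPoolExecutor(max_workers=2) as executor:
    slow_future = executor.submit(slow)
    fast_future = executor.submit(fast)

    # Returns as soon as any one of the futures is done.
    result = wait([slow_future, fast_future], return_when=FIRST_COMPLETED)
    first_done = set(result.done)
    still_pending = set(result.not_done)

    release.set()
    # With the default ALL_COMPLETED, wait() blocks until every future is done.
    final = wait([slow_future, fast_future])

print(len(first_done), len(still_pending))
print(len(final.done), len(final.not_done))
```
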

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures). Any futures given by *fs* that
   are duplicated will be returned once. Any futures that completed before
   :func:`as_completed` is called will be yielded first.  The returned iterator
   raises a :exc:`concurrent.futures.TimeoutError` if :meth:`~iterator.__next__`
   is called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`.  *timeout* can be an int or float. If
   *timeout* is not specified or ``None``, there is no limit to the wait time.

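The timeout behaviour of :func:`as_completed` might be exercised as in the
sketch below: a future that has already completed is yielded first, and the
iterator then raises :exc:`concurrent.futures.TimeoutError` because the other
future is still blocked.  The ``blocked`` function, the ``release`` event and
the 0.1 second timeout are assumptions chosen for the illustration.

```python
import threading
from concurrent import futures

release = threading.Event()  # keeps the second task from finishing


def blocked():
    release.wait()
    return "late"


yielded = []
timed_out = False

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    quick = executor.submit(pow, 2, 10)
    quick.result()  # make sure this future is done before iterating
    late = executor.submit(blocked)
    try:
        for future in futures.as_completed([late, quick], timeout=0.1):
            yielded.append(future.result())
    except futures.TimeoutError:
        timed_out = True
    finally:
        release.set()  # let the worker finish so the with block can exit

print(yielded, timed_out)
```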

.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. exception:: InvalidStateError

   Raised when an operation is performed on a future that is not allowed
   in the current state.

   .. versionadded:: 3.8

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a :class:`ThreadPoolExecutor`
   has failed initializing.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`ProcessPoolExecutor` has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3

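As a rough illustration of how ``BrokenThreadPool`` arises, the sketch below
gives a thread pool an initializer that always fails.  It assumes Python 3.7 or
later; ``failing_initializer`` and ``outcomes`` are hypothetical names.  The
pool also logs the initializer's traceback, so some output on standard error is
expected.

```python
from concurrent.futures import BrokenExecutor, ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool


def failing_initializer():
    # Hypothetical initializer that always fails.
    raise RuntimeError("worker setup failed")


outcomes = []

with ThreadPoolExecutor(max_workers=1,
                        initializer=failing_initializer) as executor:
    future = executor.submit(pow, 2, 3)
    try:
        future.result()
    except BrokenThreadPool as exc:
        # The pending job fails because its worker could not initialize.
        outcomes.append(type(exc).__name__)

    try:
        executor.submit(pow, 2, 4)
    except BrokenExecutor as exc:
        # Later submissions are refused; catching the base class also works.
        outcomes.append(type(exc).__name__)

print(outcomes)
```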