:mod:`!concurrent.futures` --- Launching parallel tasks
=======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.

.. include:: ../includes/wasm-notavail.rst

Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

   .. method:: submit(fn, /, *args, **kwargs)

      Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
      and returns a :class:`Future` object representing the execution of the
      callable. ::

         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=1) as executor:
             future = executor.submit(pow, 323, 1235)
             print(future.result())

   .. method:: map(fn, *iterables, timeout=None, chunksize=1)

      Similar to :func:`map(fn, *iterables) <map>` except:

      * the *iterables* are collected immediately rather than lazily;

      * *fn* is executed asynchronously and several calls to
        *fn* may be made concurrently.

      The returned iterator raises a :exc:`TimeoutError`
      if :meth:`~iterator.__next__` is called and the result isn't available
      after *timeout* seconds from the original call to :meth:`Executor.map`.
      *timeout* can be an int or a float.  If *timeout* is not specified or
      ``None``, there is no limit to the wait time.

      If a *fn* call raises an exception, then that exception will be
      raised when its value is retrieved from the iterator.

      When using :class:`ProcessPoolExecutor`, this method chops *iterables*
      into a number of chunks which it submits to the pool as separate
      tasks.  The (approximate) size of these chunks can be specified by
      setting *chunksize* to a positive integer.  For very long iterables,
      using a large value for *chunksize* can significantly improve
      performance compared to the default size of 1.  With
      :class:`ThreadPoolExecutor`, *chunksize* has no effect.

      .. versionchanged:: 3.5
         Added the *chunksize* argument.

   .. method:: shutdown(wait=True, *, cancel_futures=False)

      Signal the executor that it should free any resources that it is using
      when the currently pending futures are done executing.  Calls to
      :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
      raise :exc:`RuntimeError`.

      If *wait* is ``True`` then this method will not return until all the
      pending futures are done executing and the resources associated with the
      executor have been freed.  If *wait* is ``False`` then this method will
      return immediately and the resources associated with the executor will be
      freed when all pending futures are done executing.  Regardless of the
      value of *wait*, the entire Python program will not exit until all
      pending futures are done executing.

      If *cancel_futures* is ``True``, this method will cancel all pending
      futures that the executor has not started running. Any futures that
      are completed or running won't be cancelled, regardless of the value
      of *cancel_futures*.

      If both *cancel_futures* and *wait* are ``True``, all futures that the
      executor has started running will be completed prior to this method
      returning. The remaining futures are cancelled.

      You can avoid having to call this method explicitly if you use the
      :keyword:`with` statement, which will shut down the :class:`Executor`
      (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
      ``True``)::

         import shutil
         from concurrent.futures import ThreadPoolExecutor

         with ThreadPoolExecutor(max_workers=4) as e:
             e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
             e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
             e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
             e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

      .. versionchanged:: 3.9
         Added *cancel_futures*.

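The effect of *cancel_futures* can be illustrated with a minimal sketch. It
assumes Python 3.9 or later; the ``hold`` helper and the two
:class:`threading.Event` objects are illustrative names, not part of this
module. A single-worker pool is shut down while one task is running and a
second one is still queued: the running task is allowed to finish, the queued
one is cancelled, and later submissions are rejected.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is running
release = threading.Event()   # lets the first task finish

def hold():
    # Hypothetical long-running task that blocks until released.
    started.set()
    release.wait()
    return "finished"

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(hold)        # occupies the only worker
queued = executor.submit(pow, 2, 10)   # stays pending in the queue

started.wait()  # make sure the first task has really started
executor.shutdown(wait=False, cancel_futures=True)
release.set()

result = running.result()          # the running task still completes
was_cancelled = queued.cancelled() # the pending task was cancelled

try:
    executor.submit(pow, 2, 2)
except RuntimeError:
    rejected = True                # no new work after shutdown
```

Because *wait* is ``False`` here, the call returns immediately; the worker
thread exits on its own once ``hold`` returns.
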

ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6


   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   from concurrent.futures import ThreadPoolExecutor

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)

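One way to avoid the second kind of deadlock, sketched here under the
assumption that the dependent work can be sequenced by the caller, is to wait
for the first result outside the pool and pass it to the next task. No worker
thread then blocks on another future. ``square`` and ``add_one`` are
hypothetical helpers used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def add_one(x):
    return x + 1

with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(square, 5)
    # Block in the calling thread, not inside a worker, so the single
    # worker is free to run the follow-up task.
    second = executor.submit(add_one, first.result())
    chained = second.result()

print(chained)  # 26
```
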

.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   All threads enqueued to ``ThreadPoolExecutor`` will be joined before the
   interpreter can exit. Note that the exit handler which does this is
   executed *before* any exit handlers added using ``atexit``. This means
   exceptions in the main thread must be caught and handled in order to
   signal threads to exit gracefully. For this reason, it is recommended
   that ``ThreadPoolExecutor`` not be used for long-running tasks.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or
      not given, it will default to the number of processors on the machine,
      multiplied by ``5``, assuming that :class:`ThreadPoolExecutor` is often
      used to overlap I/O instead of CPU work and the number of workers
      should be higher than the number of workers
      for :class:`ProcessPoolExecutor`.

   .. versionchanged:: 3.6
      Added the *thread_name_prefix* parameter to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.

   .. versionchanged:: 3.8
      Default value of *max_workers* is changed to ``min(32, os.cpu_count() + 4)``.
      This default value preserves at least 5 workers for I/O bound tasks.
      It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL.
      And it avoids using very large resources implicitly on many-core machines.

      ThreadPoolExecutor now reuses idle worker threads before starting
      *max_workers* worker threads too.

   .. versionchanged:: 3.13
      Default value of *max_workers* is changed to
      ``min(32, (os.process_cpu_count() or 1) + 4)``.

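The *initializer*, *initargs* and *thread_name_prefix* parameters can be
combined as in the following sketch. It assumes per-thread state is kept in a
:class:`threading.local` object; ``worker_state``, ``init_worker`` and
``greet`` are illustrative names, not part of this module.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

worker_state = threading.local()  # separate attributes per worker thread

def init_worker(greeting):
    # Runs once at the start of each worker thread.
    worker_state.greeting = greeting

def greet(name):
    # Reads the state prepared by the initializer in this thread.
    thread_name = threading.current_thread().name
    return (worker_state.greeting, name, thread_name)

with ThreadPoolExecutor(max_workers=2,
                        thread_name_prefix="demo",
                        initializer=init_worker,
                        initargs=("hello",)) as executor:
    messages = list(executor.map(greet, ["a", "b", "c"]))

for greeting, name, thread_name in messages:
    print(f"{greeting}, {name} (ran in {thread_name})")
```

Which worker handles which item is not deterministic, but every worker thread
name starts with the given prefix, which makes the threads easier to spot in
logs and debuggers.
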

.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://nonexistent-subdomain.python.org/']

   # Retrieve a single page and report the URL and contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))

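The example above gathers results with :func:`as_completed`, in completion
order. :meth:`Executor.map` is an alternative that yields results in input
order and re-raises a task's exception when its value is retrieved. A minimal
sketch, with ``reciprocal`` as a hypothetical helper and no network access:

```python
from concurrent.futures import ThreadPoolExecutor

def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError for x == 0

collected = []
error = None

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(reciprocal, [1, 2, 0, 4])
    try:
        for value in results:
            collected.append(value)
    except ZeroDivisionError as exc:
        # Raised when the third result is retrieved from the iterator.
        error = exc

print(collected)  # [1.0, 0.5]
```

Iteration stops at the first failing item, so the result for ``4`` is never
retrieved. Use :meth:`Executor.submit` with per-future error handling when
every outcome is needed.
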

ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock
<global interpreter lock>` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to :func:`os.process_cpu_count`.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   On Windows, *max_workers* must be less than or equal to ``61``. If it is not
   then :exc:`ValueError` will be raised. If *max_workers* is ``None``, then
   the default chosen will be at most ``61``, even if more processors are
   available.
   *mp_context* can be a :mod:`multiprocessing` context or ``None``. It will be
   used to launch the workers. If *mp_context* is ``None`` or not given, the
   default :mod:`multiprocessing` context is used.
   See :ref:`multiprocessing-start-methods`.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.

   *max_tasks_per_child* is an optional argument that specifies the maximum
   number of tasks a single process can execute before it will exit and be
   replaced with a fresh worker process. By default *max_tasks_per_child* is
   ``None`` which means worker processes will live as long as the pool. When
   a max is specified, the "spawn" multiprocessing start method will be used by
   default in absence of a *mp_context* parameter. This feature is incompatible
   with the "fork" start method.

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
      Previously, behaviour
      was undefined but operations on the executor or its futures would often
      freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start_method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.

      .. note::
         The default :mod:`multiprocessing` start method
         (see :ref:`multiprocessing-start-methods`) will change away from
         *fork* in Python 3.14.  Code that requires *fork* be used for their
         :class:`ProcessPoolExecutor` should explicitly specify that by
         passing a ``mp_context=multiprocessing.get_context("fork")``
         parameter.

   .. versionchanged:: 3.11
      The *max_tasks_per_child* argument was added to allow users to
      control the lifetime of workers in the pool.

   .. versionchanged:: 3.12
      On POSIX systems, if your application has multiple threads and the
      :mod:`multiprocessing` context uses the ``"fork"`` start method:
      The :func:`os.fork` function called internally to spawn workers may raise a
      :exc:`DeprecationWarning`. Pass a *mp_context* configured to use a
      different start method. See the :func:`os.fork` documentation for
      further explanation.

   .. versionchanged:: 3.13
      *max_workers* uses :func:`os.process_cpu_count` by default, instead of
      :func:`os.cpu_count`.

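Passing an explicit *mp_context* might look like the following sketch. It
assumes the ``"spawn"`` start method is wanted and uses
:func:`math.factorial` as the task because a standard-library function can be
pickled by reference; ``factorials`` is a hypothetical wrapper. The pool is
only started under the ``__main__`` guard so that worker processes can import
the main module safely.

```python
import math
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def factorials(numbers):
    # Choose the start method explicitly instead of relying on the
    # platform default.
    context = multiprocessing.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=context) as executor:
        return list(executor.map(math.factorial, numbers))

if __name__ == "__main__":
    print(factorials([3, 4, 5]))  # [6, 24, 120]
```

On Python 3.11 and later, ``max_tasks_per_child`` can be passed alongside
*mp_context* in the same call to bound the lifetime of each worker.
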
.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

   .. method:: cancel()

      Attempt to cancel the call.  If the call is currently being executed or
      finished running and cannot be cancelled then the method will return
      ``False``, otherwise the call will be cancelled and the method will
      return ``True``.

   .. method:: cancelled()

      Return ``True`` if the call was successfully cancelled.

   .. method:: running()

      Return ``True`` if the call is currently being executed and cannot be
      cancelled.

   .. method:: done()

      Return ``True`` if the call was successfully cancelled or finished
      running.

   .. method:: result(timeout=None)

      Return the value returned by the call. If the call hasn't yet completed
      then this method will wait up to *timeout* seconds.  If the call hasn't
      completed in *timeout* seconds, then a
      :exc:`TimeoutError` will be raised. *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call raised an exception, this method will raise the same exception.

   .. method:: exception(timeout=None)

      Return the exception raised by the call.  If the call hasn't yet
      completed then this method will wait up to *timeout* seconds.  If the
      call hasn't completed in *timeout* seconds, then a
      :exc:`TimeoutError` will be raised.  *timeout* can be
      an int or float.  If *timeout* is not specified or ``None``, there is no
      limit to the wait time.

      If the future is cancelled before completing then :exc:`.CancelledError`
      will be raised.

      If the call completed without raising, ``None`` is returned.

   .. method:: add_done_callback(fn)

      Attaches the callable *fn* to the future.  *fn* will be called, with the
      future as its only argument, when the future is cancelled or finishes
      running.

      Added callables are called in the order that they were added and are
      always called in a thread belonging to the process that added them.  If
      the callable raises an :exc:`Exception` subclass, it will be logged and
      ignored.  If the callable raises a :exc:`BaseException` subclass, the
      behavior is undefined.

      If the future has already completed or been cancelled, *fn* will be
      called immediately.

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

   .. method:: set_running_or_notify_cancel()

      This method should only be called by :class:`Executor` implementations
      before executing the work associated with the :class:`Future` and by unit
      tests.

      If the method returns ``False`` then the :class:`Future` was cancelled,
      i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
      waiting on the :class:`Future` completing (i.e. through
      :func:`as_completed` or :func:`wait`) will be woken up.

      If the method returns ``True`` then the :class:`Future` was not cancelled
      and has been put in the running state, i.e. calls to
      :meth:`Future.running` will return ``True``.

      This method can only be called once and cannot be called after
      :meth:`Future.set_result` or :meth:`Future.set_exception` have been
      called.

   .. method:: set_result(result)

      Sets the result of the work associated with the :class:`Future` to
      *result*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

   .. method:: set_exception(exception)

      Sets the result of the work associated with the :class:`Future` to the
      :class:`Exception` *exception*.

      This method should only be used by :class:`Executor` implementations and
      unit tests.

      .. versionchanged:: 3.8
         This method raises
         :exc:`concurrent.futures.InvalidStateError` if the :class:`Future` is
         already done.

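The state transitions described above can be traced in a test-style sketch
that drives a :class:`Future` by hand, without any executor. This is intended
only for unit tests, as the documentation notes, and assumes Python 3.8 or
later for :exc:`InvalidStateError`. All variable names are illustrative.

```python
from concurrent.futures import CancelledError, Future, InvalidStateError
from concurrent.futures import TimeoutError as FuturesTimeoutError

callback_results = []

future = Future()
future.add_done_callback(lambda f: callback_results.append(f.result()))

# A pending future has no result yet, so a short timeout expires.
try:
    future.result(timeout=0.01)
except FuturesTimeoutError:
    timed_out = True

started = future.set_running_or_notify_cancel()  # True: now running
cancel_attempt = future.cancel()                 # False: already running
future.set_result(42)                            # runs the done callback

# Setting a result twice is rejected.
try:
    future.set_result(43)
except InvalidStateError:
    second_set_rejected = True

# A future cancelled while still pending never runs.
cancelled_future = Future()
cancel_ok = cancelled_future.cancel()                       # True
notified = cancelled_future.set_running_or_notify_cancel()  # False
try:
    cancelled_future.result()
except CancelledError:
    cancelled_raises = True
```
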
Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete. Duplicate futures
   given to *fs* are removed and will be returned only once. Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or cancelled futures) before the wait completed.  The
   second set, named ``not_done``, contains the futures that did not complete
   (pending or running futures).

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. list-table::
      :header-rows: 1

      * - Constant
        - Description

      * - .. data:: FIRST_COMPLETED
        - The function will return when any future finishes or is cancelled.

      * - .. data:: FIRST_EXCEPTION
        - The function will return when any future finishes by raising an
          exception. If no future raises an exception
          then it is equivalent to :const:`ALL_COMPLETED`.

      * - .. data:: ALL_COMPLETED
        - The function will return when all futures finish or are cancelled.

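The difference between :const:`FIRST_COMPLETED` and :const:`ALL_COMPLETED`
can be seen in the following sketch. The ``quick`` and ``blocked`` helpers
are hypothetical, and a :class:`threading.Event` holds one task back so that
the outcome does not depend on timing.

```python
import threading
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

release = threading.Event()

def quick():
    return "quick"

def blocked():
    release.wait()  # held back until the main thread releases it
    return "blocked"

with ThreadPoolExecutor(max_workers=2) as executor:
    slow_future = executor.submit(blocked)
    fast_future = executor.submit(quick)

    # Returns as soon as one future is done; the other is still running.
    first_done, still_pending = wait([fast_future, slow_future],
                                     return_when=FIRST_COMPLETED)

    release.set()
    # The default, ALL_COMPLETED, waits for every future.
    all_done, none_pending = wait([fast_future, slow_future])
```
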
.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures). Any futures given by *fs* that
   are duplicated will be returned once. Any futures that completed before
   :func:`as_completed` is called will be yielded first.  The returned iterator
   raises a :exc:`TimeoutError` if :meth:`~iterator.__next__`
   is called and the result isn't available after *timeout* seconds from the
   original call to :func:`as_completed`.  *timeout* can be an int or float. If
   *timeout* is not specified or ``None``, there is no limit to the wait time.

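A short sketch of completion-order iteration follows. The helpers
``wait_for_gate`` and ``immediate`` are illustrative; the
:class:`threading.Event` keeps the first-submitted task from finishing until
the other one has been yielded, so the order shown is deterministic.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

gate = threading.Event()

def wait_for_gate(label):
    gate.wait()  # cannot finish until the gate is opened
    return label

def immediate(label):
    return label

order = []
with ThreadPoolExecutor(max_workers=2) as executor:
    gated = executor.submit(wait_for_gate, "gated")  # submitted first
    ready = executor.submit(immediate, "ready")      # finishes first

    for future in as_completed([gated, ready]):
        order.append(future.result())
        gate.set()  # let the gated task complete after the first yield

print(order)  # ['ready', 'gated']
```
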

.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   A deprecated alias of :exc:`TimeoutError`,
   raised when a future operation exceeds the given timeout.

   .. versionchanged:: 3.11

      This class was made an alias of :exc:`TimeoutError`.


.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. exception:: InvalidStateError

   Raised when an operation is performed on a future that is not allowed
   in the current state.

   .. versionadded:: 3.8

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers
   of a :class:`~concurrent.futures.ThreadPoolExecutor`
   has failed initializing.

   .. versionadded:: 3.7

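A failing initializer can be observed with the sketch below, which assumes
Python 3.7 or later. ``failing_initializer`` is a hypothetical function that
always raises. The pending job and any later submission both fail with
:exc:`BrokenThreadPool`, which can also be caught as
:exc:`~concurrent.futures.BrokenExecutor`. The pool additionally logs the
initializer's traceback.

```python
from concurrent.futures import BrokenExecutor, ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

def failing_initializer():
    # Hypothetical setup step that always fails.
    raise RuntimeError("worker setup failed")

executor = ThreadPoolExecutor(max_workers=1, initializer=failing_initializer)
future = executor.submit(pow, 2, 3)

try:
    future.result(timeout=5)
except BrokenThreadPool as exc:
    pending_error = exc  # the queued job never ran

try:
    executor.submit(pow, 2, 4)
except BrokenExecutor:
    resubmit_rejected = True  # the pool stays unusable

executor.shutdown()
```
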
.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`~concurrent.futures.ProcessPoolExecutor`
   has terminated in a non-clean
   fashion (for example, if it was killed from the outside).

   .. versionadded:: 3.3
