:mod:`concurrent.futures` --- Launching parallel tasks
======================================================

.. module:: concurrent.futures
   :synopsis: Execute computations concurrently using threads or processes.

.. versionadded:: 3.2

**Source code:** :source:`Lib/concurrent/futures/thread.py`
and :source:`Lib/concurrent/futures/process.py`

--------------

The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.

The asynchronous execution can be performed with threads, using
:class:`ThreadPoolExecutor`, or separate processes, using
:class:`ProcessPoolExecutor`.  Both implement the same interface, which is
defined by the abstract :class:`Executor` class.
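
The following sketch is not part of the module.  It illustrates the shared
interface: the hypothetical helper ``run_with`` accepts either executor class
and calls the same :meth:`Executor.map` method on it.  The function passed to
a :class:`ProcessPoolExecutor` is defined at module level so that it can be
pickled::

   import concurrent.futures

   def square(x):
       return x * x

   def run_with(executor_class):
       # Only the Executor interface is used, so either subclass works here.
       with executor_class(max_workers=2) as executor:
           return list(executor.map(square, range(5)))

   if __name__ == '__main__':
       print(run_with(concurrent.futures.ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
       print(run_with(concurrent.futures.ProcessPoolExecutor))  # [0, 1, 4, 9, 16]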


Executor Objects
----------------

.. class:: Executor

   An abstract class that provides methods to execute calls asynchronously.  It
   should not be used directly, but through its concrete subclasses.

    .. method:: submit(fn, *args, **kwargs)

       Schedules the callable, *fn*, to be executed as ``fn(*args, **kwargs)``
       and returns a :class:`Future` object representing the execution of the
       callable. ::

          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(pow, 323, 1235)
              print(future.result())

    .. method:: map(func, *iterables, timeout=None, chunksize=1)

       Similar to :func:`map(func, *iterables) <map>` except:

       * the *iterables* are collected immediately rather than lazily;

       * *func* is executed asynchronously and several calls to
         *func* may be made concurrently.

       The returned iterator raises a :exc:`concurrent.futures.TimeoutError`
       if :meth:`~iterator.__next__` is called and the result isn't available
       after *timeout* seconds from the original call to :meth:`Executor.map`.
       *timeout* can be an int or a float.  If *timeout* is not specified or
       ``None``, there is no limit to the wait time.

       If a *func* call raises an exception, then that exception will be
       raised when its value is retrieved from the iterator.

       When using :class:`ProcessPoolExecutor`, this method chops *iterables*
       into a number of chunks which it submits to the pool as separate
       tasks.  The (approximate) size of these chunks can be specified by
       setting *chunksize* to a positive integer.  For very long iterables,
       using a large value for *chunksize* can significantly improve
       performance compared to the default size of 1.  With
       :class:`ThreadPoolExecutor`, *chunksize* has no effect.
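
       The sketch below illustrates these rules with a thread pool; the
       function ``reciprocal`` is a hypothetical example, not part of the
       module.  Results arrive in input order, and the exception raised for
       the input ``0`` surfaces only when the iterator reaches that result::

          from concurrent.futures import ThreadPoolExecutor

          def reciprocal(x):
              return 1 / x          # raises ZeroDivisionError when x == 0

          collected = []
          with ThreadPoolExecutor(max_workers=4) as executor:
              # chunksize is accepted by a thread pool but has no effect.
              results = executor.map(reciprocal, [1, 2, 4, 0, 5], chunksize=2)
              try:
                  for value in results:
                      collected.append(value)
              except ZeroDivisionError:
                  collected.append('error')

          print(collected)   # [1.0, 0.5, 0.25, 'error']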

       .. versionchanged:: 3.5
          Added the *chunksize* argument.

    .. method:: shutdown(wait=True)

       Signal the executor that it should free any resources that it is using
       when the currently pending futures are done executing.  Calls to
       :meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
       raise :exc:`RuntimeError`.

       If *wait* is ``True`` then this method will not return until all the
       pending futures are done executing and the resources associated with the
       executor have been freed.  If *wait* is ``False`` then this method will
       return immediately and the resources associated with the executor will be
       freed when all pending futures are done executing.  Regardless of the
       value of *wait*, the entire Python program will not exit until all
       pending futures are done executing.

       You can avoid having to call this method explicitly if you use the
       :keyword:`with` statement, which will shut down the :class:`Executor`
       (waiting as if :meth:`Executor.shutdown` were called with *wait* set to
       ``True``)::

          import shutil
          from concurrent.futures import ThreadPoolExecutor

          with ThreadPoolExecutor(max_workers=4) as e:
              e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
              e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
              e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
              e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
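
       A short sketch of the post-shutdown behavior (the names ``first`` and
       ``rejected`` are illustrative): once :meth:`Executor.shutdown` has
       returned, a further call to :meth:`Executor.submit` raises
       :exc:`RuntimeError`::

          from concurrent.futures import ThreadPoolExecutor

          executor = ThreadPoolExecutor(max_workers=2)
          first = executor.submit(pow, 2, 5)
          executor.shutdown(wait=True)   # returns after first has finished
          print(first.result())          # 32

          try:
              executor.submit(pow, 2, 6)
              rejected = False
          except RuntimeError:
              rejected = True
          print(rejected)                # True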


ThreadPoolExecutor
------------------

:class:`ThreadPoolExecutor` is an :class:`Executor` subclass that uses a pool of
threads to execute calls asynchronously.

Deadlocks can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`.  For example::

   import time
   from concurrent.futures import ThreadPoolExecutor

   def wait_on_b():
       time.sleep(5)
       print(b.result())  # b will never complete because it is waiting on a.
       return 5

   def wait_on_a():
       time.sleep(5)
       print(a.result())  # a will never complete because it is waiting on b.
       return 6

   executor = ThreadPoolExecutor(max_workers=2)
   a = executor.submit(wait_on_b)
   b = executor.submit(wait_on_a)

And::

   from concurrent.futures import ThreadPoolExecutor

   def wait_on_future():
       f = executor.submit(pow, 5, 2)
       # This will never complete because there is only one worker thread and
       # it is executing this function.
       print(f.result())

   executor = ThreadPoolExecutor(max_workers=1)
   executor.submit(wait_on_future)
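
A minimal sketch of one way to avoid the second deadlock, assuming the nested
work may run in a separate pool: the inner call is submitted to a different
executor, so the outer task never waits on its own worker thread.  The names
``inner_pool``, ``outer_pool``, ``outer_task`` and ``answer`` are
illustrative::

   from concurrent.futures import ThreadPoolExecutor

   inner_pool = ThreadPoolExecutor(max_workers=1)

   def outer_task():
       # pow() runs on inner_pool's thread, so blocking on its result here
       # does not tie up the only worker of outer_pool.
       f = inner_pool.submit(pow, 5, 2)
       return f.result()

   with ThreadPoolExecutor(max_workers=1) as outer_pool:
       answer = outer_pool.submit(outer_task).result()

   inner_pool.shutdown()
   print(answer)  # 25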


.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

   An :class:`Executor` subclass that uses a pool of at most *max_workers*
   threads to execute calls asynchronously.

   *initializer* is an optional callable that is called at the start of
   each worker thread; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.thread.BrokenThreadPool`,
   as well as any attempt to submit more jobs to the pool.
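
   A minimal sketch of *initializer*, *initargs* and *thread_name_prefix*
   working together; ``setup`` and ``whoami`` are hypothetical helpers that
   store and read per-thread state.  In CPython's implementation, worker
   threads are named by appending a counter suffix such as ``_0`` to the
   prefix::

      import threading
      from concurrent.futures import ThreadPoolExecutor

      local = threading.local()

      def setup(greeting):
          # Runs once in each worker thread before it takes any work.
          local.greeting = greeting

      def whoami():
          return local.greeting, threading.current_thread().name

      with ThreadPoolExecutor(max_workers=1, thread_name_prefix='worker',
                              initializer=setup, initargs=('hello',)) as executor:
          greeting, name = executor.submit(whoami).result()

      print(greeting, name)   # hello worker_0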

   .. versionchanged:: 3.5
      If *max_workers* is ``None`` or not given, it will default to the
      number of processors on the machine, multiplied by ``5``, assuming
      that :class:`ThreadPoolExecutor` is often used to overlap I/O instead
      of CPU work and the number of workers should be higher than the
      number of workers for :class:`ProcessPoolExecutor`.

   .. versionadded:: 3.6
      The *thread_name_prefix* argument was added to allow users to
      control the :class:`threading.Thread` names for worker threads created by
      the pool for easier debugging.

   .. versionchanged:: 3.7
      Added the *initializer* and *initargs* arguments.


.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import urllib.request

   URLS = ['http://www.foxnews.com/',
           'http://www.cnn.com/',
           'http://europe.wsj.com/',
           'http://www.bbc.co.uk/',
           'http://some-made-up-domain.com/']

   # Retrieve a single page and return its contents
   def load_url(url, timeout):
       with urllib.request.urlopen(url, timeout=timeout) as conn:
           return conn.read()

   # We can use a with statement to ensure threads are cleaned up promptly
   with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
       # Start the load operations and mark each future with its URL
       future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
       for future in concurrent.futures.as_completed(future_to_url):
           url = future_to_url[future]
           try:
               data = future.result()
           except Exception as exc:
               print('%r generated an exception: %s' % (url, exc))
           else:
               print('%r page is %d bytes' % (url, len(data)))


ProcessPoolExecutor
-------------------

The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.

The ``__main__`` module must be importable by worker subprocesses. This means
that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.

Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

   An :class:`Executor` subclass that executes calls asynchronously using a pool
   of at most *max_workers* processes.  If *max_workers* is ``None`` or not
   given, it will default to the number of processors on the machine.
   If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
   will be raised.
   *mp_context* can be a multiprocessing context or ``None``.  It will be used
   to launch the workers.  If *mp_context* is ``None`` or not given, the
   default multiprocessing context is used.

   *initializer* is an optional callable that is called at the start of
   each worker process; *initargs* is a tuple of arguments passed to the
   initializer.  Should *initializer* raise an exception, all currently
   pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
   as well as any attempt to submit more jobs to the pool.
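
   A minimal sketch of these parameters; ``init_worker``, ``tagged_square``
   and ``WORKER_TAG`` are hypothetical names.  A non-positive *max_workers*
   is rejected before any process starts, while the initializer runs once in
   each worker process before it accepts work.  The functions are defined at
   module level so that they can be pickled::

      import concurrent.futures

      WORKER_TAG = None   # replaced in each worker process by init_worker

      def init_worker(tag):
          global WORKER_TAG
          WORKER_TAG = tag

      def tagged_square(x):
          return WORKER_TAG, x * x

      def main():
          try:
              concurrent.futures.ProcessPoolExecutor(max_workers=0)
          except ValueError as exc:
              print('rejected:', exc)
          with concurrent.futures.ProcessPoolExecutor(
                  max_workers=2, initializer=init_worker,
                  initargs=('job',)) as executor:
              # [('job', 0), ('job', 1), ('job', 4), ('job', 9)]
              print(list(executor.map(tagged_square, range(4))))

      if __name__ == '__main__':
          main()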

   .. versionchanged:: 3.3
      When one of the worker processes terminates abruptly, a
      :exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
      Previously, behavior was undefined but operations on the executor or its
      futures would often freeze or deadlock.

   .. versionchanged:: 3.7
      The *mp_context* argument was added to allow users to control the
      start method for worker processes created by the pool.

      Added the *initializer* and *initargs* arguments.


.. _processpoolexecutor-example:

ProcessPoolExecutor Example
~~~~~~~~~~~~~~~~~~~~~~~~~~~
::

   import concurrent.futures
   import math

   PRIMES = [
       112272535095293,
       112582705942171,
       112272535095293,
       115280095190773,
       115797848077099,
       1099726899285419]

   def is_prime(n):
       if n < 2:
           return False
       if n == 2:
           return True
       if n % 2 == 0:
           return False

       sqrt_n = int(math.floor(math.sqrt(n)))
       for i in range(3, sqrt_n + 1, 2):
           if n % i == 0:
               return False
       return True

   def main():
       with concurrent.futures.ProcessPoolExecutor() as executor:
           for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
               print('%d is prime: %s' % (number, prime))

   if __name__ == '__main__':
       main()


Future Objects
--------------

The :class:`Future` class encapsulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.

.. class:: Future

   Encapsulates the asynchronous execution of a callable.  :class:`Future`
   instances are created by :meth:`Executor.submit` and should not be created
   directly except for testing.

    .. method:: cancel()

       Attempt to cancel the call.  If the call is currently being executed or
       has finished running and cannot be cancelled then the method will return
       ``False``, otherwise the call will be cancelled and the method will
       return ``True``.

    .. method:: cancelled()

       Return ``True`` if the call was successfully cancelled.

    .. method:: running()

       Return ``True`` if the call is currently being executed and cannot be
       cancelled.

    .. method:: done()

       Return ``True`` if the call was successfully cancelled or finished
       running.
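
       A deterministic sketch of these four methods; ``block``, ``started``,
       ``release``, ``busy`` and ``pending`` are illustrative names.  The first
       task holds the only worker thread, so the second submission is still
       pending and can be cancelled, while the running one cannot::

          import threading
          from concurrent.futures import ThreadPoolExecutor

          started = threading.Event()
          release = threading.Event()

          def block():
              started.set()          # signal that the worker picked this up
              release.wait()         # hold the only worker until released
              return 'released'

          with ThreadPoolExecutor(max_workers=1) as executor:
              busy = executor.submit(block)
              pending = executor.submit(pow, 2, 10)   # queued behind block()
              started.wait()
              print(busy.running(), busy.cancel())          # True False
              print(pending.cancel(), pending.cancelled())  # True True
              print(pending.done())                         # True
              release.set()

          print(busy.result(), busy.done())                 # released True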

    .. method:: result(timeout=None)

       Return the value returned by the call. If the call hasn't yet completed
       then this method will wait up to *timeout* seconds.  If the call hasn't
       completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised. *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call raised an exception, this method will raise the same
       exception.
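
       A small sketch of both failure paths (``release``, ``fail``,
       ``timed_out`` and ``caught`` are illustrative names): waiting with a
       short *timeout* on a call that is still blocked raises
       :exc:`concurrent.futures.TimeoutError`, and a call that raised
       re-raises its exception here::

          import threading
          import concurrent.futures

          release = threading.Event()

          def fail():
              raise KeyError('missing')

          timed_out = False
          caught = None
          with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
              slow = executor.submit(release.wait)
              broken = executor.submit(fail)
              try:
                  slow.result(timeout=0.1)
              except concurrent.futures.TimeoutError:
                  timed_out = True
              release.set()
              print(timed_out, slow.result())   # True True
              try:
                  broken.result()
              except KeyError as exc:
                  caught = exc                  # the exception raised by fail()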

    .. method:: exception(timeout=None)

       Return the exception raised by the call.  If the call hasn't yet
       completed then this method will wait up to *timeout* seconds.  If the
       call hasn't completed in *timeout* seconds, then a
       :exc:`concurrent.futures.TimeoutError` will be raised.  *timeout* can be
       an int or float.  If *timeout* is not specified or ``None``, there is no
       limit to the wait time.

       If the future is cancelled before completing then :exc:`.CancelledError`
       will be raised.

       If the call completed without raising, ``None`` is returned.
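
       Unlike :meth:`result`, this method returns the exception instead of
       raising it, which can be convenient when collecting failures.  A
       minimal sketch (the list ``inputs`` and the use of :func:`int` are only
       for illustration)::

          from concurrent.futures import ThreadPoolExecutor

          inputs = ['1', 'two', '3']
          with ThreadPoolExecutor(max_workers=3) as executor:
              futures = [executor.submit(int, s) for s in inputs]
              errors = [f.exception() for f in futures]

          print([type(e).__name__ for e in errors])   # ['NoneType', 'ValueError', 'NoneType']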

    .. method:: add_done_callback(fn)

       Attaches the callable *fn* to the future.  *fn* will be called, with the
       future as its only argument, when the future is cancelled or finishes
       running.

       Added callables are called in the order that they were added and are
       always called in a thread belonging to the process that added them.  If
       the callable raises an :exc:`Exception` subclass, it will be logged and
       ignored.  If the callable raises a :exc:`BaseException` subclass, the
       behavior is undefined.

       If the future has already completed or been cancelled, *fn* will be
       called immediately.
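
       A minimal sketch of both cases (``events`` and ``report`` are
       illustrative names).  The first callback runs once the future completes
       (or immediately, if the worker has already finished); the second is
       attached after completion and therefore runs at once in the calling
       thread::

          from concurrent.futures import ThreadPoolExecutor

          events = []

          def report(fut):
              events.append(('done', fut.result()))

          with ThreadPoolExecutor(max_workers=1) as executor:
              future = executor.submit(sum, [1, 2, 3])
              future.add_done_callback(report)

          # The with block has waited for the worker, so the future is done.
          future.add_done_callback(lambda fut: events.append(('late', fut.result())))
          print(events)   # [('done', 6), ('late', 6)]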

   The following :class:`Future` methods are meant for use in unit tests and
   :class:`Executor` implementations.

    .. method:: set_running_or_notify_cancel()

       This method should only be called by :class:`Executor` implementations
       before executing the work associated with the :class:`Future` and by unit
       tests.

       If the method returns ``False`` then the :class:`Future` was cancelled,
       i.e. :meth:`Future.cancel` was called and returned ``True``.  Any threads
       waiting on the :class:`Future` completing (i.e. through
       :func:`as_completed` or :func:`wait`) will be woken up.

       If the method returns ``True`` then the :class:`Future` was not cancelled
       and has been put in the running state, i.e. calls to
       :meth:`Future.running` will return ``True``.

       This method can only be called once and cannot be called after
       :meth:`Future.set_result` or :meth:`Future.set_exception` have been
       called.

    .. method:: set_result(result)

       Sets the result of the work associated with the :class:`Future` to
       *result*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.

    .. method:: set_exception(exception)

       Sets the result of the work associated with the :class:`Future` to the
       :class:`Exception` *exception*.

       This method should only be used by :class:`Executor` implementations and
       unit tests.
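
       A sketch of how a unit test might drive a :class:`Future` by hand,
       standing in for an executor (the names ``ok``, ``failed`` and
       ``cancelled`` are illustrative)::

          from concurrent.futures import Future

          ok = Future()
          assert ok.set_running_or_notify_cancel()   # now in the running state
          assert ok.running()
          ok.set_result(42)
          print(ok.result())                         # 42

          failed = Future()
          failed.set_exception(ValueError('boom'))
          print(repr(failed.exception()))            # ValueError('boom')

          cancelled = Future()
          assert cancelled.cancel()                  # still pending, so cancellable
          print(cancelled.set_running_or_notify_cancel())   # False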


Module Functions
----------------

.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)

   Wait for the :class:`Future` instances (possibly created by different
   :class:`Executor` instances) given by *fs* to complete.  Returns a named
   2-tuple of sets.  The first set, named ``done``, contains the futures that
   completed (finished or were cancelled) before the wait completed.  The second
   set, named ``not_done``, contains uncompleted futures.

   *timeout* can be used to control the maximum number of seconds to wait before
   returning.  *timeout* can be an int or float.  If *timeout* is not specified
   or ``None``, there is no limit to the wait time.

   *return_when* indicates when this function should return.  It must be one of
   the following constants:

   .. tabularcolumns:: |l|L|

   +-----------------------------+----------------------------------------+
   | Constant                    | Description                            |
   +=============================+========================================+
   | :const:`FIRST_COMPLETED`    | The function will return when any      |
   |                             | future finishes or is cancelled.       |
   +-----------------------------+----------------------------------------+
   | :const:`FIRST_EXCEPTION`    | The function will return when any      |
   |                             | future finishes by raising an          |
   |                             | exception.  If no future raises an     |
   |                             | exception then it is equivalent to     |
   |                             | :const:`ALL_COMPLETED`.                |
   +-----------------------------+----------------------------------------+
   | :const:`ALL_COMPLETED`      | The function will return when all      |
   |                             | futures finish or are cancelled.       |
   +-----------------------------+----------------------------------------+
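
   A deterministic sketch of :const:`FIRST_COMPLETED` (``release``, ``slow``
   and ``fast`` are illustrative names): only ``fast`` can finish while
   ``slow`` is blocked, so it is the single member of ``done``::

      import threading
      from concurrent.futures import (ALL_COMPLETED, FIRST_COMPLETED,
                                      ThreadPoolExecutor, wait)

      release = threading.Event()
      with ThreadPoolExecutor(max_workers=2) as executor:
          slow = executor.submit(release.wait)
          fast = executor.submit(pow, 2, 8)
          done, not_done = wait([slow, fast], return_when=FIRST_COMPLETED)
          print(done == {fast}, not_done == {slow})      # True True
          release.set()
          result = wait([slow, fast], return_when=ALL_COMPLETED)
          print(len(result.done), len(result.not_done))  # 2 0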

.. function:: as_completed(fs, timeout=None)

   Returns an iterator over the :class:`Future` instances (possibly created by
   different :class:`Executor` instances) given by *fs* that yields futures as
   they complete (finished or cancelled futures).  Any futures given by *fs*
   that are duplicated will be returned once.  Any futures that completed
   before :func:`as_completed` is called will be yielded first.  The returned
   iterator raises a :exc:`concurrent.futures.TimeoutError` if
   :meth:`~iterator.__next__` is called and the result isn't available after
   *timeout* seconds from the original call to :func:`as_completed`.
   *timeout* can be an int or float.  If *timeout* is not specified or
   ``None``, there is no limit to the wait time.
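
   A minimal sketch showing duplicate handling and the timeout (the names
   ``futures``, ``squares``, ``blocked`` and ``timed_out`` are illustrative).
   Passing two futures twice still yields five results, and iterating over a
   future that cannot finish within *timeout* raises
   :exc:`concurrent.futures.TimeoutError`::

      import threading
      import concurrent.futures
      from concurrent.futures import ThreadPoolExecutor, as_completed

      with ThreadPoolExecutor(max_workers=3) as executor:
          futures = [executor.submit(pow, n, 2) for n in range(5)]
          # The first two futures appear twice but are yielded only once.
          squares = sorted(f.result() for f in as_completed(futures + futures[:2]))
      print(squares)     # [0, 1, 4, 9, 16]

      release = threading.Event()
      timed_out = False
      with ThreadPoolExecutor(max_workers=1) as executor:
          blocked = executor.submit(release.wait)
          try:
              for f in as_completed([blocked], timeout=0.1):
                  pass
          except concurrent.futures.TimeoutError:
              timed_out = True
          finally:
              release.set()   # let the worker finish so shutdown can return
      print(timed_out)   # True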


.. seealso::

   :pep:`3148` -- futures - execute computations asynchronously
      The proposal which described this feature for inclusion in the Python
      standard library.


Exception classes
-----------------

.. currentmodule:: concurrent.futures

.. exception:: CancelledError

   Raised when a future is cancelled.

.. exception:: TimeoutError

   Raised when a future operation exceeds the given timeout.

.. exception:: BrokenExecutor

   Derived from :exc:`RuntimeError`, this exception class is raised
   when an executor is broken for some reason, and cannot be used
   to submit or execute new tasks.

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.thread

.. exception:: BrokenThreadPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor`, this exception
   class is raised when one of the workers of a
   :class:`~concurrent.futures.ThreadPoolExecutor` has failed initializing.
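
   A sketch that triggers this exception on purpose with an initializer that
   always fails (``bad_init``, ``broken`` and ``refused`` are hypothetical
   names).  The module also logs the initializer failure, so a traceback may
   appear on standard error::

      from concurrent.futures import BrokenExecutor, ThreadPoolExecutor
      from concurrent.futures.thread import BrokenThreadPool

      def bad_init():
          raise OSError('could not open resource')

      broken = refused = None
      with ThreadPoolExecutor(max_workers=1, initializer=bad_init) as executor:
          future = executor.submit(pow, 2, 3)
          try:
              future.result()
          except BrokenThreadPool as exc:
              broken = exc
          try:
              executor.submit(pow, 2, 4)   # new work is refused as well
          except BrokenThreadPool as exc:
              refused = exc

      print(isinstance(broken, BrokenExecutor), refused is not None)   # True True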

   .. versionadded:: 3.7

.. currentmodule:: concurrent.futures.process

.. exception:: BrokenProcessPool

   Derived from :exc:`~concurrent.futures.BrokenExecutor` (formerly
   :exc:`RuntimeError`), this exception class is raised when one of the
   workers of a :class:`~concurrent.futures.ProcessPoolExecutor` has
   terminated in a non-clean fashion (for example, if it was killed from
   the outside).

   .. versionadded:: 3.3
