• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import logging
8import threading
9import time
10import types
11
12FIRST_COMPLETED = 'FIRST_COMPLETED'
13FIRST_EXCEPTION = 'FIRST_EXCEPTION'
14ALL_COMPLETED = 'ALL_COMPLETED'
15_AS_COMPLETED = '_AS_COMPLETED'
16
17# Possible future states (for internal use by the futures package).
18PENDING = 'PENDING'
19RUNNING = 'RUNNING'
20# The future was cancelled by the user...
21CANCELLED = 'CANCELLED'
22# ...and _Waiter.add_cancelled() was called by a worker.
23CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
24FINISHED = 'FINISHED'
25
26_FUTURE_STATES = [
27    PENDING,
28    RUNNING,
29    CANCELLED,
30    CANCELLED_AND_NOTIFIED,
31    FINISHED
32]
33
34_STATE_TO_DESCRIPTION_MAP = {
35    PENDING: "pending",
36    RUNNING: "running",
37    CANCELLED: "cancelled",
38    CANCELLED_AND_NOTIFIED: "cancelled",
39    FINISHED: "finished"
40}
41
42# Logger for internal use by the futures package.
43LOGGER = logging.getLogger("concurrent.futures")
44
45class Error(Exception):
46    """Base class for all future-related exceptions."""
47    pass
48
49class CancelledError(Error):
50    """The Future was cancelled."""
51    pass
52
53class TimeoutError(Error):
54    """The operation exceeded the given deadline."""
55    pass
56
57class InvalidStateError(Error):
58    """The operation is not allowed in this state."""
59    pass
60
61class _Waiter(object):
62    """Provides the event that wait() and as_completed() block on."""
63    def __init__(self):
64        self.event = threading.Event()
65        self.finished_futures = []
66
67    def add_result(self, future):
68        self.finished_futures.append(future)
69
70    def add_exception(self, future):
71        self.finished_futures.append(future)
72
73    def add_cancelled(self, future):
74        self.finished_futures.append(future)
75
76class _AsCompletedWaiter(_Waiter):
77    """Used by as_completed()."""
78
79    def __init__(self):
80        super(_AsCompletedWaiter, self).__init__()
81        self.lock = threading.Lock()
82
83    def add_result(self, future):
84        with self.lock:
85            super(_AsCompletedWaiter, self).add_result(future)
86            self.event.set()
87
88    def add_exception(self, future):
89        with self.lock:
90            super(_AsCompletedWaiter, self).add_exception(future)
91            self.event.set()
92
93    def add_cancelled(self, future):
94        with self.lock:
95            super(_AsCompletedWaiter, self).add_cancelled(future)
96            self.event.set()
97
98class _FirstCompletedWaiter(_Waiter):
99    """Used by wait(return_when=FIRST_COMPLETED)."""
100
101    def add_result(self, future):
102        super().add_result(future)
103        self.event.set()
104
105    def add_exception(self, future):
106        super().add_exception(future)
107        self.event.set()
108
109    def add_cancelled(self, future):
110        super().add_cancelled(future)
111        self.event.set()
112
113class _AllCompletedWaiter(_Waiter):
114    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
115
116    def __init__(self, num_pending_calls, stop_on_exception):
117        self.num_pending_calls = num_pending_calls
118        self.stop_on_exception = stop_on_exception
119        self.lock = threading.Lock()
120        super().__init__()
121
122    def _decrement_pending_calls(self):
123        with self.lock:
124            self.num_pending_calls -= 1
125            if not self.num_pending_calls:
126                self.event.set()
127
128    def add_result(self, future):
129        super().add_result(future)
130        self._decrement_pending_calls()
131
132    def add_exception(self, future):
133        super().add_exception(future)
134        if self.stop_on_exception:
135            self.event.set()
136        else:
137            self._decrement_pending_calls()
138
139    def add_cancelled(self, future):
140        super().add_cancelled(future)
141        self._decrement_pending_calls()
142
143class _AcquireFutures(object):
144    """A context manager that does an ordered acquire of Future conditions."""
145
146    def __init__(self, futures):
147        self.futures = sorted(futures, key=id)
148
149    def __enter__(self):
150        for future in self.futures:
151            future._condition.acquire()
152
153    def __exit__(self, *args):
154        for future in self.futures:
155            future._condition.release()
156
157def _create_and_install_waiters(fs, return_when):
158    if return_when == _AS_COMPLETED:
159        waiter = _AsCompletedWaiter()
160    elif return_when == FIRST_COMPLETED:
161        waiter = _FirstCompletedWaiter()
162    else:
163        pending_count = sum(
164                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
165
166        if return_when == FIRST_EXCEPTION:
167            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
168        elif return_when == ALL_COMPLETED:
169            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
170        else:
171            raise ValueError("Invalid return condition: %r" % return_when)
172
173    for f in fs:
174        f._waiters.append(waiter)
175
176    return waiter
177
178
179def _yield_finished_futures(fs, waiter, ref_collect):
180    """
181    Iterate on the list *fs*, yielding finished futures one by one in
182    reverse order.
183    Before yielding a future, *waiter* is removed from its waiters
184    and the future is removed from each set in the collection of sets
185    *ref_collect*.
186
187    The aim of this function is to avoid keeping stale references after
188    the future is yielded and before the iterator resumes.
189    """
190    while fs:
191        f = fs[-1]
192        for futures_set in ref_collect:
193            futures_set.remove(f)
194        with f._condition:
195            f._waiters.remove(waiter)
196        del f
197        # Careful not to keep a reference to the popped value
198        yield fs.pop()
199
200
201def as_completed(fs, timeout=None):
202    """An iterator over the given futures that yields each as it completes.
203
204    Args:
205        fs: The sequence of Futures (possibly created by different Executors) to
206            iterate over.
207        timeout: The maximum number of seconds to wait. If None, then there
208            is no limit on the wait time.
209
210    Returns:
211        An iterator that yields the given Futures as they complete (finished or
212        cancelled). If any given Futures are duplicated, they will be returned
213        once.
214
215    Raises:
216        TimeoutError: If the entire result iterator could not be generated
217            before the given timeout.
218    """
219    if timeout is not None:
220        end_time = timeout + time.monotonic()
221
222    fs = set(fs)
223    total_futures = len(fs)
224    with _AcquireFutures(fs):
225        finished = set(
226                f for f in fs
227                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
228        pending = fs - finished
229        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
230    finished = list(finished)
231    try:
232        yield from _yield_finished_futures(finished, waiter,
233                                           ref_collect=(fs,))
234
235        while pending:
236            if timeout is None:
237                wait_timeout = None
238            else:
239                wait_timeout = end_time - time.monotonic()
240                if wait_timeout < 0:
241                    raise TimeoutError(
242                            '%d (of %d) futures unfinished' % (
243                            len(pending), total_futures))
244
245            waiter.event.wait(wait_timeout)
246
247            with waiter.lock:
248                finished = waiter.finished_futures
249                waiter.finished_futures = []
250                waiter.event.clear()
251
252            # reverse to keep finishing order
253            finished.reverse()
254            yield from _yield_finished_futures(finished, waiter,
255                                               ref_collect=(fs, pending))
256
257    finally:
258        # Remove waiter from unfinished futures
259        for f in fs:
260            with f._condition:
261                f._waiters.remove(waiter)
262
263DoneAndNotDoneFutures = collections.namedtuple(
264        'DoneAndNotDoneFutures', 'done not_done')
265def wait(fs, timeout=None, return_when=ALL_COMPLETED):
266    """Wait for the futures in the given sequence to complete.
267
268    Args:
269        fs: The sequence of Futures (possibly created by different Executors) to
270            wait upon.
271        timeout: The maximum number of seconds to wait. If None, then there
272            is no limit on the wait time.
273        return_when: Indicates when this function should return. The options
274            are:
275
276            FIRST_COMPLETED - Return when any future finishes or is
277                              cancelled.
278            FIRST_EXCEPTION - Return when any future finishes by raising an
279                              exception. If no future raises an exception
280                              then it is equivalent to ALL_COMPLETED.
281            ALL_COMPLETED -   Return when all futures finish or are cancelled.
282
283    Returns:
284        A named 2-tuple of sets. The first set, named 'done', contains the
285        futures that completed (is finished or cancelled) before the wait
286        completed. The second set, named 'not_done', contains uncompleted
287        futures.
288    """
289    with _AcquireFutures(fs):
290        done = set(f for f in fs
291                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
292        not_done = set(fs) - done
293
294        if (return_when == FIRST_COMPLETED) and done:
295            return DoneAndNotDoneFutures(done, not_done)
296        elif (return_when == FIRST_EXCEPTION) and done:
297            if any(f for f in done
298                   if not f.cancelled() and f.exception() is not None):
299                return DoneAndNotDoneFutures(done, not_done)
300
301        if len(done) == len(fs):
302            return DoneAndNotDoneFutures(done, not_done)
303
304        waiter = _create_and_install_waiters(fs, return_when)
305
306    waiter.event.wait(timeout)
307    for f in fs:
308        with f._condition:
309            f._waiters.remove(waiter)
310
311    done.update(waiter.finished_futures)
312    return DoneAndNotDoneFutures(done, set(fs) - done)
313
314class Future(object):
315    """Represents the result of an asynchronous computation."""
316
317    def __init__(self):
318        """Initializes the future. Should not be called by clients."""
319        self._condition = threading.Condition()
320        self._state = PENDING
321        self._result = None
322        self._exception = None
323        self._waiters = []
324        self._done_callbacks = []
325
326    def _invoke_callbacks(self):
327        for callback in self._done_callbacks:
328            try:
329                callback(self)
330            except Exception:
331                LOGGER.exception('exception calling callback for %r', self)
332
333    def __repr__(self):
334        with self._condition:
335            if self._state == FINISHED:
336                if self._exception:
337                    return '<%s at %#x state=%s raised %s>' % (
338                        self.__class__.__name__,
339                        id(self),
340                        _STATE_TO_DESCRIPTION_MAP[self._state],
341                        self._exception.__class__.__name__)
342                else:
343                    return '<%s at %#x state=%s returned %s>' % (
344                        self.__class__.__name__,
345                        id(self),
346                        _STATE_TO_DESCRIPTION_MAP[self._state],
347                        self._result.__class__.__name__)
348            return '<%s at %#x state=%s>' % (
349                    self.__class__.__name__,
350                    id(self),
351                   _STATE_TO_DESCRIPTION_MAP[self._state])
352
353    def cancel(self):
354        """Cancel the future if possible.
355
356        Returns True if the future was cancelled, False otherwise. A future
357        cannot be cancelled if it is running or has already completed.
358        """
359        with self._condition:
360            if self._state in [RUNNING, FINISHED]:
361                return False
362
363            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
364                return True
365
366            self._state = CANCELLED
367            self._condition.notify_all()
368
369        self._invoke_callbacks()
370        return True
371
372    def cancelled(self):
373        """Return True if the future was cancelled."""
374        with self._condition:
375            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
376
377    def running(self):
378        """Return True if the future is currently executing."""
379        with self._condition:
380            return self._state == RUNNING
381
382    def done(self):
383        """Return True of the future was cancelled or finished executing."""
384        with self._condition:
385            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
386
387    def __get_result(self):
388        if self._exception:
389            raise self._exception
390        else:
391            return self._result
392
393    def add_done_callback(self, fn):
394        """Attaches a callable that will be called when the future finishes.
395
396        Args:
397            fn: A callable that will be called with this future as its only
398                argument when the future completes or is cancelled. The callable
399                will always be called by a thread in the same process in which
400                it was added. If the future has already completed or been
401                cancelled then the callable will be called immediately. These
402                callables are called in the order that they were added.
403        """
404        with self._condition:
405            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
406                self._done_callbacks.append(fn)
407                return
408        try:
409            fn(self)
410        except Exception:
411            LOGGER.exception('exception calling callback for %r', self)
412
413    def result(self, timeout=None):
414        """Return the result of the call that the future represents.
415
416        Args:
417            timeout: The number of seconds to wait for the result if the future
418                isn't done. If None, then there is no limit on the wait time.
419
420        Returns:
421            The result of the call that the future represents.
422
423        Raises:
424            CancelledError: If the future was cancelled.
425            TimeoutError: If the future didn't finish executing before the given
426                timeout.
427            Exception: If the call raised then that exception will be raised.
428        """
429        with self._condition:
430            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
431                raise CancelledError()
432            elif self._state == FINISHED:
433                return self.__get_result()
434
435            self._condition.wait(timeout)
436
437            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
438                raise CancelledError()
439            elif self._state == FINISHED:
440                return self.__get_result()
441            else:
442                raise TimeoutError()
443
444    def exception(self, timeout=None):
445        """Return the exception raised by the call that the future represents.
446
447        Args:
448            timeout: The number of seconds to wait for the exception if the
449                future isn't done. If None, then there is no limit on the wait
450                time.
451
452        Returns:
453            The exception raised by the call that the future represents or None
454            if the call completed without raising.
455
456        Raises:
457            CancelledError: If the future was cancelled.
458            TimeoutError: If the future didn't finish executing before the given
459                timeout.
460        """
461
462        with self._condition:
463            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
464                raise CancelledError()
465            elif self._state == FINISHED:
466                return self._exception
467
468            self._condition.wait(timeout)
469
470            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
471                raise CancelledError()
472            elif self._state == FINISHED:
473                return self._exception
474            else:
475                raise TimeoutError()
476
477    # The following methods should only be used by Executors and in tests.
478    def set_running_or_notify_cancel(self):
479        """Mark the future as running or process any cancel notifications.
480
481        Should only be used by Executor implementations and unit tests.
482
483        If the future has been cancelled (cancel() was called and returned
484        True) then any threads waiting on the future completing (though calls
485        to as_completed() or wait()) are notified and False is returned.
486
487        If the future was not cancelled then it is put in the running state
488        (future calls to running() will return True) and True is returned.
489
490        This method should be called by Executor implementations before
491        executing the work associated with this future. If this method returns
492        False then the work should not be executed.
493
494        Returns:
495            False if the Future was cancelled, True otherwise.
496
497        Raises:
498            RuntimeError: if this method was already called or if set_result()
499                or set_exception() was called.
500        """
501        with self._condition:
502            if self._state == CANCELLED:
503                self._state = CANCELLED_AND_NOTIFIED
504                for waiter in self._waiters:
505                    waiter.add_cancelled(self)
506                # self._condition.notify_all() is not necessary because
507                # self.cancel() triggers a notification.
508                return False
509            elif self._state == PENDING:
510                self._state = RUNNING
511                return True
512            else:
513                LOGGER.critical('Future %s in unexpected state: %s',
514                                id(self),
515                                self._state)
516                raise RuntimeError('Future in unexpected state')
517
518    def set_result(self, result):
519        """Sets the return value of work associated with the future.
520
521        Should only be used by Executor implementations and unit tests.
522        """
523        with self._condition:
524            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
525                raise InvalidStateError('{}: {!r}'.format(self._state, self))
526            self._result = result
527            self._state = FINISHED
528            for waiter in self._waiters:
529                waiter.add_result(self)
530            self._condition.notify_all()
531        self._invoke_callbacks()
532
533    def set_exception(self, exception):
534        """Sets the result of the future as being the given exception.
535
536        Should only be used by Executor implementations and unit tests.
537        """
538        with self._condition:
539            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
540                raise InvalidStateError('{}: {!r}'.format(self._state, self))
541            self._exception = exception
542            self._state = FINISHED
543            for waiter in self._waiters:
544                waiter.add_exception(self)
545            self._condition.notify_all()
546        self._invoke_callbacks()
547
548    __class_getitem__ = classmethod(types.GenericAlias)
549
550class Executor(object):
551    """This is an abstract base class for concrete asynchronous executors."""
552
553    def submit(self, fn, /, *args, **kwargs):
554        """Submits a callable to be executed with the given arguments.
555
556        Schedules the callable to be executed as fn(*args, **kwargs) and returns
557        a Future instance representing the execution of the callable.
558
559        Returns:
560            A Future representing the given call.
561        """
562        raise NotImplementedError()
563
564    def map(self, fn, *iterables, timeout=None, chunksize=1):
565        """Returns an iterator equivalent to map(fn, iter).
566
567        Args:
568            fn: A callable that will take as many arguments as there are
569                passed iterables.
570            timeout: The maximum number of seconds to wait. If None, then there
571                is no limit on the wait time.
572            chunksize: The size of the chunks the iterable will be broken into
573                before being passed to a child process. This argument is only
574                used by ProcessPoolExecutor; it is ignored by
575                ThreadPoolExecutor.
576
577        Returns:
578            An iterator equivalent to: map(func, *iterables) but the calls may
579            be evaluated out-of-order.
580
581        Raises:
582            TimeoutError: If the entire result iterator could not be generated
583                before the given timeout.
584            Exception: If fn(*args) raises for any values.
585        """
586        if timeout is not None:
587            end_time = timeout + time.monotonic()
588
589        fs = [self.submit(fn, *args) for args in zip(*iterables)]
590
591        # Yield must be hidden in closure so that the futures are submitted
592        # before the first iterator value is required.
593        def result_iterator():
594            try:
595                # reverse to keep finishing order
596                fs.reverse()
597                while fs:
598                    # Careful not to keep a reference to the popped future
599                    if timeout is None:
600                        yield fs.pop().result()
601                    else:
602                        yield fs.pop().result(end_time - time.monotonic())
603            finally:
604                for future in fs:
605                    future.cancel()
606        return result_iterator()
607
608    def shutdown(self, wait=True, *, cancel_futures=False):
609        """Clean-up the resources associated with the Executor.
610
611        It is safe to call this method several times. Otherwise, no other
612        methods can be called after this one.
613
614        Args:
615            wait: If True then shutdown will not return until all running
616                futures have finished executing and the resources used by the
617                executor have been reclaimed.
618            cancel_futures: If True then shutdown will cancel all pending
619                futures. Futures that are completed or running will not be
620                cancelled.
621        """
622        pass
623
624    def __enter__(self):
625        return self
626
627    def __exit__(self, exc_type, exc_val, exc_tb):
628        self.shutdown(wait=True)
629        return False
630
631
632class BrokenExecutor(RuntimeError):
633    """
634    Raised when a executor has become non-functional after a severe failure.
635    """
636