• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
6import collections
7import logging
8import threading
9import time
10import types
11
12FIRST_COMPLETED = 'FIRST_COMPLETED'
13FIRST_EXCEPTION = 'FIRST_EXCEPTION'
14ALL_COMPLETED = 'ALL_COMPLETED'
15_AS_COMPLETED = '_AS_COMPLETED'
16
17# Possible future states (for internal use by the futures package).
18PENDING = 'PENDING'
19RUNNING = 'RUNNING'
20# The future was cancelled by the user...
21CANCELLED = 'CANCELLED'
22# ...and _Waiter.add_cancelled() was called by a worker.
23CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
24FINISHED = 'FINISHED'
25
26_FUTURE_STATES = [
27    PENDING,
28    RUNNING,
29    CANCELLED,
30    CANCELLED_AND_NOTIFIED,
31    FINISHED
32]
33
34_STATE_TO_DESCRIPTION_MAP = {
35    PENDING: "pending",
36    RUNNING: "running",
37    CANCELLED: "cancelled",
38    CANCELLED_AND_NOTIFIED: "cancelled",
39    FINISHED: "finished"
40}
41
42# Logger for internal use by the futures package.
43LOGGER = logging.getLogger("concurrent.futures")
44
45class Error(Exception):
46    """Base class for all future-related exceptions."""
47    pass
48
49class CancelledError(Error):
50    """The Future was cancelled."""
51    pass
52
53class TimeoutError(Error):
54    """The operation exceeded the given deadline."""
55    pass
56
57class InvalidStateError(Error):
58    """The operation is not allowed in this state."""
59    pass
60
61class _Waiter(object):
62    """Provides the event that wait() and as_completed() block on."""
63    def __init__(self):
64        self.event = threading.Event()
65        self.finished_futures = []
66
67    def add_result(self, future):
68        self.finished_futures.append(future)
69
70    def add_exception(self, future):
71        self.finished_futures.append(future)
72
73    def add_cancelled(self, future):
74        self.finished_futures.append(future)
75
76class _AsCompletedWaiter(_Waiter):
77    """Used by as_completed()."""
78
79    def __init__(self):
80        super(_AsCompletedWaiter, self).__init__()
81        self.lock = threading.Lock()
82
83    def add_result(self, future):
84        with self.lock:
85            super(_AsCompletedWaiter, self).add_result(future)
86            self.event.set()
87
88    def add_exception(self, future):
89        with self.lock:
90            super(_AsCompletedWaiter, self).add_exception(future)
91            self.event.set()
92
93    def add_cancelled(self, future):
94        with self.lock:
95            super(_AsCompletedWaiter, self).add_cancelled(future)
96            self.event.set()
97
98class _FirstCompletedWaiter(_Waiter):
99    """Used by wait(return_when=FIRST_COMPLETED)."""
100
101    def add_result(self, future):
102        super().add_result(future)
103        self.event.set()
104
105    def add_exception(self, future):
106        super().add_exception(future)
107        self.event.set()
108
109    def add_cancelled(self, future):
110        super().add_cancelled(future)
111        self.event.set()
112
113class _AllCompletedWaiter(_Waiter):
114    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
115
116    def __init__(self, num_pending_calls, stop_on_exception):
117        self.num_pending_calls = num_pending_calls
118        self.stop_on_exception = stop_on_exception
119        self.lock = threading.Lock()
120        super().__init__()
121
122    def _decrement_pending_calls(self):
123        with self.lock:
124            self.num_pending_calls -= 1
125            if not self.num_pending_calls:
126                self.event.set()
127
128    def add_result(self, future):
129        super().add_result(future)
130        self._decrement_pending_calls()
131
132    def add_exception(self, future):
133        super().add_exception(future)
134        if self.stop_on_exception:
135            self.event.set()
136        else:
137            self._decrement_pending_calls()
138
139    def add_cancelled(self, future):
140        super().add_cancelled(future)
141        self._decrement_pending_calls()
142
143class _AcquireFutures(object):
144    """A context manager that does an ordered acquire of Future conditions."""
145
146    def __init__(self, futures):
147        self.futures = sorted(futures, key=id)
148
149    def __enter__(self):
150        for future in self.futures:
151            future._condition.acquire()
152
153    def __exit__(self, *args):
154        for future in self.futures:
155            future._condition.release()
156
157def _create_and_install_waiters(fs, return_when):
158    if return_when == _AS_COMPLETED:
159        waiter = _AsCompletedWaiter()
160    elif return_when == FIRST_COMPLETED:
161        waiter = _FirstCompletedWaiter()
162    else:
163        pending_count = sum(
164                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
165
166        if return_when == FIRST_EXCEPTION:
167            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
168        elif return_when == ALL_COMPLETED:
169            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
170        else:
171            raise ValueError("Invalid return condition: %r" % return_when)
172
173    for f in fs:
174        f._waiters.append(waiter)
175
176    return waiter
177
178
179def _yield_finished_futures(fs, waiter, ref_collect):
180    """
181    Iterate on the list *fs*, yielding finished futures one by one in
182    reverse order.
183    Before yielding a future, *waiter* is removed from its waiters
184    and the future is removed from each set in the collection of sets
185    *ref_collect*.
186
187    The aim of this function is to avoid keeping stale references after
188    the future is yielded and before the iterator resumes.
189    """
190    while fs:
191        f = fs[-1]
192        for futures_set in ref_collect:
193            futures_set.remove(f)
194        with f._condition:
195            f._waiters.remove(waiter)
196        del f
197        # Careful not to keep a reference to the popped value
198        yield fs.pop()
199
200
201def as_completed(fs, timeout=None):
202    """An iterator over the given futures that yields each as it completes.
203
204    Args:
205        fs: The sequence of Futures (possibly created by different Executors) to
206            iterate over.
207        timeout: The maximum number of seconds to wait. If None, then there
208            is no limit on the wait time.
209
210    Returns:
211        An iterator that yields the given Futures as they complete (finished or
212        cancelled). If any given Futures are duplicated, they will be returned
213        once.
214
215    Raises:
216        TimeoutError: If the entire result iterator could not be generated
217            before the given timeout.
218    """
219    if timeout is not None:
220        end_time = timeout + time.monotonic()
221
222    fs = set(fs)
223    total_futures = len(fs)
224    with _AcquireFutures(fs):
225        finished = set(
226                f for f in fs
227                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
228        pending = fs - finished
229        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
230    finished = list(finished)
231    try:
232        yield from _yield_finished_futures(finished, waiter,
233                                           ref_collect=(fs,))
234
235        while pending:
236            if timeout is None:
237                wait_timeout = None
238            else:
239                wait_timeout = end_time - time.monotonic()
240                if wait_timeout < 0:
241                    raise TimeoutError(
242                            '%d (of %d) futures unfinished' % (
243                            len(pending), total_futures))
244
245            waiter.event.wait(wait_timeout)
246
247            with waiter.lock:
248                finished = waiter.finished_futures
249                waiter.finished_futures = []
250                waiter.event.clear()
251
252            # reverse to keep finishing order
253            finished.reverse()
254            yield from _yield_finished_futures(finished, waiter,
255                                               ref_collect=(fs, pending))
256
257    finally:
258        # Remove waiter from unfinished futures
259        for f in fs:
260            with f._condition:
261                f._waiters.remove(waiter)
262
263DoneAndNotDoneFutures = collections.namedtuple(
264        'DoneAndNotDoneFutures', 'done not_done')
265def wait(fs, timeout=None, return_when=ALL_COMPLETED):
266    """Wait for the futures in the given sequence to complete.
267
268    Args:
269        fs: The sequence of Futures (possibly created by different Executors) to
270            wait upon.
271        timeout: The maximum number of seconds to wait. If None, then there
272            is no limit on the wait time.
273        return_when: Indicates when this function should return. The options
274            are:
275
276            FIRST_COMPLETED - Return when any future finishes or is
277                              cancelled.
278            FIRST_EXCEPTION - Return when any future finishes by raising an
279                              exception. If no future raises an exception
280                              then it is equivalent to ALL_COMPLETED.
281            ALL_COMPLETED -   Return when all futures finish or are cancelled.
282
283    Returns:
284        A named 2-tuple of sets. The first set, named 'done', contains the
285        futures that completed (is finished or cancelled) before the wait
286        completed. The second set, named 'not_done', contains uncompleted
287        futures. Duplicate futures given to *fs* are removed and will be
288        returned only once.
289    """
290    fs = set(fs)
291    with _AcquireFutures(fs):
292        done = {f for f in fs
293                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
294        not_done = fs - done
295        if (return_when == FIRST_COMPLETED) and done:
296            return DoneAndNotDoneFutures(done, not_done)
297        elif (return_when == FIRST_EXCEPTION) and done:
298            if any(f for f in done
299                   if not f.cancelled() and f.exception() is not None):
300                return DoneAndNotDoneFutures(done, not_done)
301
302        if len(done) == len(fs):
303            return DoneAndNotDoneFutures(done, not_done)
304
305        waiter = _create_and_install_waiters(fs, return_when)
306
307    waiter.event.wait(timeout)
308    for f in fs:
309        with f._condition:
310            f._waiters.remove(waiter)
311
312    done.update(waiter.finished_futures)
313    return DoneAndNotDoneFutures(done, fs - done)
314
315class Future(object):
316    """Represents the result of an asynchronous computation."""
317
318    def __init__(self):
319        """Initializes the future. Should not be called by clients."""
320        self._condition = threading.Condition()
321        self._state = PENDING
322        self._result = None
323        self._exception = None
324        self._waiters = []
325        self._done_callbacks = []
326
327    def _invoke_callbacks(self):
328        for callback in self._done_callbacks:
329            try:
330                callback(self)
331            except Exception:
332                LOGGER.exception('exception calling callback for %r', self)
333
334    def __repr__(self):
335        with self._condition:
336            if self._state == FINISHED:
337                if self._exception:
338                    return '<%s at %#x state=%s raised %s>' % (
339                        self.__class__.__name__,
340                        id(self),
341                        _STATE_TO_DESCRIPTION_MAP[self._state],
342                        self._exception.__class__.__name__)
343                else:
344                    return '<%s at %#x state=%s returned %s>' % (
345                        self.__class__.__name__,
346                        id(self),
347                        _STATE_TO_DESCRIPTION_MAP[self._state],
348                        self._result.__class__.__name__)
349            return '<%s at %#x state=%s>' % (
350                    self.__class__.__name__,
351                    id(self),
352                   _STATE_TO_DESCRIPTION_MAP[self._state])
353
354    def cancel(self):
355        """Cancel the future if possible.
356
357        Returns True if the future was cancelled, False otherwise. A future
358        cannot be cancelled if it is running or has already completed.
359        """
360        with self._condition:
361            if self._state in [RUNNING, FINISHED]:
362                return False
363
364            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
365                return True
366
367            self._state = CANCELLED
368            self._condition.notify_all()
369
370        self._invoke_callbacks()
371        return True
372
373    def cancelled(self):
374        """Return True if the future was cancelled."""
375        with self._condition:
376            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
377
378    def running(self):
379        """Return True if the future is currently executing."""
380        with self._condition:
381            return self._state == RUNNING
382
383    def done(self):
384        """Return True of the future was cancelled or finished executing."""
385        with self._condition:
386            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
387
388    def __get_result(self):
389        if self._exception:
390            try:
391                raise self._exception
392            finally:
393                # Break a reference cycle with the exception in self._exception
394                self = None
395        else:
396            return self._result
397
398    def add_done_callback(self, fn):
399        """Attaches a callable that will be called when the future finishes.
400
401        Args:
402            fn: A callable that will be called with this future as its only
403                argument when the future completes or is cancelled. The callable
404                will always be called by a thread in the same process in which
405                it was added. If the future has already completed or been
406                cancelled then the callable will be called immediately. These
407                callables are called in the order that they were added.
408        """
409        with self._condition:
410            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
411                self._done_callbacks.append(fn)
412                return
413        try:
414            fn(self)
415        except Exception:
416            LOGGER.exception('exception calling callback for %r', self)
417
418    def result(self, timeout=None):
419        """Return the result of the call that the future represents.
420
421        Args:
422            timeout: The number of seconds to wait for the result if the future
423                isn't done. If None, then there is no limit on the wait time.
424
425        Returns:
426            The result of the call that the future represents.
427
428        Raises:
429            CancelledError: If the future was cancelled.
430            TimeoutError: If the future didn't finish executing before the given
431                timeout.
432            Exception: If the call raised then that exception will be raised.
433        """
434        try:
435            with self._condition:
436                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
437                    raise CancelledError()
438                elif self._state == FINISHED:
439                    return self.__get_result()
440
441                self._condition.wait(timeout)
442
443                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
444                    raise CancelledError()
445                elif self._state == FINISHED:
446                    return self.__get_result()
447                else:
448                    raise TimeoutError()
449        finally:
450            # Break a reference cycle with the exception in self._exception
451            self = None
452
453    def exception(self, timeout=None):
454        """Return the exception raised by the call that the future represents.
455
456        Args:
457            timeout: The number of seconds to wait for the exception if the
458                future isn't done. If None, then there is no limit on the wait
459                time.
460
461        Returns:
462            The exception raised by the call that the future represents or None
463            if the call completed without raising.
464
465        Raises:
466            CancelledError: If the future was cancelled.
467            TimeoutError: If the future didn't finish executing before the given
468                timeout.
469        """
470
471        with self._condition:
472            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
473                raise CancelledError()
474            elif self._state == FINISHED:
475                return self._exception
476
477            self._condition.wait(timeout)
478
479            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
480                raise CancelledError()
481            elif self._state == FINISHED:
482                return self._exception
483            else:
484                raise TimeoutError()
485
486    # The following methods should only be used by Executors and in tests.
487    def set_running_or_notify_cancel(self):
488        """Mark the future as running or process any cancel notifications.
489
490        Should only be used by Executor implementations and unit tests.
491
492        If the future has been cancelled (cancel() was called and returned
493        True) then any threads waiting on the future completing (though calls
494        to as_completed() or wait()) are notified and False is returned.
495
496        If the future was not cancelled then it is put in the running state
497        (future calls to running() will return True) and True is returned.
498
499        This method should be called by Executor implementations before
500        executing the work associated with this future. If this method returns
501        False then the work should not be executed.
502
503        Returns:
504            False if the Future was cancelled, True otherwise.
505
506        Raises:
507            RuntimeError: if this method was already called or if set_result()
508                or set_exception() was called.
509        """
510        with self._condition:
511            if self._state == CANCELLED:
512                self._state = CANCELLED_AND_NOTIFIED
513                for waiter in self._waiters:
514                    waiter.add_cancelled(self)
515                # self._condition.notify_all() is not necessary because
516                # self.cancel() triggers a notification.
517                return False
518            elif self._state == PENDING:
519                self._state = RUNNING
520                return True
521            else:
522                LOGGER.critical('Future %s in unexpected state: %s',
523                                id(self),
524                                self._state)
525                raise RuntimeError('Future in unexpected state')
526
527    def set_result(self, result):
528        """Sets the return value of work associated with the future.
529
530        Should only be used by Executor implementations and unit tests.
531        """
532        with self._condition:
533            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
534                raise InvalidStateError('{}: {!r}'.format(self._state, self))
535            self._result = result
536            self._state = FINISHED
537            for waiter in self._waiters:
538                waiter.add_result(self)
539            self._condition.notify_all()
540        self._invoke_callbacks()
541
542    def set_exception(self, exception):
543        """Sets the result of the future as being the given exception.
544
545        Should only be used by Executor implementations and unit tests.
546        """
547        with self._condition:
548            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
549                raise InvalidStateError('{}: {!r}'.format(self._state, self))
550            self._exception = exception
551            self._state = FINISHED
552            for waiter in self._waiters:
553                waiter.add_exception(self)
554            self._condition.notify_all()
555        self._invoke_callbacks()
556
557    __class_getitem__ = classmethod(types.GenericAlias)
558
559class Executor(object):
560    """This is an abstract base class for concrete asynchronous executors."""
561
562    def submit(self, fn, /, *args, **kwargs):
563        """Submits a callable to be executed with the given arguments.
564
565        Schedules the callable to be executed as fn(*args, **kwargs) and returns
566        a Future instance representing the execution of the callable.
567
568        Returns:
569            A Future representing the given call.
570        """
571        raise NotImplementedError()
572
573    def map(self, fn, *iterables, timeout=None, chunksize=1):
574        """Returns an iterator equivalent to map(fn, iter).
575
576        Args:
577            fn: A callable that will take as many arguments as there are
578                passed iterables.
579            timeout: The maximum number of seconds to wait. If None, then there
580                is no limit on the wait time.
581            chunksize: The size of the chunks the iterable will be broken into
582                before being passed to a child process. This argument is only
583                used by ProcessPoolExecutor; it is ignored by
584                ThreadPoolExecutor.
585
586        Returns:
587            An iterator equivalent to: map(func, *iterables) but the calls may
588            be evaluated out-of-order.
589
590        Raises:
591            TimeoutError: If the entire result iterator could not be generated
592                before the given timeout.
593            Exception: If fn(*args) raises for any values.
594        """
595        if timeout is not None:
596            end_time = timeout + time.monotonic()
597
598        fs = [self.submit(fn, *args) for args in zip(*iterables)]
599
600        # Yield must be hidden in closure so that the futures are submitted
601        # before the first iterator value is required.
602        def result_iterator():
603            try:
604                # reverse to keep finishing order
605                fs.reverse()
606                while fs:
607                    # Careful not to keep a reference to the popped future
608                    if timeout is None:
609                        yield fs.pop().result()
610                    else:
611                        yield fs.pop().result(end_time - time.monotonic())
612            finally:
613                for future in fs:
614                    future.cancel()
615        return result_iterator()
616
617    def shutdown(self, wait=True, *, cancel_futures=False):
618        """Clean-up the resources associated with the Executor.
619
620        It is safe to call this method several times. Otherwise, no other
621        methods can be called after this one.
622
623        Args:
624            wait: If True then shutdown will not return until all running
625                futures have finished executing and the resources used by the
626                executor have been reclaimed.
627            cancel_futures: If True then shutdown will cancel all pending
628                futures. Futures that are completed or running will not be
629                cancelled.
630        """
631        pass
632
633    def __enter__(self):
634        return self
635
636    def __exit__(self, exc_type, exc_val, exc_tb):
637        self.shutdown(wait=True)
638        return False
639
640
641class BrokenExecutor(RuntimeError):
642    """
643    Raised when a executor has become non-functional after a severe failure.
644    """
645