• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2009 Brian Quinlan. All Rights Reserved.
2# Licensed to PSF under a Contributor Agreement.
3
4import collections
5import logging
6import threading
7import itertools
8import time
9import types
10
11__author__ = 'Brian Quinlan (brian@sweetapp.com)'
12
# Values accepted by the return_when argument of wait().
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal return condition: _create_and_install_waiters() maps it to the
# waiter type used by as_completed().
_AS_COMPLETED = '_AS_COMPLETED'

# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# All of the state constants defined above.
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Lower-case state names used by Future.__repr__(); both cancelled states are
# reported as "cancelled".
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
45
class Error(Exception):
    """Common ancestor of every exception defined for futures."""
49
class CancelledError(Error):
    """Raised when an operation is attempted on a cancelled Future."""
53
class TimeoutError(Error):
    """Raised when an operation did not complete within its deadline."""
57
class _Waiter(object):
    """Collects completed futures and owns the event waiters block on.

    wait() and as_completed() block on ``event``. This base class only
    records the futures it is told about; it never sets the event itself.
    """

    def __init__(self):
        self.finished_futures = []
        self.event = threading.Event()

    def add_result(self, future):
        """Record *future*, which finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record *future*, which finished by raising."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record *future*, which was cancelled."""
        self.finished_futures.append(future)
72
class _AsCompletedWaiter(_Waiter):
    """Waiter for as_completed(): every completion sets the event.

    Recording the future and setting the event happen together under
    ``lock``, the same lock as_completed() holds while it swaps out
    ``finished_futures`` and clears the event.
    """

    def __init__(self):
        super(_AsCompletedWaiter, self).__init__()
        self.lock = threading.Lock()

    def add_result(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_result(future)
            self.event.set()

    def add_exception(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        base = super(_AsCompletedWaiter, self)
        with self.lock:
            base.add_cancelled(future)
            self.event.set()
94
class _FirstCompletedWaiter(_Waiter):
    """Waiter for wait(return_when=FIRST_COMPLETED).

    Any completion (result, exception or cancellation) is recorded and then
    sets the event.
    """

    def add_result(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_result(future)
        self.event.set()

    def add_exception(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        parent = super(_FirstCompletedWaiter, self)
        parent.add_cancelled(future)
        self.event.set()
109
class _AllCompletedWaiter(_Waiter):
    """Waiter for wait() with FIRST_EXCEPTION or ALL_COMPLETED.

    Counts down from *num_pending_calls* and sets the event when the count
    reaches zero. With *stop_on_exception* true, the first future that
    raised sets the event immediately.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        super(_AllCompletedWaiter, self).__init__()
        self.lock = threading.Lock()
        self.stop_on_exception = stop_on_exception
        self.num_pending_calls = num_pending_calls

    def _decrement_pending_calls(self):
        # The read-modify-write of the counter is done under the lock.
        with self.lock:
            remaining = self.num_pending_calls - 1
            self.num_pending_calls = remaining
            if remaining == 0:
                self.event.set()

    def add_result(self, future):
        super(_AllCompletedWaiter, self).add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        super(_AllCompletedWaiter, self).add_exception(future)
        if not self.stop_on_exception:
            self._decrement_pending_calls()
            return
        # FIRST_EXCEPTION: wake the waiting thread without counting down.
        self.event.set()

    def add_cancelled(self, future):
        super(_AllCompletedWaiter, self).add_cancelled(future)
        self._decrement_pending_calls()
139
class _AcquireFutures(object):
    """Context manager acquiring the conditions of several futures at once.

    The futures are sorted by id() so that every user of this class acquires
    the conditions in the same order.
    """

    def __init__(self, futures):
        ordered = list(futures)
        ordered.sort(key=id)
        self.futures = ordered

    def __enter__(self):
        for fut in self.futures:
            fut._condition.acquire()

    def __exit__(self, *args):
        for fut in self.futures:
            fut._condition.release()
153
def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching *return_when* and attach it to each future.

    Args:
        fs: The futures to install the waiter on.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The waiter that was appended to every future's _waiters list.

    Raises:
        ValueError: If *return_when* is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Futures not yet in a notified terminal state still have to report
        # in before an "all completed" style wait can finish.
        completed_states = [CANCELLED_AND_NOTIFIED, FINISHED]
        pending_count = len(
                [f for f in fs if f._state not in completed_states])

        if return_when not in (FIRST_EXCEPTION, ALL_COMPLETED):
            raise ValueError("Invalid return condition: %r" % return_when)

        waiter = _AllCompletedWaiter(
                pending_count,
                stop_on_exception=(return_when == FIRST_EXCEPTION))

    for future in fs:
        future._waiters.append(waiter)

    return waiter
174
175
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.
    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: A list of futures. It is consumed from the end and is empty once
            the generator is exhausted.
        waiter: The waiter to remove from each future's _waiters list.
        ref_collect: An iterable of sets. set.remove() is called on each one
            for every future, so each set must contain every future in *fs*.
    """
    while fs:
        # Look at the last future without popping it, so the bookkeeping
        # below can run while the list still owns the reference.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # The waiter list is modified while holding the future's condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame holds no reference while the
        # generator is suspended at the yield below.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
196
197
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    # This is a generator, so the deadline is computed when iteration starts,
    # not when as_completed() is called.
    if timeout is not None:
        end_time = timeout + time.time()

    # De-duplicate so that each future is tracked and yielded once.
    fs = set(fs)
    total_futures = len(fs)
    # Hold every future's condition while taking the snapshot and installing
    # the waiter, so that no future can change state in between.
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    finished = list(finished)
    try:
        # First hand out the futures that were already complete.
        for f in _yield_finished_futures(finished, waiter,
                                         ref_collect=(fs,)):
            # Wrap and pop so that no local in this frame refers to the
            # future while the generator is suspended.
            f = [f]
            yield f.pop()

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.time()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch of completed futures and reset the event under
            # the waiter's lock, the same lock the waiter holds while it
            # records a completion and sets the event.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            # reverse to keep finishing order
            finished.reverse()
            for f in _yield_finished_futures(finished, waiter,
                                             ref_collect=(fs, pending)):
                f = [f]
                yield f.pop()

    finally:
        # Remove waiter from unfinished futures
        # (yielded futures were already removed from fs and detached from
        # the waiter by _yield_finished_futures()).
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
263
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')


def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The iterable of Futures (possibly created by different Executors)
            to wait upon. Duplicated futures are treated as one.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If a waiter has to be installed and *return_when* is not
            one of the options above.
    """
    # Work on a set from the start. Comparing len(done), a set, against the
    # raw input made wait() block forever when the input held duplicates of
    # already-finished futures, and it broke on one-shot iterables that
    # support neither len() nor a second iteration.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = set(f for f in fs
                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(not f.cancelled() and f.exception() is not None
                   for f in done):
                return DoneAndNotDoneFutures(done, not_done)

        # Nothing left to wait for (also covers an empty input).
        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        # Installed while the conditions are still held, so no completion
        # can happen between the snapshot above and the waiter being added.
        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
314
315class Future(object):
316    """Represents the result of an asynchronous computation."""
317
318    def __init__(self):
319        """Initializes the future. Should not be called by clients."""
320        self._condition = threading.Condition()
321        self._state = PENDING
322        self._result = None
323        self._exception = None
324        self._traceback = None
325        self._waiters = []
326        self._done_callbacks = []
327
328    def _invoke_callbacks(self):
329        for callback in self._done_callbacks:
330            try:
331                callback(self)
332            except Exception:
333                LOGGER.exception('exception calling callback for %r', self)
334            except BaseException:
335                # Explicitly let all other new-style exceptions through so
336                # that we can catch all old-style exceptions with a simple
337                # "except:" clause below.
338                #
339                # All old-style exception objects are instances of
340                # types.InstanceType, but "except types.InstanceType:" does
341                # not catch old-style exceptions for some reason.  Thus, the
342                # only way to catch all old-style exceptions without catching
343                # any new-style exceptions is to filter out the new-style
344                # exceptions, which all derive from BaseException.
345                raise
346            except:
347                # Because of the BaseException clause above, this handler only
348                # executes for old-style exception objects.
349                LOGGER.exception('exception calling callback for %r', self)
350
351    def __repr__(self):
352        with self._condition:
353            if self._state == FINISHED:
354                if self._exception:
355                    return '<%s at %#x state=%s raised %s>' % (
356                        self.__class__.__name__,
357                        id(self),
358                        _STATE_TO_DESCRIPTION_MAP[self._state],
359                        self._exception.__class__.__name__)
360                else:
361                    return '<%s at %#x state=%s returned %s>' % (
362                        self.__class__.__name__,
363                        id(self),
364                        _STATE_TO_DESCRIPTION_MAP[self._state],
365                        self._result.__class__.__name__)
366            return '<%s at %#x state=%s>' % (
367                    self.__class__.__name__,
368                    id(self),
369                   _STATE_TO_DESCRIPTION_MAP[self._state])
370
371    def cancel(self):
372        """Cancel the future if possible.
373
374        Returns True if the future was cancelled, False otherwise. A future
375        cannot be cancelled if it is running or has already completed.
376        """
377        with self._condition:
378            if self._state in [RUNNING, FINISHED]:
379                return False
380
381            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
382                return True
383
384            self._state = CANCELLED
385            self._condition.notify_all()
386
387        self._invoke_callbacks()
388        return True
389
390    def cancelled(self):
391        """Return True if the future was cancelled."""
392        with self._condition:
393            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
394
395    def running(self):
396        """Return True if the future is currently executing."""
397        with self._condition:
398            return self._state == RUNNING
399
400    def done(self):
401        """Return True of the future was cancelled or finished executing."""
402        with self._condition:
403            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
404
405    def __get_result(self):
406        if self._exception:
407            if isinstance(self._exception, types.InstanceType):
408                # The exception is an instance of an old-style class, which
409                # means type(self._exception) returns types.ClassType instead
410                # of the exception's actual class type.
411                exception_type = self._exception.__class__
412            else:
413                exception_type = type(self._exception)
414            raise exception_type, self._exception, self._traceback
415        else:
416            return self._result
417
418    def add_done_callback(self, fn):
419        """Attaches a callable that will be called when the future finishes.
420
421        Args:
422            fn: A callable that will be called with this future as its only
423                argument when the future completes or is cancelled. The callable
424                will always be called by a thread in the same process in which
425                it was added. If the future has already completed or been
426                cancelled then the callable will be called immediately. These
427                callables are called in the order that they were added.
428        """
429        with self._condition:
430            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
431                self._done_callbacks.append(fn)
432                return
433        fn(self)
434
435    def result(self, timeout=None):
436        """Return the result of the call that the future represents.
437
438        Args:
439            timeout: The number of seconds to wait for the result if the future
440                isn't done. If None, then there is no limit on the wait time.
441
442        Returns:
443            The result of the call that the future represents.
444
445        Raises:
446            CancelledError: If the future was cancelled.
447            TimeoutError: If the future didn't finish executing before the given
448                timeout.
449            Exception: If the call raised then that exception will be raised.
450        """
451        with self._condition:
452            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
453                raise CancelledError()
454            elif self._state == FINISHED:
455                return self.__get_result()
456
457            self._condition.wait(timeout)
458
459            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
460                raise CancelledError()
461            elif self._state == FINISHED:
462                return self.__get_result()
463            else:
464                raise TimeoutError()
465
466    def exception_info(self, timeout=None):
467        """Return a tuple of (exception, traceback) raised by the call that the
468        future represents.
469
470        Args:
471            timeout: The number of seconds to wait for the exception if the
472                future isn't done. If None, then there is no limit on the wait
473                time.
474
475        Returns:
476            The exception raised by the call that the future represents or None
477            if the call completed without raising.
478
479        Raises:
480            CancelledError: If the future was cancelled.
481            TimeoutError: If the future didn't finish executing before the given
482                timeout.
483        """
484        with self._condition:
485            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
486                raise CancelledError()
487            elif self._state == FINISHED:
488                return self._exception, self._traceback
489
490            self._condition.wait(timeout)
491
492            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
493                raise CancelledError()
494            elif self._state == FINISHED:
495                return self._exception, self._traceback
496            else:
497                raise TimeoutError()
498
499    def exception(self, timeout=None):
500        """Return the exception raised by the call that the future represents.
501
502        Args:
503            timeout: The number of seconds to wait for the exception if the
504                future isn't done. If None, then there is no limit on the wait
505                time.
506
507        Returns:
508            The exception raised by the call that the future represents or None
509            if the call completed without raising.
510
511        Raises:
512            CancelledError: If the future was cancelled.
513            TimeoutError: If the future didn't finish executing before the given
514                timeout.
515        """
516        return self.exception_info(timeout)[0]
517
518    # The following methods should only be used by Executors and in tests.
519    def set_running_or_notify_cancel(self):
520        """Mark the future as running or process any cancel notifications.
521
522        Should only be used by Executor implementations and unit tests.
523
524        If the future has been cancelled (cancel() was called and returned
525        True) then any threads waiting on the future completing (though calls
526        to as_completed() or wait()) are notified and False is returned.
527
528        If the future was not cancelled then it is put in the running state
529        (future calls to running() will return True) and True is returned.
530
531        This method should be called by Executor implementations before
532        executing the work associated with this future. If this method returns
533        False then the work should not be executed.
534
535        Returns:
536            False if the Future was cancelled, True otherwise.
537
538        Raises:
539            RuntimeError: if this method was already called or if set_result()
540                or set_exception() was called.
541        """
542        with self._condition:
543            if self._state == CANCELLED:
544                self._state = CANCELLED_AND_NOTIFIED
545                for waiter in self._waiters:
546                    waiter.add_cancelled(self)
547                # self._condition.notify_all() is not necessary because
548                # self.cancel() triggers a notification.
549                return False
550            elif self._state == PENDING:
551                self._state = RUNNING
552                return True
553            else:
554                LOGGER.critical('Future %s in unexpected state: %s',
555                                id(self),
556                                self._state)
557                raise RuntimeError('Future in unexpected state')
558
559    def set_result(self, result):
560        """Sets the return value of work associated with the future.
561
562        Should only be used by Executor implementations and unit tests.
563        """
564        with self._condition:
565            self._result = result
566            self._state = FINISHED
567            for waiter in self._waiters:
568                waiter.add_result(self)
569            self._condition.notify_all()
570        self._invoke_callbacks()
571
572    def set_exception_info(self, exception, traceback):
573        """Sets the result of the future as being the given exception
574        and traceback.
575
576        Should only be used by Executor implementations and unit tests.
577        """
578        with self._condition:
579            self._exception = exception
580            self._traceback = traceback
581            self._state = FINISHED
582            for waiter in self._waiters:
583                waiter.add_exception(self)
584            self._condition.notify_all()
585        self._invoke_callbacks()
586
587    def set_exception(self, exception):
588        """Sets the result of the future as being the given exception.
589
590        Should only be used by Executor implementations and unit tests.
591        """
592        self.set_exception_info(exception, None)
593
class Executor(object):
    """Abstract base class that concrete asynchronous executors derive from."""

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) for execution.

        Returns:
            A Future instance representing the execution of the callable.

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                provide the implementation.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: Keyword argument. The maximum number of seconds to
                wait. If None or omitted, there is no limit on the wait
                time.

        Returns:
            An iterator over the results, in the order of the inputs; the
            calls themselves may be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        deadline = None if timeout is None else timeout + time.time()

        # Every call is submitted here, before the iterator is returned.
        futures = []
        for call_args in itertools.izip(*iterables):
            futures.append(self.submit(fn, *call_args))

        # The yield lives in a nested generator so that the submissions above
        # happen when map() is called, not when the first value is requested.
        def result_iterator():
            try:
                # Reversed so that pop() hands the futures out in submission
                # order.
                futures.reverse()
                while futures:
                    # The popped future is never bound to a local name, so
                    # no reference to it is kept here.
                    if timeout is not None:
                        yield futures.pop().result(deadline - time.time())
                    else:
                        yield futures.pop().result()
            finally:
                # Whatever was not consumed is cancelled.
                for leftover in futures:
                    leftover.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for outstanding work; returning False lets any
        # exception from the with-body propagate.
        self.shutdown(wait=True)
        return False
668