# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
3
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
5
import collections
import logging
import threading
import time
10
# Values accepted by wait() for its return_when argument.
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Internal-only mode that as_completed() passes when installing its waiter.
_AS_COMPLETED = '_AS_COMPLETED'
15
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

# Every state a Future can be in.
_FUTURE_STATES = [
    PENDING,
    RUNNING,
    CANCELLED,
    CANCELLED_AND_NOTIFIED,
    FINISHED
]

# Human-readable state names used by Future.__repr__(). Both cancelled
# states map to the same text.
_STATE_TO_DESCRIPTION_MAP = {
    PENDING: "pending",
    RUNNING: "running",
    CANCELLED: "cancelled",
    CANCELLED_AND_NOTIFIED: "cancelled",
    FINISHED: "finished"
}
40
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
43
class Error(Exception):
    """Base class for all future-related exceptions."""
47
class CancelledError(Error):
    """The Future was cancelled."""
51
class TimeoutError(Error):
    """The operation exceeded the given deadline."""
55
class _Waiter(object):
    """Provides the event that wait() and as_completed() block on.

    The base class only records completed futures; it never sets the event.
    Subclasses decide when the event should fire.
    """

    def __init__(self):
        # Event the waiting thread blocks on.
        self.event = threading.Event()
        # Futures reported through the add_*() methods, in arrival order.
        self.finished_futures = []

    def add_result(self, future):
        """Record a future that finished with a result."""
        self.finished_futures.append(future)

    def add_exception(self, future):
        """Record a future that finished by raising."""
        self.finished_futures.append(future)

    def add_cancelled(self, future):
        """Record a future whose cancellation has been notified."""
        self.finished_futures.append(future)
70
class _AsCompletedWaiter(_Waiter):
    """Used by as_completed().

    Every completion is recorded and the event is set, both under `lock`,
    so that a consumer holding the same lock can swap out
    `finished_futures` and clear the event as one step.
    """

    def __init__(self):
        super().__init__()
        # Guards finished_futures and the event against the consumer.
        self.lock = threading.Lock()

    def add_result(self, future):
        """Record a finished future and wake the consumer."""
        with self.lock:
            super().add_result(future)
            self.event.set()

    def add_exception(self, future):
        """Record a future that raised and wake the consumer."""
        with self.lock:
            super().add_exception(future)
            self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and wake the consumer."""
        with self.lock:
            super().add_cancelled(future)
            self.event.set()
92
class _FirstCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_COMPLETED).

    Any completion, whether result, exception or cancellation, sets the
    event.
    """

    def add_result(self, future):
        """Record a finished future and set the event."""
        super().add_result(future)
        self.event.set()

    def add_exception(self, future):
        """Record a future that raised and set the event."""
        super().add_exception(future)
        self.event.set()

    def add_cancelled(self, future):
        """Record a cancelled future and set the event."""
        super().add_cancelled(future)
        self.event.set()
107
class _AllCompletedWaiter(_Waiter):
    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).

    Counts down one per completed future and sets the event when the count
    reaches zero. With stop_on_exception=True the first exception sets the
    event immediately instead.
    """

    def __init__(self, num_pending_calls, stop_on_exception):
        """Initialize the waiter.

        Args:
            num_pending_calls: Number of completions to wait for before the
                event is set.
            stop_on_exception: If true, a future that raised sets the event
                at once without touching the counter.
        """
        self.num_pending_calls = num_pending_calls
        self.stop_on_exception = stop_on_exception
        # Guards num_pending_calls.
        self.lock = threading.Lock()
        super().__init__()

    def _decrement_pending_calls(self):
        """Count one completion; set the event when none remain."""
        with self.lock:
            self.num_pending_calls -= 1
            if not self.num_pending_calls:
                self.event.set()

    def add_result(self, future):
        """Record a finished future and count it."""
        super().add_result(future)
        self._decrement_pending_calls()

    def add_exception(self, future):
        """Record a future that raised; stop early if configured to."""
        super().add_exception(future)
        if self.stop_on_exception:
            self.event.set()
        else:
            self._decrement_pending_calls()

    def add_cancelled(self, future):
        """Record a cancelled future and count it."""
        super().add_cancelled(future)
        self._decrement_pending_calls()
137
class _AcquireFutures(object):
    """A context manager that does an ordered acquire of Future conditions.

    On entry it acquires the `_condition` of every given future, and on exit
    it releases them all.
    """

    def __init__(self, futures):
        # Sorting by id() gives every caller the same acquisition order;
        # presumably this is what keeps concurrent callers from deadlocking.
        self.futures = sorted(futures, key=id)

    def __enter__(self):
        for future in self.futures:
            future._condition.acquire()

    def __exit__(self, *args):
        for future in self.futures:
            future._condition.release()
151
def _create_and_install_waiters(fs, return_when):
    """Create the waiter matching return_when and attach it to every future.

    Args:
        fs: The futures to attach the waiter to. It is iterated twice for
            FIRST_EXCEPTION / ALL_COMPLETED, so it must be re-iterable.
        return_when: One of _AS_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
            or ALL_COMPLETED.

    Returns:
        The newly created waiter, already appended to each future's
        _waiters list.

    Raises:
        ValueError: If return_when is not one of the values above.
    """
    if return_when == _AS_COMPLETED:
        waiter = _AsCompletedWaiter()
    elif return_when == FIRST_COMPLETED:
        waiter = _FirstCompletedWaiter()
    else:
        # Only futures that have not yet been reported as done will call
        # back into the waiter, so only those are counted.
        pending_count = sum(
                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

        if return_when == FIRST_EXCEPTION:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
        elif return_when == ALL_COMPLETED:
            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
        else:
            raise ValueError("Invalid return condition: %r" % return_when)

    for f in fs:
        f._waiters.append(waiter)

    return waiter
172
def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    This is a generator function: nothing below runs, and the timeout clock
    does not start, until the first value is requested.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """
    if timeout is not None:
        # A monotonic clock keeps the deadline stable if the wall clock is
        # adjusted while waiting.
        end_time = timeout + time.monotonic()

    # De-duplicate so each future is yielded, and its waiter removed, once.
    fs = set(fs)
    total_futures = len(fs)
    with _AcquireFutures(fs):
        finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

    try:
        yield from finished

        while pending:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                            len(pending), total_futures))

            waiter.event.wait(wait_timeout)

            # Take the batch and re-arm the event under the waiter's lock so
            # a completion arriving now cannot be lost.
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished:
                yield future
                pending.remove(future)

    finally:
        # Runs on exhaustion, timeout, or when the generator is closed early.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
230
# Result type of wait(): two sets named 'done' and 'not_done'.
DoneAndNotDoneFutures = collections.namedtuple(
        'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon. Any iterable is accepted; duplicates are ignored.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.

    Raises:
        ValueError: If a waiter has to be installed and return_when is not
            one of the options above.
    """
    # Snapshot into a set. Without it a duplicated, already-finished future
    # makes len(done) != len(fs), so a waiter with zero pending calls is
    # installed and event.wait() never returns. It also lets one-shot
    # iterables be passed, since fs is iterated several times below.
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done

        if (return_when == FIRST_COMPLETED) and done:
            return DoneAndNotDoneFutures(done, not_done)
        elif (return_when == FIRST_EXCEPTION) and done:
            if any(not f.cancelled() and f.exception() is not None
                   for f in done):
                return DoneAndNotDoneFutures(done, not_done)

        if len(done) == len(fs):
            return DoneAndNotDoneFutures(done, not_done)

        waiter = _create_and_install_waiters(fs, return_when)

    waiter.event.wait(timeout)
    for f in fs:
        with f._condition:
            f._waiters.remove(waiter)

    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)
281
class Future(object):
    """Represents the result of an asynchronous computation."""

    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # Guards the attributes below and wakes threads blocked in
        # result() / exception().
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        # Waiter objects installed by wait() / as_completed().
        self._waiters = []
        # Callables registered with add_done_callback(), in order added.
        self._done_callbacks = []

    def _invoke_callbacks(self):
        """Call every registered callback; log, rather than raise, failures."""
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                # Compare with None: an exception object may be falsy.
                if self._exception is not None:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                    _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        """Cancel the future if possible.

        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or has already completed.
        """
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            # Already cancelled: report success without re-running callbacks.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            self._condition.notify_all()

        # Callbacks run outside the lock.
        self._invoke_callbacks()
        return True

    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING

    def done(self):
        """Return True if the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    def __get_result(self):
        """Raise the stored exception, or return the stored result."""
        # Compare with None rather than truthiness: an exception class may
        # define __bool__/__len__ and evaluate as false.
        if self._exception is not None:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.

        Args:
            fn: A callable that will be called with this future as its only
                argument when the future completes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future has already completed or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added. If
                the callable raises an Exception subclass, it is logged and
                ignored.
        """
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        # Already done: call now, with the same log-and-ignore handling that
        # _invoke_callbacks() applies to deferred callbacks.
        try:
            fn(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

    def result(self, timeout=None):
        """Return the result of the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the result if the future
                isn't done. If None, then there is no limit on the wait time.

        Returns:
            The result of the call that the future represents.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
            Exception: If the call raised then that exception will be raised.
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            self._condition.wait(timeout)

            # Re-check after waking: still unfinished means the wait timed out.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """Return the exception raised by the call that the future represents.

        Args:
            timeout: The number of seconds to wait for the exception if the
                future isn't done. If None, then there is no limit on the wait
                time.

        Returns:
            The exception raised by the call that the future represents or None
            if the call completed without raising.

        Raises:
            CancelledError: If the future was cancelled.
            TimeoutError: If the future didn't finish executing before the given
                timeout.
        """

        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            # Re-check after waking: still unfinished means the wait timed out.
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """Mark the future as running or process any cancel notifications.

        Should only be used by Executor implementations and unit tests.

        If the future has been cancelled (cancel() was called and returned
        True) then any threads waiting on the future completing (through calls
        to as_completed() or wait()) are notified and False is returned.

        If the future was not cancelled then it is put in the running state
        (future calls to running() will return True) and True is returned.

        This method should be called by Executor implementations before
        executing the work associated with this future. If this method returns
        False then the work should not be executed.

        Returns:
            False if the Future was cancelled, True otherwise.

        Raises:
            RuntimeError: if this method was already called or if set_result()
                or set_exception() was called.
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run outside the lock.
        self._invoke_callbacks()

    def set_exception(self, exception):
        """Sets the result of the future as being the given exception.

        Should only be used by Executor implementations and unit tests.
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        # Callbacks run outside the lock.
        self._invoke_callbacks()
508
class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                must override it.
        """
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, *iterables).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            # A monotonic clock keeps the deadline stable if the wall clock
            # is adjusted while results are being collected.
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.monotonic())
            finally:
                # On exhaustion, error, or early close, try to cancel
                # every submitted future.
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        """Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Args:
            wait: If True then shutdown will not return until all running
                futures have finished executing and the resources used by the
                executor have been reclaimed.
        """
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always wait for outstanding work; never suppress the exception.
        self.shutdown(wait=True)
        return False
583