• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""A Future class similar to the one in PEP 3148."""
2
3__all__ = (
4    'Future', 'wrap_future', 'isfuture',
5)
6
7import concurrent.futures
8import contextvars
9import logging
10import sys
11from types import GenericAlias
12
13from . import base_futures
14from . import events
15from . import exceptions
16from . import format_helpers
17
18
19isfuture = base_futures.isfuture
20
21
22_PENDING = base_futures._PENDING
23_CANCELLED = base_futures._CANCELLED
24_FINISHED = base_futures._FINISHED
25
26
27STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
28
29
30class Future:
31    """This class is *almost* compatible with concurrent.futures.Future.
32
33    Differences:
34
35    - This class is not thread-safe.
36
37    - result() and exception() do not take a timeout argument and
38      raise an exception when the future isn't done yet.
39
40    - Callbacks registered with add_done_callback() are always called
41      via the event loop's call_soon().
42
43    - This class is not compatible with the wait() and as_completed()
44      methods in the concurrent.futures package.
45
46    (In Python 3.4 or later we may be able to unify the implementations.)
47    """
48
49    # Class variables serving as defaults for instance variables.
50    _state = _PENDING
51    _result = None
52    _exception = None
53    _loop = None
54    _source_traceback = None
55    _cancel_message = None
56    # A saved CancelledError for later chaining as an exception context.
57    _cancelled_exc = None
58
59    # This field is used for a dual purpose:
60    # - Its presence is a marker to declare that a class implements
61    #   the Future protocol (i.e. is intended to be duck-type compatible).
62    #   The value must also be not-None, to enable a subclass to declare
63    #   that it is not compatible by setting this to None.
64    # - It is set by __iter__() below so that Task._step() can tell
65    #   the difference between
66    #   `await Future()` or`yield from Future()` (correct) vs.
67    #   `yield Future()` (incorrect).
68    _asyncio_future_blocking = False
69
70    __log_traceback = False
71
72    def __init__(self, *, loop=None):
73        """Initialize the future.
74
75        The optional event_loop argument allows explicitly setting the event
76        loop object used by the future. If it's not provided, the future uses
77        the default event loop.
78        """
79        if loop is None:
80            self._loop = events.get_event_loop()
81        else:
82            self._loop = loop
83        self._callbacks = []
84        if self._loop.get_debug():
85            self._source_traceback = format_helpers.extract_stack(
86                sys._getframe(1))
87
88    def __repr__(self):
89        return base_futures._future_repr(self)
90
91    def __del__(self):
92        if not self.__log_traceback:
93            # set_exception() was not called, or result() or exception()
94            # has consumed the exception
95            return
96        exc = self._exception
97        context = {
98            'message':
99                f'{self.__class__.__name__} exception was never retrieved',
100            'exception': exc,
101            'future': self,
102        }
103        if self._source_traceback:
104            context['source_traceback'] = self._source_traceback
105        self._loop.call_exception_handler(context)
106
107    __class_getitem__ = classmethod(GenericAlias)
108
109    @property
110    def _log_traceback(self):
111        return self.__log_traceback
112
113    @_log_traceback.setter
114    def _log_traceback(self, val):
115        if val:
116            raise ValueError('_log_traceback can only be set to False')
117        self.__log_traceback = False
118
119    def get_loop(self):
120        """Return the event loop the Future is bound to."""
121        loop = self._loop
122        if loop is None:
123            raise RuntimeError("Future object is not initialized.")
124        return loop
125
126    def _make_cancelled_error(self):
127        """Create the CancelledError to raise if the Future is cancelled.
128
129        This should only be called once when handling a cancellation since
130        it erases the saved context exception value.
131        """
132        if self._cancelled_exc is not None:
133            exc = self._cancelled_exc
134            self._cancelled_exc = None
135            return exc
136
137        if self._cancel_message is None:
138            exc = exceptions.CancelledError()
139        else:
140            exc = exceptions.CancelledError(self._cancel_message)
141        return exc
142
143    def cancel(self, msg=None):
144        """Cancel the future and schedule callbacks.
145
146        If the future is already done or cancelled, return False.  Otherwise,
147        change the future's state to cancelled, schedule the callbacks and
148        return True.
149        """
150        self.__log_traceback = False
151        if self._state != _PENDING:
152            return False
153        self._state = _CANCELLED
154        self._cancel_message = msg
155        self.__schedule_callbacks()
156        return True
157
158    def __schedule_callbacks(self):
159        """Internal: Ask the event loop to call all callbacks.
160
161        The callbacks are scheduled to be called as soon as possible. Also
162        clears the callback list.
163        """
164        callbacks = self._callbacks[:]
165        if not callbacks:
166            return
167
168        self._callbacks[:] = []
169        for callback, ctx in callbacks:
170            self._loop.call_soon(callback, self, context=ctx)
171
172    def cancelled(self):
173        """Return True if the future was cancelled."""
174        return self._state == _CANCELLED
175
176    # Don't implement running(); see http://bugs.python.org/issue18699
177
178    def done(self):
179        """Return True if the future is done.
180
181        Done means either that a result / exception are available, or that the
182        future was cancelled.
183        """
184        return self._state != _PENDING
185
186    def result(self):
187        """Return the result this future represents.
188
189        If the future has been cancelled, raises CancelledError.  If the
190        future's result isn't yet available, raises InvalidStateError.  If
191        the future is done and has an exception set, this exception is raised.
192        """
193        if self._state == _CANCELLED:
194            raise self._make_cancelled_error()
195        if self._state != _FINISHED:
196            raise exceptions.InvalidStateError('Result is not ready.')
197        self.__log_traceback = False
198        if self._exception is not None:
199            raise self._exception.with_traceback(self._exception_tb)
200        return self._result
201
202    def exception(self):
203        """Return the exception that was set on this future.
204
205        The exception (or None if no exception was set) is returned only if
206        the future is done.  If the future has been cancelled, raises
207        CancelledError.  If the future isn't done yet, raises
208        InvalidStateError.
209        """
210        if self._state == _CANCELLED:
211            raise self._make_cancelled_error()
212        if self._state != _FINISHED:
213            raise exceptions.InvalidStateError('Exception is not set.')
214        self.__log_traceback = False
215        return self._exception
216
217    def add_done_callback(self, fn, *, context=None):
218        """Add a callback to be run when the future becomes done.
219
220        The callback is called with a single argument - the future object. If
221        the future is already done when this is called, the callback is
222        scheduled with call_soon.
223        """
224        if self._state != _PENDING:
225            self._loop.call_soon(fn, self, context=context)
226        else:
227            if context is None:
228                context = contextvars.copy_context()
229            self._callbacks.append((fn, context))
230
231    # New method not in PEP 3148.
232
233    def remove_done_callback(self, fn):
234        """Remove all instances of a callback from the "call when done" list.
235
236        Returns the number of callbacks removed.
237        """
238        filtered_callbacks = [(f, ctx)
239                              for (f, ctx) in self._callbacks
240                              if f != fn]
241        removed_count = len(self._callbacks) - len(filtered_callbacks)
242        if removed_count:
243            self._callbacks[:] = filtered_callbacks
244        return removed_count
245
246    # So-called internal methods (note: no set_running_or_notify_cancel()).
247
248    def set_result(self, result):
249        """Mark the future done and set its result.
250
251        If the future is already done when this method is called, raises
252        InvalidStateError.
253        """
254        if self._state != _PENDING:
255            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
256        self._result = result
257        self._state = _FINISHED
258        self.__schedule_callbacks()
259
260    def set_exception(self, exception):
261        """Mark the future done and set an exception.
262
263        If the future is already done when this method is called, raises
264        InvalidStateError.
265        """
266        if self._state != _PENDING:
267            raise exceptions.InvalidStateError(f'{self._state}: {self!r}')
268        if isinstance(exception, type):
269            exception = exception()
270        if isinstance(exception, StopIteration):
271            new_exc = RuntimeError("StopIteration interacts badly with "
272                                   "generators and cannot be raised into a "
273                                   "Future")
274            new_exc.__cause__ = exception
275            new_exc.__context__ = exception
276            exception = new_exc
277        self._exception = exception
278        self._exception_tb = exception.__traceback__
279        self._state = _FINISHED
280        self.__schedule_callbacks()
281        self.__log_traceback = True
282
283    def __await__(self):
284        if not self.done():
285            self._asyncio_future_blocking = True
286            yield self  # This tells Task to wait for completion.
287        if not self.done():
288            raise RuntimeError("await wasn't used with future")
289        return self.result()  # May raise too.
290
291    __iter__ = __await__  # make compatible with 'yield from'.
292
293
294# Needed for testing purposes.
295_PyFuture = Future
296
297
298def _get_loop(fut):
299    # Tries to call Future.get_loop() if it's available.
300    # Otherwise fallbacks to using the old '_loop' property.
301    try:
302        get_loop = fut.get_loop
303    except AttributeError:
304        pass
305    else:
306        return get_loop()
307    return fut._loop
308
309
310def _set_result_unless_cancelled(fut, result):
311    """Helper setting the result only if the future was not cancelled."""
312    if fut.cancelled():
313        return
314    fut.set_result(result)
315
316
317def _convert_future_exc(exc):
318    exc_class = type(exc)
319    if exc_class is concurrent.futures.CancelledError:
320        return exceptions.CancelledError(*exc.args).with_traceback(exc.__traceback__)
321    elif exc_class is concurrent.futures.InvalidStateError:
322        return exceptions.InvalidStateError(*exc.args).with_traceback(exc.__traceback__)
323    else:
324        return exc
325
326
327def _set_concurrent_future_state(concurrent, source):
328    """Copy state from a future to a concurrent.futures.Future."""
329    assert source.done()
330    if source.cancelled():
331        concurrent.cancel()
332    if not concurrent.set_running_or_notify_cancel():
333        return
334    exception = source.exception()
335    if exception is not None:
336        concurrent.set_exception(_convert_future_exc(exception))
337    else:
338        result = source.result()
339        concurrent.set_result(result)
340
341
342def _copy_future_state(source, dest):
343    """Internal helper to copy state from another Future.
344
345    The other Future may be a concurrent.futures.Future.
346    """
347    assert source.done()
348    if dest.cancelled():
349        return
350    assert not dest.done()
351    if source.cancelled():
352        dest.cancel()
353    else:
354        exception = source.exception()
355        if exception is not None:
356            dest.set_exception(_convert_future_exc(exception))
357        else:
358            result = source.result()
359            dest.set_result(result)
360
361
362def _chain_future(source, destination):
363    """Chain two futures so that when one completes, so does the other.
364
365    The result (or exception) of source will be copied to destination.
366    If destination is cancelled, source gets cancelled too.
367    Compatible with both asyncio.Future and concurrent.futures.Future.
368    """
369    if not isfuture(source) and not isinstance(source,
370                                               concurrent.futures.Future):
371        raise TypeError('A future is required for source argument')
372    if not isfuture(destination) and not isinstance(destination,
373                                                    concurrent.futures.Future):
374        raise TypeError('A future is required for destination argument')
375    source_loop = _get_loop(source) if isfuture(source) else None
376    dest_loop = _get_loop(destination) if isfuture(destination) else None
377
378    def _set_state(future, other):
379        if isfuture(future):
380            _copy_future_state(other, future)
381        else:
382            _set_concurrent_future_state(future, other)
383
384    def _call_check_cancel(destination):
385        if destination.cancelled():
386            if source_loop is None or source_loop is dest_loop:
387                source.cancel()
388            else:
389                source_loop.call_soon_threadsafe(source.cancel)
390
391    def _call_set_state(source):
392        if (destination.cancelled() and
393                dest_loop is not None and dest_loop.is_closed()):
394            return
395        if dest_loop is None or dest_loop is source_loop:
396            _set_state(destination, source)
397        else:
398            if dest_loop.is_closed():
399                return
400            dest_loop.call_soon_threadsafe(_set_state, destination, source)
401
402    destination.add_done_callback(_call_check_cancel)
403    source.add_done_callback(_call_set_state)
404
405
406def wrap_future(future, *, loop=None):
407    """Wrap concurrent.futures.Future object."""
408    if isfuture(future):
409        return future
410    assert isinstance(future, concurrent.futures.Future), \
411        f'concurrent.futures.Future is expected, got {future!r}'
412    if loop is None:
413        loop = events.get_event_loop()
414    new_future = loop.create_future()
415    _chain_future(future, new_future)
416    return new_future
417
418
419try:
420    import _asyncio
421except ImportError:
422    pass
423else:
424    # _CFuture is needed for tests.
425    Future = _CFuture = _asyncio.Future
426