• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""A Future class similar to the one in PEP 3148."""
2
3__all__ = (
4    'CancelledError', 'TimeoutError', 'InvalidStateError',
5    'Future', 'wrap_future', 'isfuture',
6)
7
8import concurrent.futures
9import contextvars
10import logging
11import sys
12
13from . import base_futures
14from . import events
15from . import format_helpers
16
17
18CancelledError = base_futures.CancelledError
19InvalidStateError = base_futures.InvalidStateError
20TimeoutError = base_futures.TimeoutError
21isfuture = base_futures.isfuture
22
23
24_PENDING = base_futures._PENDING
25_CANCELLED = base_futures._CANCELLED
26_FINISHED = base_futures._FINISHED
27
28
29STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
30
31
32class Future:
33    """This class is *almost* compatible with concurrent.futures.Future.
34
35    Differences:
36
37    - This class is not thread-safe.
38
39    - result() and exception() do not take a timeout argument and
40      raise an exception when the future isn't done yet.
41
42    - Callbacks registered with add_done_callback() are always called
43      via the event loop's call_soon().
44
45    - This class is not compatible with the wait() and as_completed()
46      methods in the concurrent.futures package.
47
48    (In Python 3.4 or later we may be able to unify the implementations.)
49    """
50
51    # Class variables serving as defaults for instance variables.
52    _state = _PENDING
53    _result = None
54    _exception = None
55    _loop = None
56    _source_traceback = None
57
58    # This field is used for a dual purpose:
59    # - Its presence is a marker to declare that a class implements
60    #   the Future protocol (i.e. is intended to be duck-type compatible).
61    #   The value must also be not-None, to enable a subclass to declare
62    #   that it is not compatible by setting this to None.
63    # - It is set by __iter__() below so that Task._step() can tell
64    #   the difference between
65    #   `await Future()` or`yield from Future()` (correct) vs.
66    #   `yield Future()` (incorrect).
67    _asyncio_future_blocking = False
68
69    __log_traceback = False
70
71    def __init__(self, *, loop=None):
72        """Initialize the future.
73
74        The optional event_loop argument allows explicitly setting the event
75        loop object used by the future. If it's not provided, the future uses
76        the default event loop.
77        """
78        if loop is None:
79            self._loop = events.get_event_loop()
80        else:
81            self._loop = loop
82        self._callbacks = []
83        if self._loop.get_debug():
84            self._source_traceback = format_helpers.extract_stack(
85                sys._getframe(1))
86
87    _repr_info = base_futures._future_repr_info
88
89    def __repr__(self):
90        return '<{} {}>'.format(self.__class__.__name__,
91                                ' '.join(self._repr_info()))
92
93    def __del__(self):
94        if not self.__log_traceback:
95            # set_exception() was not called, or result() or exception()
96            # has consumed the exception
97            return
98        exc = self._exception
99        context = {
100            'message':
101                f'{self.__class__.__name__} exception was never retrieved',
102            'exception': exc,
103            'future': self,
104        }
105        if self._source_traceback:
106            context['source_traceback'] = self._source_traceback
107        self._loop.call_exception_handler(context)
108
109    @property
110    def _log_traceback(self):
111        return self.__log_traceback
112
113    @_log_traceback.setter
114    def _log_traceback(self, val):
115        if bool(val):
116            raise ValueError('_log_traceback can only be set to False')
117        self.__log_traceback = False
118
119    def get_loop(self):
120        """Return the event loop the Future is bound to."""
121        return self._loop
122
123    def cancel(self):
124        """Cancel the future and schedule callbacks.
125
126        If the future is already done or cancelled, return False.  Otherwise,
127        change the future's state to cancelled, schedule the callbacks and
128        return True.
129        """
130        self.__log_traceback = False
131        if self._state != _PENDING:
132            return False
133        self._state = _CANCELLED
134        self.__schedule_callbacks()
135        return True
136
137    def __schedule_callbacks(self):
138        """Internal: Ask the event loop to call all callbacks.
139
140        The callbacks are scheduled to be called as soon as possible. Also
141        clears the callback list.
142        """
143        callbacks = self._callbacks[:]
144        if not callbacks:
145            return
146
147        self._callbacks[:] = []
148        for callback, ctx in callbacks:
149            self._loop.call_soon(callback, self, context=ctx)
150
151    def cancelled(self):
152        """Return True if the future was cancelled."""
153        return self._state == _CANCELLED
154
155    # Don't implement running(); see http://bugs.python.org/issue18699
156
157    def done(self):
158        """Return True if the future is done.
159
160        Done means either that a result / exception are available, or that the
161        future was cancelled.
162        """
163        return self._state != _PENDING
164
165    def result(self):
166        """Return the result this future represents.
167
168        If the future has been cancelled, raises CancelledError.  If the
169        future's result isn't yet available, raises InvalidStateError.  If
170        the future is done and has an exception set, this exception is raised.
171        """
172        if self._state == _CANCELLED:
173            raise CancelledError
174        if self._state != _FINISHED:
175            raise InvalidStateError('Result is not ready.')
176        self.__log_traceback = False
177        if self._exception is not None:
178            raise self._exception
179        return self._result
180
181    def exception(self):
182        """Return the exception that was set on this future.
183
184        The exception (or None if no exception was set) is returned only if
185        the future is done.  If the future has been cancelled, raises
186        CancelledError.  If the future isn't done yet, raises
187        InvalidStateError.
188        """
189        if self._state == _CANCELLED:
190            raise CancelledError
191        if self._state != _FINISHED:
192            raise InvalidStateError('Exception is not set.')
193        self.__log_traceback = False
194        return self._exception
195
196    def add_done_callback(self, fn, *, context=None):
197        """Add a callback to be run when the future becomes done.
198
199        The callback is called with a single argument - the future object. If
200        the future is already done when this is called, the callback is
201        scheduled with call_soon.
202        """
203        if self._state != _PENDING:
204            self._loop.call_soon(fn, self, context=context)
205        else:
206            if context is None:
207                context = contextvars.copy_context()
208            self._callbacks.append((fn, context))
209
210    # New method not in PEP 3148.
211
212    def remove_done_callback(self, fn):
213        """Remove all instances of a callback from the "call when done" list.
214
215        Returns the number of callbacks removed.
216        """
217        filtered_callbacks = [(f, ctx)
218                              for (f, ctx) in self._callbacks
219                              if f != fn]
220        removed_count = len(self._callbacks) - len(filtered_callbacks)
221        if removed_count:
222            self._callbacks[:] = filtered_callbacks
223        return removed_count
224
225    # So-called internal methods (note: no set_running_or_notify_cancel()).
226
227    def set_result(self, result):
228        """Mark the future done and set its result.
229
230        If the future is already done when this method is called, raises
231        InvalidStateError.
232        """
233        if self._state != _PENDING:
234            raise InvalidStateError('{}: {!r}'.format(self._state, self))
235        self._result = result
236        self._state = _FINISHED
237        self.__schedule_callbacks()
238
239    def set_exception(self, exception):
240        """Mark the future done and set an exception.
241
242        If the future is already done when this method is called, raises
243        InvalidStateError.
244        """
245        if self._state != _PENDING:
246            raise InvalidStateError('{}: {!r}'.format(self._state, self))
247        if isinstance(exception, type):
248            exception = exception()
249        if type(exception) is StopIteration:
250            raise TypeError("StopIteration interacts badly with generators "
251                            "and cannot be raised into a Future")
252        self._exception = exception
253        self._state = _FINISHED
254        self.__schedule_callbacks()
255        self.__log_traceback = True
256
257    def __await__(self):
258        if not self.done():
259            self._asyncio_future_blocking = True
260            yield self  # This tells Task to wait for completion.
261        if not self.done():
262            raise RuntimeError("await wasn't used with future")
263        return self.result()  # May raise too.
264
265    __iter__ = __await__  # make compatible with 'yield from'.
266
267
268# Needed for testing purposes.
269_PyFuture = Future
270
271
272def _get_loop(fut):
273    # Tries to call Future.get_loop() if it's available.
274    # Otherwise fallbacks to using the old '_loop' property.
275    try:
276        get_loop = fut.get_loop
277    except AttributeError:
278        pass
279    else:
280        return get_loop()
281    return fut._loop
282
283
284def _set_result_unless_cancelled(fut, result):
285    """Helper setting the result only if the future was not cancelled."""
286    if fut.cancelled():
287        return
288    fut.set_result(result)
289
290
291def _set_concurrent_future_state(concurrent, source):
292    """Copy state from a future to a concurrent.futures.Future."""
293    assert source.done()
294    if source.cancelled():
295        concurrent.cancel()
296    if not concurrent.set_running_or_notify_cancel():
297        return
298    exception = source.exception()
299    if exception is not None:
300        concurrent.set_exception(exception)
301    else:
302        result = source.result()
303        concurrent.set_result(result)
304
305
306def _copy_future_state(source, dest):
307    """Internal helper to copy state from another Future.
308
309    The other Future may be a concurrent.futures.Future.
310    """
311    assert source.done()
312    if dest.cancelled():
313        return
314    assert not dest.done()
315    if source.cancelled():
316        dest.cancel()
317    else:
318        exception = source.exception()
319        if exception is not None:
320            dest.set_exception(exception)
321        else:
322            result = source.result()
323            dest.set_result(result)
324
325
326def _chain_future(source, destination):
327    """Chain two futures so that when one completes, so does the other.
328
329    The result (or exception) of source will be copied to destination.
330    If destination is cancelled, source gets cancelled too.
331    Compatible with both asyncio.Future and concurrent.futures.Future.
332    """
333    if not isfuture(source) and not isinstance(source,
334                                               concurrent.futures.Future):
335        raise TypeError('A future is required for source argument')
336    if not isfuture(destination) and not isinstance(destination,
337                                                    concurrent.futures.Future):
338        raise TypeError('A future is required for destination argument')
339    source_loop = _get_loop(source) if isfuture(source) else None
340    dest_loop = _get_loop(destination) if isfuture(destination) else None
341
342    def _set_state(future, other):
343        if isfuture(future):
344            _copy_future_state(other, future)
345        else:
346            _set_concurrent_future_state(future, other)
347
348    def _call_check_cancel(destination):
349        if destination.cancelled():
350            if source_loop is None or source_loop is dest_loop:
351                source.cancel()
352            else:
353                source_loop.call_soon_threadsafe(source.cancel)
354
355    def _call_set_state(source):
356        if (destination.cancelled() and
357                dest_loop is not None and dest_loop.is_closed()):
358            return
359        if dest_loop is None or dest_loop is source_loop:
360            _set_state(destination, source)
361        else:
362            dest_loop.call_soon_threadsafe(_set_state, destination, source)
363
364    destination.add_done_callback(_call_check_cancel)
365    source.add_done_callback(_call_set_state)
366
367
368def wrap_future(future, *, loop=None):
369    """Wrap concurrent.futures.Future object."""
370    if isfuture(future):
371        return future
372    assert isinstance(future, concurrent.futures.Future), \
373        f'concurrent.futures.Future is expected, got {future!r}'
374    if loop is None:
375        loop = events.get_event_loop()
376    new_future = loop.create_future()
377    _chain_future(future, new_future)
378    return new_future
379
380
381try:
382    import _asyncio
383except ImportError:
384    pass
385else:
386    # _CFuture is needed for tests.
387    Future = _CFuture = _asyncio.Future
388