• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""A Future class similar to the one in PEP 3148."""
2
3__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError',
4           'Future', 'wrap_future', 'isfuture']
5
6import concurrent.futures
7import logging
8import sys
9import traceback
10
11from . import base_futures
12from . import compat
13from . import events
14
15
16CancelledError = base_futures.CancelledError
17InvalidStateError = base_futures.InvalidStateError
18TimeoutError = base_futures.TimeoutError
19isfuture = base_futures.isfuture
20
21
22_PENDING = base_futures._PENDING
23_CANCELLED = base_futures._CANCELLED
24_FINISHED = base_futures._FINISHED
25
26
27STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging
28
29
30class _TracebackLogger:
31    """Helper to log a traceback upon destruction if not cleared.
32
33    This solves a nasty problem with Futures and Tasks that have an
34    exception set: if nobody asks for the exception, the exception is
35    never logged.  This violates the Zen of Python: 'Errors should
36    never pass silently.  Unless explicitly silenced.'
37
38    However, we don't want to log the exception as soon as
39    set_exception() is called: if the calling code is written
40    properly, it will get the exception and handle it properly.  But
41    we *do* want to log it if result() or exception() was never called
42    -- otherwise developers waste a lot of time wondering why their
43    buggy code fails silently.
44
45    An earlier attempt added a __del__() method to the Future class
46    itself, but this backfired because the presence of __del__()
47    prevents garbage collection from breaking cycles.  A way out of
48    this catch-22 is to avoid having a __del__() method on the Future
49    class itself, but instead to have a reference to a helper object
50    with a __del__() method that logs the traceback, where we ensure
51    that the helper object doesn't participate in cycles, and only the
52    Future has a reference to it.
53
54    The helper object is added when set_exception() is called.  When
55    the Future is collected, and the helper is present, the helper
56    object is also collected, and its __del__() method will log the
57    traceback.  When the Future's result() or exception() method is
58    called (and a helper object is present), it removes the helper
59    object, after calling its clear() method to prevent it from
60    logging.
61
62    One downside is that we do a fair amount of work to extract the
63    traceback from the exception, even when it is never logged.  It
64    would seem cheaper to just store the exception object, but that
65    references the traceback, which references stack frames, which may
66    reference the Future, which references the _TracebackLogger, and
67    then the _TracebackLogger would be included in a cycle, which is
68    what we're trying to avoid!  As an optimization, we don't
69    immediately format the exception; we only do the work when
70    activate() is called, which call is delayed until after all the
71    Future's callbacks have run.  Since usually a Future has at least
72    one callback (typically set by 'yield from') and usually that
73    callback extracts the callback, thereby removing the need to
74    format the exception.
75
76    PS. I don't claim credit for this solution.  I first heard of it
77    in a discussion about closing files when they are collected.
78    """
79
80    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')
81
82    def __init__(self, future, exc):
83        self.loop = future._loop
84        self.source_traceback = future._source_traceback
85        self.exc = exc
86        self.tb = None
87
88    def activate(self):
89        exc = self.exc
90        if exc is not None:
91            self.exc = None
92            self.tb = traceback.format_exception(exc.__class__, exc,
93                                                 exc.__traceback__)
94
95    def clear(self):
96        self.exc = None
97        self.tb = None
98
99    def __del__(self):
100        if self.tb:
101            msg = 'Future/Task exception was never retrieved\n'
102            if self.source_traceback:
103                src = ''.join(traceback.format_list(self.source_traceback))
104                msg += 'Future/Task created at (most recent call last):\n'
105                msg += '%s\n' % src.rstrip()
106            msg += ''.join(self.tb).rstrip()
107            self.loop.call_exception_handler({'message': msg})
108
109
110class Future:
111    """This class is *almost* compatible with concurrent.futures.Future.
112
113    Differences:
114
115    - result() and exception() do not take a timeout argument and
116      raise an exception when the future isn't done yet.
117
118    - Callbacks registered with add_done_callback() are always called
119      via the event loop's call_soon_threadsafe().
120
121    - This class is not compatible with the wait() and as_completed()
122      methods in the concurrent.futures package.
123
124    (In Python 3.4 or later we may be able to unify the implementations.)
125    """
126
127    # Class variables serving as defaults for instance variables.
128    _state = _PENDING
129    _result = None
130    _exception = None
131    _loop = None
132    _source_traceback = None
133
134    # This field is used for a dual purpose:
135    # - Its presence is a marker to declare that a class implements
136    #   the Future protocol (i.e. is intended to be duck-type compatible).
137    #   The value must also be not-None, to enable a subclass to declare
138    #   that it is not compatible by setting this to None.
139    # - It is set by __iter__() below so that Task._step() can tell
140    #   the difference between `yield from Future()` (correct) vs.
141    #   `yield Future()` (incorrect).
142    _asyncio_future_blocking = False
143
144    _log_traceback = False   # Used for Python 3.4 and later
145    _tb_logger = None        # Used for Python 3.3 only
146
147    def __init__(self, *, loop=None):
148        """Initialize the future.
149
150        The optional event_loop argument allows explicitly setting the event
151        loop object used by the future. If it's not provided, the future uses
152        the default event loop.
153        """
154        if loop is None:
155            self._loop = events.get_event_loop()
156        else:
157            self._loop = loop
158        self._callbacks = []
159        if self._loop.get_debug():
160            self._source_traceback = traceback.extract_stack(sys._getframe(1))
161
162    _repr_info = base_futures._future_repr_info
163
164    def __repr__(self):
165        return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))
166
167    # On Python 3.3 and older, objects with a destructor part of a reference
168    # cycle are never destroyed. It's not more the case on Python 3.4 thanks
169    # to the PEP 442.
170    if compat.PY34:
171        def __del__(self):
172            if not self._log_traceback:
173                # set_exception() was not called, or result() or exception()
174                # has consumed the exception
175                return
176            exc = self._exception
177            context = {
178                'message': ('%s exception was never retrieved'
179                            % self.__class__.__name__),
180                'exception': exc,
181                'future': self,
182            }
183            if self._source_traceback:
184                context['source_traceback'] = self._source_traceback
185            self._loop.call_exception_handler(context)
186
187    def cancel(self):
188        """Cancel the future and schedule callbacks.
189
190        If the future is already done or cancelled, return False.  Otherwise,
191        change the future's state to cancelled, schedule the callbacks and
192        return True.
193        """
194        if self._state != _PENDING:
195            return False
196        self._state = _CANCELLED
197        self._schedule_callbacks()
198        return True
199
200    def _schedule_callbacks(self):
201        """Internal: Ask the event loop to call all callbacks.
202
203        The callbacks are scheduled to be called as soon as possible. Also
204        clears the callback list.
205        """
206        callbacks = self._callbacks[:]
207        if not callbacks:
208            return
209
210        self._callbacks[:] = []
211        for callback in callbacks:
212            self._loop.call_soon(callback, self)
213
214    def cancelled(self):
215        """Return True if the future was cancelled."""
216        return self._state == _CANCELLED
217
218    # Don't implement running(); see http://bugs.python.org/issue18699
219
220    def done(self):
221        """Return True if the future is done.
222
223        Done means either that a result / exception are available, or that the
224        future was cancelled.
225        """
226        return self._state != _PENDING
227
228    def result(self):
229        """Return the result this future represents.
230
231        If the future has been cancelled, raises CancelledError.  If the
232        future's result isn't yet available, raises InvalidStateError.  If
233        the future is done and has an exception set, this exception is raised.
234        """
235        if self._state == _CANCELLED:
236            raise CancelledError
237        if self._state != _FINISHED:
238            raise InvalidStateError('Result is not ready.')
239        self._log_traceback = False
240        if self._tb_logger is not None:
241            self._tb_logger.clear()
242            self._tb_logger = None
243        if self._exception is not None:
244            raise self._exception
245        return self._result
246
247    def exception(self):
248        """Return the exception that was set on this future.
249
250        The exception (or None if no exception was set) is returned only if
251        the future is done.  If the future has been cancelled, raises
252        CancelledError.  If the future isn't done yet, raises
253        InvalidStateError.
254        """
255        if self._state == _CANCELLED:
256            raise CancelledError
257        if self._state != _FINISHED:
258            raise InvalidStateError('Exception is not set.')
259        self._log_traceback = False
260        if self._tb_logger is not None:
261            self._tb_logger.clear()
262            self._tb_logger = None
263        return self._exception
264
265    def add_done_callback(self, fn):
266        """Add a callback to be run when the future becomes done.
267
268        The callback is called with a single argument - the future object. If
269        the future is already done when this is called, the callback is
270        scheduled with call_soon.
271        """
272        if self._state != _PENDING:
273            self._loop.call_soon(fn, self)
274        else:
275            self._callbacks.append(fn)
276
277    # New method not in PEP 3148.
278
279    def remove_done_callback(self, fn):
280        """Remove all instances of a callback from the "call when done" list.
281
282        Returns the number of callbacks removed.
283        """
284        filtered_callbacks = [f for f in self._callbacks if f != fn]
285        removed_count = len(self._callbacks) - len(filtered_callbacks)
286        if removed_count:
287            self._callbacks[:] = filtered_callbacks
288        return removed_count
289
290    # So-called internal methods (note: no set_running_or_notify_cancel()).
291
292    def set_result(self, result):
293        """Mark the future done and set its result.
294
295        If the future is already done when this method is called, raises
296        InvalidStateError.
297        """
298        if self._state != _PENDING:
299            raise InvalidStateError('{}: {!r}'.format(self._state, self))
300        self._result = result
301        self._state = _FINISHED
302        self._schedule_callbacks()
303
304    def set_exception(self, exception):
305        """Mark the future done and set an exception.
306
307        If the future is already done when this method is called, raises
308        InvalidStateError.
309        """
310        if self._state != _PENDING:
311            raise InvalidStateError('{}: {!r}'.format(self._state, self))
312        if isinstance(exception, type):
313            exception = exception()
314        if type(exception) is StopIteration:
315            raise TypeError("StopIteration interacts badly with generators "
316                            "and cannot be raised into a Future")
317        self._exception = exception
318        self._state = _FINISHED
319        self._schedule_callbacks()
320        if compat.PY34:
321            self._log_traceback = True
322        else:
323            self._tb_logger = _TracebackLogger(self, exception)
324            # Arrange for the logger to be activated after all callbacks
325            # have had a chance to call result() or exception().
326            self._loop.call_soon(self._tb_logger.activate)
327
328    def __iter__(self):
329        if not self.done():
330            self._asyncio_future_blocking = True
331            yield self  # This tells Task to wait for completion.
332        assert self.done(), "yield from wasn't used with future"
333        return self.result()  # May raise too.
334
335    if compat.PY35:
336        __await__ = __iter__ # make compatible with 'await' expression
337
338
339# Needed for testing purposes.
340_PyFuture = Future
341
342
343def _set_result_unless_cancelled(fut, result):
344    """Helper setting the result only if the future was not cancelled."""
345    if fut.cancelled():
346        return
347    fut.set_result(result)
348
349
350def _set_concurrent_future_state(concurrent, source):
351    """Copy state from a future to a concurrent.futures.Future."""
352    assert source.done()
353    if source.cancelled():
354        concurrent.cancel()
355    if not concurrent.set_running_or_notify_cancel():
356        return
357    exception = source.exception()
358    if exception is not None:
359        concurrent.set_exception(exception)
360    else:
361        result = source.result()
362        concurrent.set_result(result)
363
364
365def _copy_future_state(source, dest):
366    """Internal helper to copy state from another Future.
367
368    The other Future may be a concurrent.futures.Future.
369    """
370    assert source.done()
371    if dest.cancelled():
372        return
373    assert not dest.done()
374    if source.cancelled():
375        dest.cancel()
376    else:
377        exception = source.exception()
378        if exception is not None:
379            dest.set_exception(exception)
380        else:
381            result = source.result()
382            dest.set_result(result)
383
384
385def _chain_future(source, destination):
386    """Chain two futures so that when one completes, so does the other.
387
388    The result (or exception) of source will be copied to destination.
389    If destination is cancelled, source gets cancelled too.
390    Compatible with both asyncio.Future and concurrent.futures.Future.
391    """
392    if not isfuture(source) and not isinstance(source,
393                                               concurrent.futures.Future):
394        raise TypeError('A future is required for source argument')
395    if not isfuture(destination) and not isinstance(destination,
396                                                    concurrent.futures.Future):
397        raise TypeError('A future is required for destination argument')
398    source_loop = source._loop if isfuture(source) else None
399    dest_loop = destination._loop if isfuture(destination) else None
400
401    def _set_state(future, other):
402        if isfuture(future):
403            _copy_future_state(other, future)
404        else:
405            _set_concurrent_future_state(future, other)
406
407    def _call_check_cancel(destination):
408        if destination.cancelled():
409            if source_loop is None or source_loop is dest_loop:
410                source.cancel()
411            else:
412                source_loop.call_soon_threadsafe(source.cancel)
413
414    def _call_set_state(source):
415        if dest_loop is None or dest_loop is source_loop:
416            _set_state(destination, source)
417        else:
418            dest_loop.call_soon_threadsafe(_set_state, destination, source)
419
420    destination.add_done_callback(_call_check_cancel)
421    source.add_done_callback(_call_set_state)
422
423
424def wrap_future(future, *, loop=None):
425    """Wrap concurrent.futures.Future object."""
426    if isfuture(future):
427        return future
428    assert isinstance(future, concurrent.futures.Future), \
429        'concurrent.futures.Future is expected, got {!r}'.format(future)
430    if loop is None:
431        loop = events.get_event_loop()
432    new_future = loop.create_future()
433    _chain_future(future, new_future)
434    return new_future
435
436
437try:
438    import _asyncio
439except ImportError:
440    pass
441else:
442    # _CFuture is needed for tests.
443    Future = _CFuture = _asyncio.Future
444