1"""A Future class similar to the one in PEP 3148.""" 2 3__all__ = ( 4 'Future', 'wrap_future', 'isfuture', 5) 6 7import concurrent.futures 8import contextvars 9import logging 10import sys 11 12from . import base_futures 13from . import events 14from . import exceptions 15from . import format_helpers 16 17 18isfuture = base_futures.isfuture 19 20 21_PENDING = base_futures._PENDING 22_CANCELLED = base_futures._CANCELLED 23_FINISHED = base_futures._FINISHED 24 25 26STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging 27 28 29class Future: 30 """This class is *almost* compatible with concurrent.futures.Future. 31 32 Differences: 33 34 - This class is not thread-safe. 35 36 - result() and exception() do not take a timeout argument and 37 raise an exception when the future isn't done yet. 38 39 - Callbacks registered with add_done_callback() are always called 40 via the event loop's call_soon(). 41 42 - This class is not compatible with the wait() and as_completed() 43 methods in the concurrent.futures package. 44 45 (In Python 3.4 or later we may be able to unify the implementations.) 46 """ 47 48 # Class variables serving as defaults for instance variables. 49 _state = _PENDING 50 _result = None 51 _exception = None 52 _loop = None 53 _source_traceback = None 54 _cancel_message = None 55 # A saved CancelledError for later chaining as an exception context. 56 _cancelled_exc = None 57 58 # This field is used for a dual purpose: 59 # - Its presence is a marker to declare that a class implements 60 # the Future protocol (i.e. is intended to be duck-type compatible). 61 # The value must also be not-None, to enable a subclass to declare 62 # that it is not compatible by setting this to None. 63 # - It is set by __iter__() below so that Task._step() can tell 64 # the difference between 65 # `await Future()` or`yield from Future()` (correct) vs. 66 # `yield Future()` (incorrect). 67 _asyncio_future_blocking = False 68 69 __log_traceback = False 70 71 def __init__(self, *, loop=None): 72 """Initialize the future. 73 74 The optional event_loop argument allows explicitly setting the event 75 loop object used by the future. If it's not provided, the future uses 76 the default event loop. 77 """ 78 if loop is None: 79 self._loop = events._get_event_loop() 80 else: 81 self._loop = loop 82 self._callbacks = [] 83 if self._loop.get_debug(): 84 self._source_traceback = format_helpers.extract_stack( 85 sys._getframe(1)) 86 87 _repr_info = base_futures._future_repr_info 88 89 def __repr__(self): 90 return '<{} {}>'.format(self.__class__.__name__, 91 ' '.join(self._repr_info())) 92 93 def __del__(self): 94 if not self.__log_traceback: 95 # set_exception() was not called, or result() or exception() 96 # has consumed the exception 97 return 98 exc = self._exception 99 context = { 100 'message': 101 f'{self.__class__.__name__} exception was never retrieved', 102 'exception': exc, 103 'future': self, 104 } 105 if self._source_traceback: 106 context['source_traceback'] = self._source_traceback 107 self._loop.call_exception_handler(context) 108 109 def __class_getitem__(cls, type): 110 return cls 111 112 @property 113 def _log_traceback(self): 114 return self.__log_traceback 115 116 @_log_traceback.setter 117 def _log_traceback(self, val): 118 if val: 119 raise ValueError('_log_traceback can only be set to False') 120 self.__log_traceback = False 121 122 def get_loop(self): 123 """Return the event loop the Future is bound to.""" 124 loop = self._loop 125 if loop is None: 126 raise RuntimeError("Future object is not initialized.") 127 return loop 128 129 def _make_cancelled_error(self): 130 """Create the CancelledError to raise if the Future is cancelled. 131 132 This should only be called once when handling a cancellation since 133 it erases the saved context exception value. 134 """ 135 if self._cancel_message is None: 136 exc = exceptions.CancelledError() 137 else: 138 exc = exceptions.CancelledError(self._cancel_message) 139 exc.__context__ = self._cancelled_exc 140 # Remove the reference since we don't need this anymore. 141 self._cancelled_exc = None 142 return exc 143 144 def cancel(self, msg=None): 145 """Cancel the future and schedule callbacks. 146 147 If the future is already done or cancelled, return False. Otherwise, 148 change the future's state to cancelled, schedule the callbacks and 149 return True. 150 """ 151 self.__log_traceback = False 152 if self._state != _PENDING: 153 return False 154 self._state = _CANCELLED 155 self._cancel_message = msg 156 self.__schedule_callbacks() 157 return True 158 159 def __schedule_callbacks(self): 160 """Internal: Ask the event loop to call all callbacks. 161 162 The callbacks are scheduled to be called as soon as possible. Also 163 clears the callback list. 164 """ 165 callbacks = self._callbacks[:] 166 if not callbacks: 167 return 168 169 self._callbacks[:] = [] 170 for callback, ctx in callbacks: 171 self._loop.call_soon(callback, self, context=ctx) 172 173 def cancelled(self): 174 """Return True if the future was cancelled.""" 175 return self._state == _CANCELLED 176 177 # Don't implement running(); see http://bugs.python.org/issue18699 178 179 def done(self): 180 """Return True if the future is done. 181 182 Done means either that a result / exception are available, or that the 183 future was cancelled. 184 """ 185 return self._state != _PENDING 186 187 def result(self): 188 """Return the result this future represents. 189 190 If the future has been cancelled, raises CancelledError. If the 191 future's result isn't yet available, raises InvalidStateError. If 192 the future is done and has an exception set, this exception is raised. 193 """ 194 if self._state == _CANCELLED: 195 exc = self._make_cancelled_error() 196 raise exc 197 if self._state != _FINISHED: 198 raise exceptions.InvalidStateError('Result is not ready.') 199 self.__log_traceback = False 200 if self._exception is not None: 201 raise self._exception 202 return self._result 203 204 def exception(self): 205 """Return the exception that was set on this future. 206 207 The exception (or None if no exception was set) is returned only if 208 the future is done. If the future has been cancelled, raises 209 CancelledError. If the future isn't done yet, raises 210 InvalidStateError. 211 """ 212 if self._state == _CANCELLED: 213 exc = self._make_cancelled_error() 214 raise exc 215 if self._state != _FINISHED: 216 raise exceptions.InvalidStateError('Exception is not set.') 217 self.__log_traceback = False 218 return self._exception 219 220 def add_done_callback(self, fn, *, context=None): 221 """Add a callback to be run when the future becomes done. 222 223 The callback is called with a single argument - the future object. If 224 the future is already done when this is called, the callback is 225 scheduled with call_soon. 226 """ 227 if self._state != _PENDING: 228 self._loop.call_soon(fn, self, context=context) 229 else: 230 if context is None: 231 context = contextvars.copy_context() 232 self._callbacks.append((fn, context)) 233 234 # New method not in PEP 3148. 235 236 def remove_done_callback(self, fn): 237 """Remove all instances of a callback from the "call when done" list. 238 239 Returns the number of callbacks removed. 240 """ 241 filtered_callbacks = [(f, ctx) 242 for (f, ctx) in self._callbacks 243 if f != fn] 244 removed_count = len(self._callbacks) - len(filtered_callbacks) 245 if removed_count: 246 self._callbacks[:] = filtered_callbacks 247 return removed_count 248 249 # So-called internal methods (note: no set_running_or_notify_cancel()). 250 251 def set_result(self, result): 252 """Mark the future done and set its result. 253 254 If the future is already done when this method is called, raises 255 InvalidStateError. 256 """ 257 if self._state != _PENDING: 258 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 259 self._result = result 260 self._state = _FINISHED 261 self.__schedule_callbacks() 262 263 def set_exception(self, exception): 264 """Mark the future done and set an exception. 265 266 If the future is already done when this method is called, raises 267 InvalidStateError. 268 """ 269 if self._state != _PENDING: 270 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 271 if isinstance(exception, type): 272 exception = exception() 273 if type(exception) is StopIteration: 274 raise TypeError("StopIteration interacts badly with generators " 275 "and cannot be raised into a Future") 276 self._exception = exception 277 self._state = _FINISHED 278 self.__schedule_callbacks() 279 self.__log_traceback = True 280 281 def __await__(self): 282 if not self.done(): 283 self._asyncio_future_blocking = True 284 yield self # This tells Task to wait for completion. 285 if not self.done(): 286 raise RuntimeError("await wasn't used with future") 287 return self.result() # May raise too. 288 289 __iter__ = __await__ # make compatible with 'yield from'. 290 291 292# Needed for testing purposes. 293_PyFuture = Future 294 295 296def _get_loop(fut): 297 # Tries to call Future.get_loop() if it's available. 298 # Otherwise fallbacks to using the old '_loop' property. 299 try: 300 get_loop = fut.get_loop 301 except AttributeError: 302 pass 303 else: 304 return get_loop() 305 return fut._loop 306 307 308def _set_result_unless_cancelled(fut, result): 309 """Helper setting the result only if the future was not cancelled.""" 310 if fut.cancelled(): 311 return 312 fut.set_result(result) 313 314 315def _convert_future_exc(exc): 316 exc_class = type(exc) 317 if exc_class is concurrent.futures.CancelledError: 318 return exceptions.CancelledError(*exc.args) 319 elif exc_class is concurrent.futures.TimeoutError: 320 return exceptions.TimeoutError(*exc.args) 321 elif exc_class is concurrent.futures.InvalidStateError: 322 return exceptions.InvalidStateError(*exc.args) 323 else: 324 return exc 325 326 327def _set_concurrent_future_state(concurrent, source): 328 """Copy state from a future to a concurrent.futures.Future.""" 329 assert source.done() 330 if source.cancelled(): 331 concurrent.cancel() 332 if not concurrent.set_running_or_notify_cancel(): 333 return 334 exception = source.exception() 335 if exception is not None: 336 concurrent.set_exception(_convert_future_exc(exception)) 337 else: 338 result = source.result() 339 concurrent.set_result(result) 340 341 342def _copy_future_state(source, dest): 343 """Internal helper to copy state from another Future. 344 345 The other Future may be a concurrent.futures.Future. 346 """ 347 assert source.done() 348 if dest.cancelled(): 349 return 350 assert not dest.done() 351 if source.cancelled(): 352 dest.cancel() 353 else: 354 exception = source.exception() 355 if exception is not None: 356 dest.set_exception(_convert_future_exc(exception)) 357 else: 358 result = source.result() 359 dest.set_result(result) 360 361 362def _chain_future(source, destination): 363 """Chain two futures so that when one completes, so does the other. 364 365 The result (or exception) of source will be copied to destination. 366 If destination is cancelled, source gets cancelled too. 367 Compatible with both asyncio.Future and concurrent.futures.Future. 368 """ 369 if not isfuture(source) and not isinstance(source, 370 concurrent.futures.Future): 371 raise TypeError('A future is required for source argument') 372 if not isfuture(destination) and not isinstance(destination, 373 concurrent.futures.Future): 374 raise TypeError('A future is required for destination argument') 375 source_loop = _get_loop(source) if isfuture(source) else None 376 dest_loop = _get_loop(destination) if isfuture(destination) else None 377 378 def _set_state(future, other): 379 if isfuture(future): 380 _copy_future_state(other, future) 381 else: 382 _set_concurrent_future_state(future, other) 383 384 def _call_check_cancel(destination): 385 if destination.cancelled(): 386 if source_loop is None or source_loop is dest_loop: 387 source.cancel() 388 else: 389 source_loop.call_soon_threadsafe(source.cancel) 390 391 def _call_set_state(source): 392 if (destination.cancelled() and 393 dest_loop is not None and dest_loop.is_closed()): 394 return 395 if dest_loop is None or dest_loop is source_loop: 396 _set_state(destination, source) 397 else: 398 dest_loop.call_soon_threadsafe(_set_state, destination, source) 399 400 destination.add_done_callback(_call_check_cancel) 401 source.add_done_callback(_call_set_state) 402 403 404def wrap_future(future, *, loop=None): 405 """Wrap concurrent.futures.Future object.""" 406 if isfuture(future): 407 return future 408 assert isinstance(future, concurrent.futures.Future), \ 409 f'concurrent.futures.Future is expected, got {future!r}' 410 if loop is None: 411 loop = events._get_event_loop() 412 new_future = loop.create_future() 413 _chain_future(future, new_future) 414 return new_future 415 416 417try: 418 import _asyncio 419except ImportError: 420 pass 421else: 422 # _CFuture is needed for tests. 423 Future = _CFuture = _asyncio.Future 424