1"""A Future class similar to the one in PEP 3148.""" 2 3__all__ = ( 4 'Future', 'wrap_future', 'isfuture', 5) 6 7import concurrent.futures 8import contextvars 9import logging 10import sys 11from types import GenericAlias 12 13from . import base_futures 14from . import events 15from . import exceptions 16from . import format_helpers 17 18 19isfuture = base_futures.isfuture 20 21 22_PENDING = base_futures._PENDING 23_CANCELLED = base_futures._CANCELLED 24_FINISHED = base_futures._FINISHED 25 26 27STACK_DEBUG = logging.DEBUG - 1 # heavy-duty debugging 28 29 30class Future: 31 """This class is *almost* compatible with concurrent.futures.Future. 32 33 Differences: 34 35 - This class is not thread-safe. 36 37 - result() and exception() do not take a timeout argument and 38 raise an exception when the future isn't done yet. 39 40 - Callbacks registered with add_done_callback() are always called 41 via the event loop's call_soon(). 42 43 - This class is not compatible with the wait() and as_completed() 44 methods in the concurrent.futures package. 45 46 (In Python 3.4 or later we may be able to unify the implementations.) 47 """ 48 49 # Class variables serving as defaults for instance variables. 50 _state = _PENDING 51 _result = None 52 _exception = None 53 _loop = None 54 _source_traceback = None 55 _cancel_message = None 56 # A saved CancelledError for later chaining as an exception context. 57 _cancelled_exc = None 58 59 # This field is used for a dual purpose: 60 # - Its presence is a marker to declare that a class implements 61 # the Future protocol (i.e. is intended to be duck-type compatible). 62 # The value must also be not-None, to enable a subclass to declare 63 # that it is not compatible by setting this to None. 64 # - It is set by __iter__() below so that Task._step() can tell 65 # the difference between 66 # `await Future()` or`yield from Future()` (correct) vs. 67 # `yield Future()` (incorrect). 68 _asyncio_future_blocking = False 69 70 __log_traceback = False 71 72 def __init__(self, *, loop=None): 73 """Initialize the future. 74 75 The optional event_loop argument allows explicitly setting the event 76 loop object used by the future. If it's not provided, the future uses 77 the default event loop. 78 """ 79 if loop is None: 80 self._loop = events.get_event_loop() 81 else: 82 self._loop = loop 83 self._callbacks = [] 84 if self._loop.get_debug(): 85 self._source_traceback = format_helpers.extract_stack( 86 sys._getframe(1)) 87 88 def __repr__(self): 89 return base_futures._future_repr(self) 90 91 def __del__(self): 92 if not self.__log_traceback: 93 # set_exception() was not called, or result() or exception() 94 # has consumed the exception 95 return 96 exc = self._exception 97 context = { 98 'message': 99 f'{self.__class__.__name__} exception was never retrieved', 100 'exception': exc, 101 'future': self, 102 } 103 if self._source_traceback: 104 context['source_traceback'] = self._source_traceback 105 self._loop.call_exception_handler(context) 106 107 __class_getitem__ = classmethod(GenericAlias) 108 109 @property 110 def _log_traceback(self): 111 return self.__log_traceback 112 113 @_log_traceback.setter 114 def _log_traceback(self, val): 115 if val: 116 raise ValueError('_log_traceback can only be set to False') 117 self.__log_traceback = False 118 119 def get_loop(self): 120 """Return the event loop the Future is bound to.""" 121 loop = self._loop 122 if loop is None: 123 raise RuntimeError("Future object is not initialized.") 124 return loop 125 126 def _make_cancelled_error(self): 127 """Create the CancelledError to raise if the Future is cancelled. 128 129 This should only be called once when handling a cancellation since 130 it erases the saved context exception value. 131 """ 132 if self._cancelled_exc is not None: 133 exc = self._cancelled_exc 134 self._cancelled_exc = None 135 return exc 136 137 if self._cancel_message is None: 138 exc = exceptions.CancelledError() 139 else: 140 exc = exceptions.CancelledError(self._cancel_message) 141 return exc 142 143 def cancel(self, msg=None): 144 """Cancel the future and schedule callbacks. 145 146 If the future is already done or cancelled, return False. Otherwise, 147 change the future's state to cancelled, schedule the callbacks and 148 return True. 149 """ 150 self.__log_traceback = False 151 if self._state != _PENDING: 152 return False 153 self._state = _CANCELLED 154 self._cancel_message = msg 155 self.__schedule_callbacks() 156 return True 157 158 def __schedule_callbacks(self): 159 """Internal: Ask the event loop to call all callbacks. 160 161 The callbacks are scheduled to be called as soon as possible. Also 162 clears the callback list. 163 """ 164 callbacks = self._callbacks[:] 165 if not callbacks: 166 return 167 168 self._callbacks[:] = [] 169 for callback, ctx in callbacks: 170 self._loop.call_soon(callback, self, context=ctx) 171 172 def cancelled(self): 173 """Return True if the future was cancelled.""" 174 return self._state == _CANCELLED 175 176 # Don't implement running(); see http://bugs.python.org/issue18699 177 178 def done(self): 179 """Return True if the future is done. 180 181 Done means either that a result / exception are available, or that the 182 future was cancelled. 183 """ 184 return self._state != _PENDING 185 186 def result(self): 187 """Return the result this future represents. 188 189 If the future has been cancelled, raises CancelledError. If the 190 future's result isn't yet available, raises InvalidStateError. If 191 the future is done and has an exception set, this exception is raised. 192 """ 193 if self._state == _CANCELLED: 194 raise self._make_cancelled_error() 195 if self._state != _FINISHED: 196 raise exceptions.InvalidStateError('Result is not ready.') 197 self.__log_traceback = False 198 if self._exception is not None: 199 raise self._exception.with_traceback(self._exception_tb) 200 return self._result 201 202 def exception(self): 203 """Return the exception that was set on this future. 204 205 The exception (or None if no exception was set) is returned only if 206 the future is done. If the future has been cancelled, raises 207 CancelledError. If the future isn't done yet, raises 208 InvalidStateError. 209 """ 210 if self._state == _CANCELLED: 211 raise self._make_cancelled_error() 212 if self._state != _FINISHED: 213 raise exceptions.InvalidStateError('Exception is not set.') 214 self.__log_traceback = False 215 return self._exception 216 217 def add_done_callback(self, fn, *, context=None): 218 """Add a callback to be run when the future becomes done. 219 220 The callback is called with a single argument - the future object. If 221 the future is already done when this is called, the callback is 222 scheduled with call_soon. 223 """ 224 if self._state != _PENDING: 225 self._loop.call_soon(fn, self, context=context) 226 else: 227 if context is None: 228 context = contextvars.copy_context() 229 self._callbacks.append((fn, context)) 230 231 # New method not in PEP 3148. 232 233 def remove_done_callback(self, fn): 234 """Remove all instances of a callback from the "call when done" list. 235 236 Returns the number of callbacks removed. 237 """ 238 filtered_callbacks = [(f, ctx) 239 for (f, ctx) in self._callbacks 240 if f != fn] 241 removed_count = len(self._callbacks) - len(filtered_callbacks) 242 if removed_count: 243 self._callbacks[:] = filtered_callbacks 244 return removed_count 245 246 # So-called internal methods (note: no set_running_or_notify_cancel()). 247 248 def set_result(self, result): 249 """Mark the future done and set its result. 250 251 If the future is already done when this method is called, raises 252 InvalidStateError. 253 """ 254 if self._state != _PENDING: 255 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 256 self._result = result 257 self._state = _FINISHED 258 self.__schedule_callbacks() 259 260 def set_exception(self, exception): 261 """Mark the future done and set an exception. 262 263 If the future is already done when this method is called, raises 264 InvalidStateError. 265 """ 266 if self._state != _PENDING: 267 raise exceptions.InvalidStateError(f'{self._state}: {self!r}') 268 if isinstance(exception, type): 269 exception = exception() 270 if isinstance(exception, StopIteration): 271 new_exc = RuntimeError("StopIteration interacts badly with " 272 "generators and cannot be raised into a " 273 "Future") 274 new_exc.__cause__ = exception 275 new_exc.__context__ = exception 276 exception = new_exc 277 self._exception = exception 278 self._exception_tb = exception.__traceback__ 279 self._state = _FINISHED 280 self.__schedule_callbacks() 281 self.__log_traceback = True 282 283 def __await__(self): 284 if not self.done(): 285 self._asyncio_future_blocking = True 286 yield self # This tells Task to wait for completion. 287 if not self.done(): 288 raise RuntimeError("await wasn't used with future") 289 return self.result() # May raise too. 290 291 __iter__ = __await__ # make compatible with 'yield from'. 292 293 294# Needed for testing purposes. 295_PyFuture = Future 296 297 298def _get_loop(fut): 299 # Tries to call Future.get_loop() if it's available. 300 # Otherwise fallbacks to using the old '_loop' property. 301 try: 302 get_loop = fut.get_loop 303 except AttributeError: 304 pass 305 else: 306 return get_loop() 307 return fut._loop 308 309 310def _set_result_unless_cancelled(fut, result): 311 """Helper setting the result only if the future was not cancelled.""" 312 if fut.cancelled(): 313 return 314 fut.set_result(result) 315 316 317def _convert_future_exc(exc): 318 exc_class = type(exc) 319 if exc_class is concurrent.futures.CancelledError: 320 return exceptions.CancelledError(*exc.args).with_traceback(exc.__traceback__) 321 elif exc_class is concurrent.futures.InvalidStateError: 322 return exceptions.InvalidStateError(*exc.args).with_traceback(exc.__traceback__) 323 else: 324 return exc 325 326 327def _set_concurrent_future_state(concurrent, source): 328 """Copy state from a future to a concurrent.futures.Future.""" 329 assert source.done() 330 if source.cancelled(): 331 concurrent.cancel() 332 if not concurrent.set_running_or_notify_cancel(): 333 return 334 exception = source.exception() 335 if exception is not None: 336 concurrent.set_exception(_convert_future_exc(exception)) 337 else: 338 result = source.result() 339 concurrent.set_result(result) 340 341 342def _copy_future_state(source, dest): 343 """Internal helper to copy state from another Future. 344 345 The other Future may be a concurrent.futures.Future. 346 """ 347 assert source.done() 348 if dest.cancelled(): 349 return 350 assert not dest.done() 351 if source.cancelled(): 352 dest.cancel() 353 else: 354 exception = source.exception() 355 if exception is not None: 356 dest.set_exception(_convert_future_exc(exception)) 357 else: 358 result = source.result() 359 dest.set_result(result) 360 361 362def _chain_future(source, destination): 363 """Chain two futures so that when one completes, so does the other. 364 365 The result (or exception) of source will be copied to destination. 366 If destination is cancelled, source gets cancelled too. 367 Compatible with both asyncio.Future and concurrent.futures.Future. 368 """ 369 if not isfuture(source) and not isinstance(source, 370 concurrent.futures.Future): 371 raise TypeError('A future is required for source argument') 372 if not isfuture(destination) and not isinstance(destination, 373 concurrent.futures.Future): 374 raise TypeError('A future is required for destination argument') 375 source_loop = _get_loop(source) if isfuture(source) else None 376 dest_loop = _get_loop(destination) if isfuture(destination) else None 377 378 def _set_state(future, other): 379 if isfuture(future): 380 _copy_future_state(other, future) 381 else: 382 _set_concurrent_future_state(future, other) 383 384 def _call_check_cancel(destination): 385 if destination.cancelled(): 386 if source_loop is None or source_loop is dest_loop: 387 source.cancel() 388 else: 389 source_loop.call_soon_threadsafe(source.cancel) 390 391 def _call_set_state(source): 392 if (destination.cancelled() and 393 dest_loop is not None and dest_loop.is_closed()): 394 return 395 if dest_loop is None or dest_loop is source_loop: 396 _set_state(destination, source) 397 else: 398 if dest_loop.is_closed(): 399 return 400 dest_loop.call_soon_threadsafe(_set_state, destination, source) 401 402 destination.add_done_callback(_call_check_cancel) 403 source.add_done_callback(_call_set_state) 404 405 406def wrap_future(future, *, loop=None): 407 """Wrap concurrent.futures.Future object.""" 408 if isfuture(future): 409 return future 410 assert isinstance(future, concurrent.futures.Future), \ 411 f'concurrent.futures.Future is expected, got {future!r}' 412 if loop is None: 413 loop = events.get_event_loop() 414 new_future = loop.create_future() 415 _chain_future(future, new_future) 416 return new_future 417 418 419try: 420 import _asyncio 421except ImportError: 422 pass 423else: 424 # _CFuture is needed for tests. 425 Future = _CFuture = _asyncio.Future 426