"""A Future class similar to the one in PEP 3148."""

__all__ = ['CancelledError', 'TimeoutError', 'InvalidStateError',
           'Future', 'wrap_future', 'isfuture']

import concurrent.futures
import logging
import sys
import traceback

from . import base_futures
from . import compat
from . import events


CancelledError = base_futures.CancelledError
InvalidStateError = base_futures.InvalidStateError
TimeoutError = base_futures.TimeoutError
isfuture = base_futures.isfuture


_PENDING = base_futures._PENDING
_CANCELLED = base_futures._CANCELLED
_FINISHED = base_futures._FINISHED


STACK_DEBUG = logging.DEBUG - 1  # heavy-duty debugging


class _TracebackLogger:
    """Helper to log a traceback upon destruction if not cleared.

    This solves a nasty problem with Futures and Tasks that have an
    exception set: if nobody asks for the exception, the exception is
    never logged.  This violates the Zen of Python: 'Errors should
    never pass silently.  Unless explicitly silenced.'

    However, we don't want to log the exception as soon as
    set_exception() is called: if the calling code is written
    properly, it will get the exception and handle it properly.  But
    we *do* want to log it if result() or exception() was never called
    -- otherwise developers waste a lot of time wondering why their
    buggy code fails silently.

    An earlier attempt added a __del__() method to the Future class
    itself, but this backfired because the presence of __del__()
    prevents garbage collection from breaking cycles.  A way out of
    this catch-22 is to avoid having a __del__() method on the Future
    class itself, but instead to have a reference to a helper object
    with a __del__() method that logs the traceback, where we ensure
    that the helper object doesn't participate in cycles, and only the
    Future has a reference to it.

    The helper object is added when set_exception() is called.  When
    the Future is collected, and the helper is present, the helper
    object is also collected, and its __del__() method will log the
    traceback.  When the Future's result() or exception() method is
    called (and a helper object is present), it removes the helper
    object, after calling its clear() method to prevent it from
    logging.

    One downside is that we do a fair amount of work to extract the
    traceback from the exception, even when it is never logged.  It
    would seem cheaper to just store the exception object, but that
    references the traceback, which references stack frames, which may
    reference the Future, which references the _TracebackLogger, and
    then the _TracebackLogger would be included in a cycle, which is
    what we're trying to avoid!  As an optimization, we don't
    immediately format the exception; we only do the work when
    activate() is called, which call is delayed until after all the
    Future's callbacks have run.  Usually a Future has at least one
    callback (typically set by 'yield from') and usually that
    callback extracts the exception, thereby removing the need to
    format the exception.

    PS. I don't claim credit for this solution.  I first heard of it
    in a discussion about closing files when they are collected.
    """

    __slots__ = ('loop', 'source_traceback', 'exc', 'tb')

    def __init__(self, future, exc):
        """Remember the future's loop and creation traceback, and *exc*.

        The exception object is held only until activate() or clear()
        is called; 'tb' stays None until activate() formats it.
        """
        self.loop = future._loop
        self.source_traceback = future._source_traceback
        self.exc = exc
        self.tb = None

    def activate(self):
        """Format the pending exception into 'tb' and drop the exception.

        Does nothing if the exception was already consumed (by a previous
        activate() or by clear()).
        """
        exc = self.exc
        if exc is not None:
            # Release the exception object first so that only the formatted
            # text (a list of strings) is kept alive by this helper.
            self.exc = None
            self.tb = traceback.format_exception(exc.__class__, exc,
                                                 exc.__traceback__)

    def clear(self):
        """Forget both the exception and any formatted traceback."""
        self.exc = None
        self.tb = None

    def __del__(self):
        """Report the formatted traceback to the loop, if one is pending."""
        if self.tb:
            msg = 'Future/Task exception was never retrieved\n'
            if self.source_traceback:
                src = ''.join(traceback.format_list(self.source_traceback))
                msg += 'Future/Task created at (most recent call last):\n'
                msg += '%s\n' % src.rstrip()
            msg += ''.join(self.tb).rstrip()
            self.loop.call_exception_handler({'message': msg})


class Future:
    """This class is *almost* compatible with concurrent.futures.Future.

    Differences:

    - result() and exception() do not take a timeout argument and
      raise an exception when the future isn't done yet.

    - Callbacks registered with add_done_callback() are always called
      via the event loop's call_soon().

    - This class is not compatible with the wait() and as_completed()
      methods in the concurrent.futures package.

    (In Python 3.4 or later we may be able to unify the implementations.)
    """

    # Class variables serving as defaults for instance variables.
    _state = _PENDING
    _result = None
    _exception = None
    _loop = None
    _source_traceback = None

    # This field is used for a dual purpose:
    # - Its presence is a marker to declare that a class implements
    #   the Future protocol (i.e. is intended to be duck-type compatible).
    #   The value must also be not-None, to enable a subclass to declare
    #   that it is not compatible by setting this to None.
    # - It is set by __iter__() below so that Task._step() can tell
    #   the difference between `yield from Future()` (correct) vs.
    #   `yield Future()` (incorrect).
    _asyncio_future_blocking = False

    _log_traceback = False   # Used for Python 3.4 and later
    _tb_logger = None        # Used for Python 3.3 only

    def __init__(self, *, loop=None):
        """Initialize the future.

        The optional loop argument allows explicitly setting the event
        loop object used by the future. If it's not provided, the future
        uses the loop returned by events.get_event_loop().
        """
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._callbacks = []
        if self._loop.get_debug():
            # In debug mode, remember where the future was created so that
            # it can be reported alongside an unretrieved exception.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    _repr_info = base_futures._future_repr_info

    def __repr__(self):
        """Return '<ClassName info ...>' built from _repr_info()."""
        return '<%s %s>' % (self.__class__.__name__, ' '.join(self._repr_info()))

    # On Python 3.3 and older, objects with a destructor that are part of a
    # reference cycle are never destroyed. This is no longer the case on
    # Python 3.4 and later, thanks to PEP 442.
    if compat.PY34:
        def __del__(self):
            """Report an exception that was set but never retrieved."""
            if not self._log_traceback:
                # set_exception() was not called, or result() or exception()
                # has consumed the exception
                return
            exc = self._exception
            context = {
                'message': ('%s exception was never retrieved'
                            % self.__class__.__name__),
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)

    def cancel(self):
        """Cancel the future and schedule callbacks.

        If the future is already done or cancelled, return False.  Otherwise,
        change the future's state to cancelled, schedule the callbacks and
        return True.
        """
        if self._state != _PENDING:
            return False
        self._state = _CANCELLED
        self._schedule_callbacks()
        return True

    def _schedule_callbacks(self):
        """Internal: Ask the event loop to call all callbacks.

        The callbacks are scheduled to be called as soon as possible. Also
        clears the callback list.
        """
        # Work on a snapshot so the list can be cleared before scheduling.
        callbacks = self._callbacks[:]
        if not callbacks:
            return

        self._callbacks[:] = []
        for callback in callbacks:
            self._loop.call_soon(callback, self)

    def cancelled(self):
        """Return True if the future was cancelled."""
        return self._state == _CANCELLED

    # Don't implement running(); see http://bugs.python.org/issue18699

    def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

    def result(self):
        """Return the result this future represents.

        If the future has been cancelled, raises CancelledError.  If the
        future's result isn't yet available, raises InvalidStateError.  If
        the future is done and has an exception set, this exception is raised.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        # The outcome is being retrieved: disarm "never retrieved" logging.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        if self._exception is not None:
            raise self._exception
        return self._result

    def exception(self):
        """Return the exception that was set on this future.

        The exception (or None if no exception was set) is returned only if
        the future is done.  If the future has been cancelled, raises
        CancelledError.  If the future isn't done yet, raises
        InvalidStateError.
        """
        if self._state == _CANCELLED:
            raise CancelledError
        if self._state != _FINISHED:
            raise InvalidStateError('Exception is not set.')
        # The outcome is being retrieved: disarm "never retrieved" logging.
        self._log_traceback = False
        if self._tb_logger is not None:
            self._tb_logger.clear()
            self._tb_logger = None
        return self._exception

    def add_done_callback(self, fn):
        """Add a callback to be run when the future becomes done.

        The callback is called with a single argument - the future object. If
        the future is already done when this is called, the callback is
        scheduled with call_soon.
        """
        if self._state != _PENDING:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    # New method not in PEP 3148.

    def remove_done_callback(self, fn):
        """Remove all instances of a callback from the "call when done" list.

        Returns the number of callbacks removed.
        """
        filtered_callbacks = [f for f in self._callbacks if f != fn]
        removed_count = len(self._callbacks) - len(filtered_callbacks)
        if removed_count:
            self._callbacks[:] = filtered_callbacks
        return removed_count

    # So-called internal methods (note: no set_running_or_notify_cancel()).

    def set_result(self, result):
        """Mark the future done and set its result.

        If the future is already done when this method is called, raises
        InvalidStateError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = _FINISHED
        self._schedule_callbacks()

    def set_exception(self, exception):
        """Mark the future done and set an exception.

        If the future is already done when this method is called, raises
        InvalidStateError.  An exception class is instantiated with no
        arguments; a plain StopIteration instance is rejected with
        TypeError.
        """
        if self._state != _PENDING:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        if isinstance(exception, type):
            exception = exception()
        if type(exception) is StopIteration:
            raise TypeError("StopIteration interacts badly with generators "
                            "and cannot be raised into a Future")
        self._exception = exception
        self._state = _FINISHED
        self._schedule_callbacks()
        if compat.PY34:
            self._log_traceback = True
        else:
            self._tb_logger = _TracebackLogger(self, exception)
            # Arrange for the logger to be activated after all callbacks
            # have had a chance to call result() or exception().
            self._loop.call_soon(self._tb_logger.activate)

    def __iter__(self):
        """Support 'yield from future': yield self once, then return result().

        If the future is not done yet, it is flagged as blocking and
        yielded; once resumed, result() is returned (or its exception
        propagates).
        """
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        assert self.done(), "yield from wasn't used with future"
        return self.result()  # May raise too.

    if compat.PY35:
        __await__ = __iter__  # make compatible with 'await' expression


# Needed for testing purposes.
_PyFuture = Future


def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    fut.set_result(result)


def _set_concurrent_future_state(concurrent, source):
    """Copy state from a future to a concurrent.futures.Future."""
    assert source.done()
    if source.cancelled():
        concurrent.cancel()
    # Stop here if the concurrent future reports it must not be run
    # (set_running_or_notify_cancel() returned a false value).
    if not concurrent.set_running_or_notify_cancel():
        return
    exception = source.exception()
    if exception is not None:
        concurrent.set_exception(exception)
    else:
        result = source.result()
        concurrent.set_result(result)


def _copy_future_state(source, dest):
    """Internal helper to copy state from another Future.

    The other Future may be a concurrent.futures.Future.
    """
    assert source.done()
    if dest.cancelled():
        # Nothing to copy into an already-cancelled destination.
        return
    assert not dest.done()
    if source.cancelled():
        dest.cancel()
    else:
        exception = source.exception()
        if exception is not None:
            dest.set_exception(exception)
        else:
            result = source.result()
            dest.set_result(result)


def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.

    Raises TypeError if either argument is neither kind of future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    # A loop of None marks a concurrent.futures.Future (no event loop).
    source_loop = source._loop if isfuture(source) else None
    dest_loop = destination._loop if isfuture(destination) else None

    def _set_state(future, other):
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        if (destination.cancelled() and
                dest_loop is not None and dest_loop.is_closed()):
            # Copying into a cancelled asyncio destination is a no-op
            # (see _copy_future_state), so skip scheduling on a closed
            # loop, which the stock event loops reject with RuntimeError.
            return
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)


def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object.

    An object that already satisfies isfuture() is returned unchanged.
    Otherwise a new future is created on *loop* (or on the loop returned
    by events.get_event_loop() when *loop* is None) and chained to
    *future*.
    """
    if isfuture(future):
        return future
    # NOTE: when asserts are stripped (-O), _chain_future() below still
    # rejects a non-future with TypeError.
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    if loop is None:
        loop = events.get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future


try:
    import _asyncio
except ImportError:
    pass
else:
    # _CFuture is needed for tests.
    Future = _CFuture = _asyncio.Future