1__all__ = ['coroutine', 2 'iscoroutinefunction', 'iscoroutine'] 3 4import functools 5import inspect 6import opcode 7import os 8import sys 9import traceback 10import types 11 12from . import compat 13from . import events 14from . import base_futures 15from .log import logger 16 17 18# Opcode of "yield from" instruction 19_YIELD_FROM = opcode.opmap['YIELD_FROM'] 20 21# If you set _DEBUG to true, @coroutine will wrap the resulting 22# generator objects in a CoroWrapper instance (defined below). That 23# instance will log a message when the generator is never iterated 24# over, which may happen when you forget to use "yield from" with a 25# coroutine call. Note that the value of the _DEBUG flag is taken 26# when the decorator is used, so to be of any use it must be set 27# before you define your coroutines. A downside of using this feature 28# is that tracebacks show entries for the CoroWrapper.__next__ method 29# when _DEBUG is true. 30_DEBUG = (not sys.flags.ignore_environment and 31 bool(os.environ.get('PYTHONASYNCIODEBUG'))) 32 33 34try: 35 _types_coroutine = types.coroutine 36 _types_CoroutineType = types.CoroutineType 37except AttributeError: 38 # Python 3.4 39 _types_coroutine = None 40 _types_CoroutineType = None 41 42try: 43 _inspect_iscoroutinefunction = inspect.iscoroutinefunction 44except AttributeError: 45 # Python 3.4 46 _inspect_iscoroutinefunction = lambda func: False 47 48try: 49 from collections.abc import Coroutine as _CoroutineABC, \ 50 Awaitable as _AwaitableABC 51except ImportError: 52 _CoroutineABC = _AwaitableABC = None 53 54 55# Check for CPython issue #21209 56def has_yield_from_bug(): 57 class MyGen: 58 def __init__(self): 59 self.send_args = None 60 def __iter__(self): 61 return self 62 def __next__(self): 63 return 42 64 def send(self, *what): 65 self.send_args = what 66 return None 67 def yield_from_gen(gen): 68 yield from gen 69 value = (1, 2, 3) 70 gen = MyGen() 71 coro = yield_from_gen(gen) 72 next(coro) 73 coro.send(value) 74 return gen.send_args != (value,) 75_YIELD_FROM_BUG = has_yield_from_bug() 76del has_yield_from_bug 77 78 79def debug_wrapper(gen): 80 # This function is called from 'sys.set_coroutine_wrapper'. 81 # We only wrap here coroutines defined via 'async def' syntax. 82 # Generator-based coroutines are wrapped in @coroutine 83 # decorator. 84 return CoroWrapper(gen, None) 85 86 87class CoroWrapper: 88 # Wrapper for coroutine object in _DEBUG mode. 89 90 def __init__(self, gen, func=None): 91 assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen 92 self.gen = gen 93 self.func = func # Used to unwrap @coroutine decorator 94 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 95 self.__name__ = getattr(gen, '__name__', None) 96 self.__qualname__ = getattr(gen, '__qualname__', None) 97 98 def __repr__(self): 99 coro_repr = _format_coroutine(self) 100 if self._source_traceback: 101 frame = self._source_traceback[-1] 102 coro_repr += ', created at %s:%s' % (frame[0], frame[1]) 103 return '<%s %s>' % (self.__class__.__name__, coro_repr) 104 105 def __iter__(self): 106 return self 107 108 def __next__(self): 109 return self.gen.send(None) 110 111 if _YIELD_FROM_BUG: 112 # For for CPython issue #21209: using "yield from" and a custom 113 # generator, generator.send(tuple) unpacks the tuple instead of passing 114 # the tuple unchanged. Check if the caller is a generator using "yield 115 # from" to decide if the parameter should be unpacked or not. 116 def send(self, *value): 117 frame = sys._getframe() 118 caller = frame.f_back 119 assert caller.f_lasti >= 0 120 if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM: 121 value = value[0] 122 return self.gen.send(value) 123 else: 124 def send(self, value): 125 return self.gen.send(value) 126 127 def throw(self, type, value=None, traceback=None): 128 return self.gen.throw(type, value, traceback) 129 130 def close(self): 131 return self.gen.close() 132 133 @property 134 def gi_frame(self): 135 return self.gen.gi_frame 136 137 @property 138 def gi_running(self): 139 return self.gen.gi_running 140 141 @property 142 def gi_code(self): 143 return self.gen.gi_code 144 145 if compat.PY35: 146 147 def __await__(self): 148 cr_await = getattr(self.gen, 'cr_await', None) 149 if cr_await is not None: 150 raise RuntimeError( 151 "Cannot await on coroutine {!r} while it's " 152 "awaiting for {!r}".format(self.gen, cr_await)) 153 return self 154 155 @property 156 def gi_yieldfrom(self): 157 return self.gen.gi_yieldfrom 158 159 @property 160 def cr_await(self): 161 return self.gen.cr_await 162 163 @property 164 def cr_running(self): 165 return self.gen.cr_running 166 167 @property 168 def cr_code(self): 169 return self.gen.cr_code 170 171 @property 172 def cr_frame(self): 173 return self.gen.cr_frame 174 175 def __del__(self): 176 # Be careful accessing self.gen.frame -- self.gen might not exist. 177 gen = getattr(self, 'gen', None) 178 frame = getattr(gen, 'gi_frame', None) 179 if frame is None: 180 frame = getattr(gen, 'cr_frame', None) 181 if frame is not None and frame.f_lasti == -1: 182 msg = '%r was never yielded from' % self 183 tb = getattr(self, '_source_traceback', ()) 184 if tb: 185 tb = ''.join(traceback.format_list(tb)) 186 msg += ('\nCoroutine object created at ' 187 '(most recent call last):\n') 188 msg += tb.rstrip() 189 logger.error(msg) 190 191 192def coroutine(func): 193 """Decorator to mark coroutines. 194 195 If the coroutine is not yielded from before it is destroyed, 196 an error message is logged. 197 """ 198 if _inspect_iscoroutinefunction(func): 199 # In Python 3.5 that's all we need to do for coroutines 200 # defiend with "async def". 201 # Wrapping in CoroWrapper will happen via 202 # 'sys.set_coroutine_wrapper' function. 203 return func 204 205 if inspect.isgeneratorfunction(func): 206 coro = func 207 else: 208 @functools.wraps(func) 209 def coro(*args, **kw): 210 res = func(*args, **kw) 211 if (base_futures.isfuture(res) or inspect.isgenerator(res) or 212 isinstance(res, CoroWrapper)): 213 res = yield from res 214 elif _AwaitableABC is not None: 215 # If 'func' returns an Awaitable (new in 3.5) we 216 # want to run it. 217 try: 218 await_meth = res.__await__ 219 except AttributeError: 220 pass 221 else: 222 if isinstance(res, _AwaitableABC): 223 res = yield from await_meth() 224 return res 225 226 if not _DEBUG: 227 if _types_coroutine is None: 228 wrapper = coro 229 else: 230 wrapper = _types_coroutine(coro) 231 else: 232 @functools.wraps(func) 233 def wrapper(*args, **kwds): 234 w = CoroWrapper(coro(*args, **kwds), func=func) 235 if w._source_traceback: 236 del w._source_traceback[-1] 237 # Python < 3.5 does not implement __qualname__ 238 # on generator objects, so we set it manually. 239 # We use getattr as some callables (such as 240 # functools.partial may lack __qualname__). 241 w.__name__ = getattr(func, '__name__', None) 242 w.__qualname__ = getattr(func, '__qualname__', None) 243 return w 244 245 wrapper._is_coroutine = _is_coroutine # For iscoroutinefunction(). 246 return wrapper 247 248 249# A marker for iscoroutinefunction. 250_is_coroutine = object() 251 252 253def iscoroutinefunction(func): 254 """Return True if func is a decorated coroutine function.""" 255 return (getattr(func, '_is_coroutine', None) is _is_coroutine or 256 _inspect_iscoroutinefunction(func)) 257 258 259_COROUTINE_TYPES = (types.GeneratorType, CoroWrapper) 260if _CoroutineABC is not None: 261 _COROUTINE_TYPES += (_CoroutineABC,) 262if _types_CoroutineType is not None: 263 # Prioritize native coroutine check to speed-up 264 # asyncio.iscoroutine. 265 _COROUTINE_TYPES = (_types_CoroutineType,) + _COROUTINE_TYPES 266 267 268def iscoroutine(obj): 269 """Return True if obj is a coroutine object.""" 270 return isinstance(obj, _COROUTINE_TYPES) 271 272 273def _format_coroutine(coro): 274 assert iscoroutine(coro) 275 276 if not hasattr(coro, 'cr_code') and not hasattr(coro, 'gi_code'): 277 # Most likely a built-in type or a Cython coroutine. 278 279 # Built-in types might not have __qualname__ or __name__. 280 coro_name = getattr( 281 coro, '__qualname__', 282 getattr(coro, '__name__', type(coro).__name__)) 283 coro_name = '{}()'.format(coro_name) 284 285 running = False 286 try: 287 running = coro.cr_running 288 except AttributeError: 289 try: 290 running = coro.gi_running 291 except AttributeError: 292 pass 293 294 if running: 295 return '{} running'.format(coro_name) 296 else: 297 return coro_name 298 299 coro_name = None 300 if isinstance(coro, CoroWrapper): 301 func = coro.func 302 coro_name = coro.__qualname__ 303 if coro_name is not None: 304 coro_name = '{}()'.format(coro_name) 305 else: 306 func = coro 307 308 if coro_name is None: 309 coro_name = events._format_callback(func, (), {}) 310 311 try: 312 coro_code = coro.gi_code 313 except AttributeError: 314 coro_code = coro.cr_code 315 316 try: 317 coro_frame = coro.gi_frame 318 except AttributeError: 319 coro_frame = coro.cr_frame 320 321 filename = coro_code.co_filename 322 lineno = 0 323 if (isinstance(coro, CoroWrapper) and 324 not inspect.isgeneratorfunction(coro.func) and 325 coro.func is not None): 326 source = events._get_function_source(coro.func) 327 if source is not None: 328 filename, lineno = source 329 if coro_frame is None: 330 coro_repr = ('%s done, defined at %s:%s' 331 % (coro_name, filename, lineno)) 332 else: 333 coro_repr = ('%s running, defined at %s:%s' 334 % (coro_name, filename, lineno)) 335 elif coro_frame is not None: 336 lineno = coro_frame.f_lineno 337 coro_repr = ('%s running at %s:%s' 338 % (coro_name, filename, lineno)) 339 else: 340 lineno = coro_code.co_firstlineno 341 coro_repr = ('%s done, defined at %s:%s' 342 % (coro_name, filename, lineno)) 343 344 return coro_repr 345