"""Support for generator-based coroutines and coroutine introspection.

Provides the ``@coroutine`` decorator, the debug-mode ``CoroWrapper``,
the ``iscoroutinefunction()`` / ``iscoroutine()`` predicates and the
``_format_coroutine()`` helper used to build reprs.
"""

__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'

import collections.abc
import functools
import inspect
import os
import sys
import traceback
import types

from . import base_futures
from . import constants
from . import format_helpers
from .log import logger


def _is_debug_mode():
    """Return a truthy value if coroutine debug mode should be enabled.

    Debug mode is on when the interpreter runs in dev mode
    (``sys.flags.dev_mode``), or when the environment is not ignored and
    the ``PYTHONASYNCIODEBUG`` environment variable is set to a
    non-empty value.
    """
    # If you set _DEBUG to true, @coroutine will wrap the resulting
    # generator objects in a CoroWrapper instance (defined below). That
    # instance will log a message when the generator is never iterated
    # over, which may happen when you forget to use "await" or "yield from"
    # with a coroutine call.
    # Note that the value of the _DEBUG flag is taken
    # when the decorator is used, so to be of any use it must be set
    # before you define your coroutines. A downside of using this feature
    # is that tracebacks show entries for the CoroWrapper.__next__ method
    # when _DEBUG is true.
    return sys.flags.dev_mode or (not sys.flags.ignore_environment and
                                  bool(os.environ.get('PYTHONASYNCIODEBUG')))


# Evaluated once, at import time; @coroutine reads it when decorating.
_DEBUG = _is_debug_mode()


class CoroWrapper:
    """Wrapper for a coroutine object in _DEBUG mode.

    Forwards the generator protocol (``send``, ``throw``, ``close``,
    iteration and the ``gi_*`` attributes) to the wrapped object, and
    logs an error on destruction if the wrapped object was never started.
    """

    def __init__(self, gen, func=None):
        """Wrap *gen*, a generator or coroutine object.

        *func* is the function that produced *gen*, if known; it is used
        when formatting the wrapper's repr.
        """
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        self.func = func  # Used to unwrap @coroutine decorator
        # Stack of the code that created this wrapper, starting at the
        # caller of __init__ (frame 1).
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        if self._source_traceback:
            # Report the innermost recorded frame as the creation site.
            frame = self._source_traceback[-1]
            coro_repr += f', created at {frame[0]}:{frame[1]}'

        return f'<{self.__class__.__name__} {coro_repr}>'

    def __iter__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    def send(self, value):
        """Forward *value* to the wrapped object's ``send()``."""
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        """Forward an exception to the wrapped object's ``throw()``.

        The parameter names shadow the ``type`` builtin and the
        ``traceback`` module inside this method only; they are kept for
        interface compatibility.
        """
        return self.gen.throw(type, value, traceback)

    def close(self):
        """Forward to the wrapped object's ``close()``."""
        return self.gen.close()

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    def __await__(self):
        return self

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # Be careful accessing self.gen.frame -- self.gen might not exist.
        gen = getattr(self, 'gen', None)
        frame = getattr(gen, 'gi_frame', None)
        # f_lasti == -1: the frame exists but has not executed any
        # instruction yet, i.e. the wrapped object was never started.
        if frame is not None and frame.f_lasti == -1:
            msg = f'{self!r} was never yielded from'
            tb = getattr(self, '_source_traceback', ())
            if tb:
                tb = ''.join(traceback.format_list(tb))
                msg += (f'\nCoroutine object created at '
                        f'(most recent call last, truncated to '
                        f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
                msg += tb.rstrip()
            logger.error(msg)


def coroutine(func):
    """Decorator to mark coroutines.

    If the coroutine is not yielded from before it is destroyed,
    an error message is logged.

    Native ``async def`` functions are returned unchanged. Generator
    functions are passed through ``types.coroutine()``. Any other
    callable is wrapped in a generator that calls it and, if the result
    is a future, a generator, a CoroWrapper or an awaitable, delegates
    to that result with ``yield from``. In debug mode the returned
    callable additionally wraps each produced object in a CoroWrapper.
    """
    if inspect.iscoroutinefunction(func):
        # In Python 3.5 that's all we need to do for coroutines
        # defined with "async def".
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            res = func(*args, **kw)
            if (base_futures.isfuture(res) or inspect.isgenerator(res) or
                    isinstance(res, CoroWrapper)):
                res = yield from res
            else:
                # If 'res' is an awaitable, run it.
                try:
                    await_meth = res.__await__
                except AttributeError:
                    pass
                else:
                    if isinstance(res, collections.abc.Awaitable):
                        res = yield from await_meth()
            return res

    coro = types.coroutine(coro)
    if not _DEBUG:
        wrapper = coro
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            w = CoroWrapper(coro(*args, **kwds), func=func)
            if w._source_traceback:
                # Drop the innermost recorded entry -- presumably the
                # frame of this wrapper itself, which called CoroWrapper().
                del w._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__
            # on generator objects, so we set it manually.
            # We use getattr as some callables (such as
            # functools.partial may lack __qualname__).
            w.__name__ = getattr(func, '__name__', None)
            w.__qualname__ = getattr(func, '__qualname__', None)
            return w

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper


# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    return (inspect.iscoroutinefunction(func) or
            getattr(func, '_is_coroutine', None) is _is_coroutine)


# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine, CoroWrapper)
# Concrete types already known to satisfy _COROUTINE_TYPES.
_iscoroutine_typecache = set()


def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    if type(obj) in _iscoroutine_typecache:
        return True

    if isinstance(obj, _COROUTINE_TYPES):
        # Just in case we don't want to cache more than 100
        # positive types. That shouldn't ever happen, unless
        # someone stressing the system on purpose.
        if len(_iscoroutine_typecache) < 100:
            _iscoroutine_typecache.add(type(obj))
        return True
    else:
        return False


def _format_coroutine(coro):
    """Return a short description of *coro* for use in reprs.

    The result contains the coroutine's name, whether it is running or
    done, and -- when a code object is available -- a file name and
    line number. *coro* must satisfy ``iscoroutine()``.
    """
    assert iscoroutine(coro)

    is_corowrapper = isinstance(coro, CoroWrapper)

    def get_name(coro):
        # Coroutines compiled with Cython sometimes don't have
        # proper __qualname__ or __name__. While that is a bug
        # in Cython, asyncio shouldn't crash with an AttributeError
        # in its __repr__ functions.
        #
        # CoroWrapper.func defaults to None; formatting None as a
        # callback would not name the coroutine, so in that case fall
        # through to the wrapper's own __qualname__/__name__.
        if is_corowrapper and coro.func is not None:
            return format_helpers._format_callback(coro.func, (), {})

        if hasattr(coro, '__qualname__') and coro.__qualname__:
            coro_name = coro.__qualname__
        elif hasattr(coro, '__name__') and coro.__name__:
            coro_name = coro.__name__
        else:
            # Stop masking Cython bugs, expose them in a friendly way.
            coro_name = f'<{type(coro).__name__} without __name__>'
        return f'{coro_name}()'

    def is_running(coro):
        # Native coroutines expose cr_running, generators gi_running.
        try:
            return coro.cr_running
        except AttributeError:
            try:
                return coro.gi_running
            except AttributeError:
                return False

    coro_code = None
    if hasattr(coro, 'cr_code') and coro.cr_code:
        coro_code = coro.cr_code
    elif hasattr(coro, 'gi_code') and coro.gi_code:
        coro_code = coro.gi_code

    coro_name = get_name(coro)

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        if is_running(coro):
            return f'{coro_name} running'
        else:
            return coro_name

    coro_frame = None
    if hasattr(coro, 'gi_frame') and coro.gi_frame:
        coro_frame = coro.gi_frame
    elif hasattr(coro, 'cr_frame') and coro.cr_frame:
        coro_frame = coro.cr_frame

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    lineno = 0
    if (is_corowrapper and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # The wrapped function is not itself a generator function, so
        # report where that function is defined (when it can be found)
        # instead of a location taken from the code object.
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'
        else:
            coro_repr = f'{coro_name} running, defined at {filename}:{lineno}'

    elif coro_frame is not None:
        lineno = coro_frame.f_lineno
        coro_repr = f'{coro_name} running at {filename}:{lineno}'

    else:
        lineno = coro_code.co_firstlineno
        coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'

    return coro_repr