__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'

import collections.abc
import functools
import inspect
import os
import sys
import traceback
import types
import warnings

from . import base_futures
from . import constants
from . import format_helpers
from .log import logger


def _is_debug_mode():
    """Return a truthy value when coroutine debugging should be enabled.

    Debugging is on when the interpreter runs in dev mode, or when the
    PYTHONASYNCIODEBUG environment variable is set to a non-empty value
    and the interpreter was not told to ignore the environment.
    """
    # When _DEBUG is true, @coroutine wraps the generator objects it
    # produces in a CoroWrapper instance (defined below).  The wrapper
    # logs a message if the generator is never iterated over, which is
    # what happens when an "await" or "yield from" is forgotten on a
    # coroutine call.
    # The flag is read when the decorator is applied, so it only has an
    # effect if it is set before the coroutines are defined.  The price
    # of the feature is that tracebacks gain entries for
    # CoroWrapper.__next__ while _DEBUG is true.
    dev_mode = sys.flags.dev_mode
    if dev_mode:
        return dev_mode
    if sys.flags.ignore_environment:
        return False
    return bool(os.environ.get('PYTHONASYNCIODEBUG'))


_DEBUG = _is_debug_mode()


class CoroWrapper:
    """Proxy around a generator/coroutine object, used in _DEBUG mode.

    The wrapper forwards the generator protocol to the wrapped object,
    remembers the stack at which it was created, and reports (from
    __del__) wrapped objects that were destroyed without ever being
    started.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        # Kept so the @coroutine decorator can be "unwrapped" in reprs.
        self.func = func
        # Stack of the caller; must be captured from this frame.
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        description = _format_coroutine(self)
        stack = self._source_traceback
        if stack:
            # The innermost recorded entry is where the wrapper was made.
            origin = stack[-1]
            description = (
                f'{description}, created at {origin[0]}:{origin[1]}')

        return f'<{self.__class__.__name__} {description}>'

    # --- iterator / awaitable protocol: the wrapper is its own iterator ---

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    # --- generator protocol, delegated to the wrapped object ---

    def send(self, value):
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    # --- read-only views of the wrapped generator's state ---

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # __init__ may have failed before self.gen was assigned, so every
        # attribute is looked up defensively.
        wrapped = getattr(self, 'gen', None)
        frame = getattr(wrapped, 'gi_frame', None)
        if frame is None or frame.f_lasti != -1:
            # Nothing to inspect, or the generator has already been started.
            return

        message = f'{self!r} was never yielded from'
        stack = getattr(self, '_source_traceback', ())
        if stack:
            formatted = ''.join(traceback.format_list(stack))
            message += (f'\nCoroutine object created at '
                        f'(most recent call last, truncated to '
                        f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
            message += formatted.rstrip()
        logger.error(message)


def coroutine(func):
    """Mark *func* as a generator-based coroutine (deprecated).

    Functions defined with "async def" are returned unchanged.  Generator
    functions are passed to types.coroutine(); any other callable is first
    wrapped in a generator that delegates to futures, generators,
    CoroWrapper objects and awaitables returned by the call.

    In debug mode the produced objects are wrapped in CoroWrapper, which
    logs an error if the coroutine is destroyed before it was ever
    yielded from.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # Coroutines defined with "async def" need no further treatment.
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            result = func(*args, **kw)
            if (base_futures.isfuture(result)
                    or inspect.isgenerator(result)
                    or isinstance(result, CoroWrapper)):
                return (yield from result)

            # If the result is an awaitable, run it; otherwise hand it
            # back untouched.
            try:
                await_method = result.__await__
            except AttributeError:
                return result
            if isinstance(result, collections.abc.Awaitable):
                result = yield from await_method()
            return result

    coro = types.coroutine(coro)
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func=func)
            if wrapped._source_traceback:
                # Drop the entry for this wrapper frame itself.
                del wrapped._source_traceback[-1]
            # Python < 3.5 does not implement __qualname__ on generator
            # objects, so it is set manually.  getattr is used because
            # some callables (functools.partial, for instance) may lack
            # __qualname__.
            wrapped.__name__ = getattr(func, '__name__', None)
            wrapped.__qualname__ = getattr(func, '__qualname__', None)
            return wrapped
    else:
        wrapper = coro

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper


# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    if inspect.iscoroutinefunction(func):
        return True
    # Functions produced by @coroutine carry the module-private marker.
    return getattr(func, '_is_coroutine', None) is _is_coroutine


# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine, CoroWrapper)
_iscoroutine_typecache = set()


def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    obj_type = type(obj)
    if obj_type in _iscoroutine_typecache:
        return True

    if not isinstance(obj, _COROUTINE_TYPES):
        return False

    # Cache at most 100 positive types.  Hitting the limit should never
    # happen unless someone is stressing the system on purpose.
    if len(_iscoroutine_typecache) < 100:
        _iscoroutine_typecache.add(obj_type)
    return True


def _format_coroutine(coro):
    """Return a short description of *coro* for use in reprs.

    The text contains the coroutine's name, whether it is running or
    done, and, when a code object is available, the file name and line
    number it is running at or was defined at.
    """
    assert iscoroutine(coro)

    is_corowrapper = isinstance(coro, CoroWrapper)

    # Locate the code object: native coroutines expose cr_code,
    # generators (and CoroWrapper) expose gi_code.
    coro_code = None
    for attr in ('cr_code', 'gi_code'):
        candidate = getattr(coro, attr, None)
        if candidate:
            coro_code = candidate
            break

    # Work out the display name.
    if is_corowrapper:
        coro_name = format_helpers._format_callback(coro.func, (), {})
    else:
        # Coroutines compiled with Cython sometimes don't have a proper
        # __qualname__ or __name__.  That is a Cython bug, but asyncio
        # shouldn't crash with an AttributeError in its __repr__
        # functions.
        label = (getattr(coro, '__qualname__', None)
                 or getattr(coro, '__name__', None))
        if not label:
            # Stop masking Cython bugs, expose them in a friendly way.
            label = f'<{type(coro).__name__} without __name__>'
        coro_name = f'{label}()'

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        # Without a code object only the running state can be reported.
        try:
            running = coro.cr_running
        except AttributeError:
            try:
                running = coro.gi_running
            except AttributeError:
                running = False
        if running:
            return f'{coro_name} running'
        return coro_name

    # A missing or empty frame attribute means there is no live frame.
    coro_frame = None
    for attr in ('gi_frame', 'cr_frame'):
        candidate = getattr(coro, attr, None)
        if candidate:
            coro_frame = candidate
            break

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    wraps_plain_function = (
        is_corowrapper
        and coro.func is not None
        and not inspect.isgeneratorfunction(coro.func))

    if wraps_plain_function:
        # Report where the decorated function was defined rather than
        # the location of the generator created by @coroutine.
        lineno = 0
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if coro_frame is None:
            return f'{coro_name} done, defined at {filename}:{lineno}'
        return f'{coro_name} running, defined at {filename}:{lineno}'

    if coro_frame is not None:
        lineno = coro_frame.f_lineno
        return f'{coro_name} running at {filename}:{lineno}'

    lineno = coro_code.co_firstlineno
    return f'{coro_name} done, defined at {filename}:{lineno}'