• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Names exported as this module's public API (used by star-imports).
__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'
2
3import collections.abc
4import functools
5import inspect
6import os
7import sys
8import traceback
9import types
10import warnings
11
12from . import base_futures
13from . import constants
14from . import format_helpers
15from .log import logger
16
17
def _is_debug_mode():
    # Decide whether coroutine debugging should be switched on.
    #
    # The result feeds the module-level _DEBUG flag.  When that flag is
    # true, @coroutine wraps the generator objects it produces in a
    # CoroWrapper instance (defined below), which logs a message if the
    # generator is destroyed without ever having been iterated -- the
    # usual symptom of a forgotten "await" or "yield from".
    #
    # _DEBUG is read when the decorator is applied, so it only has an
    # effect if it is set before the coroutines are defined.  The price
    # of enabling it is that tracebacks gain entries for
    # CoroWrapper.__next__.
    dev_mode = sys.flags.dev_mode
    if dev_mode:
        # Python development mode always enables debugging.
        return dev_mode
    if sys.flags.ignore_environment:
        # Environment variables must be disregarded (e.g. "python -E").
        return False
    return bool(os.environ.get('PYTHONASYNCIODEBUG'))
31
32
# Debug setting, evaluated once when this module is imported.  coroutine()
# reads it to decide whether results are wrapped in CoroWrapper.
_DEBUG = _is_debug_mode()
34
35
class CoroWrapper:
    """Proxy placed around a generator/coroutine object in _DEBUG mode.

    The wrapper forwards the generator protocol (send/throw/close and
    the gi_* attributes) to the wrapped object, remembers the stack at
    the point where it was created, and logs an error on destruction
    if the wrapped object was never started.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        # Original callable; used to unwrap the @coroutine decorator.
        self.func = func
        # Stack of whoever instantiated the wrapper (frame 1 == caller).
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        description = _format_coroutine(self)
        stack = self._source_traceback
        if stack:
            # The innermost recorded entry is the creation site.
            frame = stack[-1]
            description = f'{description}, created at {frame[0]}:{frame[1]}'

        return f'<{self.__class__.__name__} {description}>'

    # -- iterator / awaitable protocol: the wrapper is its own iterator --

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    # -- generator protocol, delegated to the wrapped object --

    def send(self, value):
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    # -- read-only views of the wrapped generator's introspection data --

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # __init__ may have failed before 'gen' was assigned, so every
        # attribute is looked up defensively.
        wrapped = getattr(self, 'gen', None)
        frame = getattr(wrapped, 'gi_frame', None)
        if frame is None or frame.f_lasti != -1:
            # Either there is no live frame or execution already began.
            return

        msg = f'{self!r} was never yielded from'
        stack = getattr(self, '_source_traceback', ())
        if stack:
            formatted = ''.join(traceback.format_list(stack))
            msg += (f'\nCoroutine object created at '
                    f'(most recent call last, truncated to '
                    f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
            msg += formatted.rstrip()
        logger.error(msg)
103
104
def coroutine(func):
    """Decorator to mark generator-based coroutines (deprecated).

    Functions defined with "async def" are returned untouched.
    Generator functions are used as they are, and any other callable
    is wrapped in a generator that delegates to the future, generator
    or awaitable it returns.

    In debug mode the produced objects are wrapped in CoroWrapper, so
    an error message is logged when one of them is destroyed without
    ever having been yielded from.
    """
    warnings.warn('"@coroutine" decorator is deprecated since Python 3.8, use "async def" instead',
                  DeprecationWarning,
                  stacklevel=2)
    if inspect.iscoroutinefunction(func):
        # Native "async def" coroutines need no further treatment.
        return func

    if inspect.isgeneratorfunction(func):
        gen_func = func
    else:
        @functools.wraps(func)
        def gen_func(*args, **kw):
            result = func(*args, **kw)
            if (base_futures.isfuture(result) or
                    inspect.isgenerator(result) or
                    isinstance(result, CoroWrapper)):
                return (yield from result)

            # Otherwise, run 'result' only if it is an awaitable.
            try:
                await_meth = result.__await__
            except AttributeError:
                return result
            if isinstance(result, collections.abc.Awaitable):
                return (yield from await_meth())
            return result

    gen_func = types.coroutine(gen_func)
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(gen_func(*args, **kwds), func=func)
            if wrapped._source_traceback:
                # Drop the entry for this wrapper function itself.
                del wrapped._source_traceback[-1]
            # Generator objects lack __qualname__ before Python 3.5, so
            # both names are copied over by hand.  getattr() is used
            # because some callables (functools.partial, for example)
            # may not define them.
            wrapped.__name__ = getattr(func, '__name__', None)
            wrapped.__qualname__ = getattr(func, '__qualname__', None)
            return wrapped
    else:
        wrapper = gen_func

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
158
159
# Unique sentinel that coroutine() attaches to the functions it returns;
# iscoroutinefunction() recognises them by identity with this object.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function."""
    native = inspect.iscoroutinefunction(func)
    if native:
        return native
    # Fall back to the marker left behind by the @coroutine decorator.
    marker = getattr(func, '_is_coroutine', None)
    return marker is _is_coroutine
168
169
# Types accepted by iscoroutine().  The native coroutine type is listed
# first so that the most common check in asyncio.iscoroutine is fast.
_COROUTINE_TYPES = (
    types.CoroutineType,
    types.GeneratorType,
    collections.abc.Coroutine,
    CoroWrapper,
)
# Concrete types that iscoroutine() has already accepted.
_iscoroutine_typecache = set()
175
176
def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    obj_type = type(obj)
    if obj_type in _iscoroutine_typecache:
        # Fast path: this exact type was accepted before.
        return True

    if not isinstance(obj, _COROUTINE_TYPES):
        return False

    # Remember at most 100 positive types.  Reaching that limit is not
    # expected in practice, unless someone is stressing the system on
    # purpose.
    if len(_iscoroutine_typecache) < 100:
        _iscoroutine_typecache.add(obj_type)
    return True
191
192
def _format_coroutine(coro):
    """Build the descriptive text used in reprs of coroutine objects.

    The text names the coroutine and, when a code object is available,
    states whether it is running or done together with a file name and
    line number.  'coro' must satisfy iscoroutine().
    """
    assert iscoroutine(coro)

    wrapped = isinstance(coro, CoroWrapper)

    def describe_name(target):
        if wrapped:
            return format_helpers._format_callback(target.func, (), {})

        # Coroutines compiled with Cython sometimes lack a proper
        # __qualname__ or __name__.  That is a Cython bug, but asyncio
        # must not fail with AttributeError inside a __repr__.
        for attr in ('__qualname__', '__name__'):
            label = getattr(target, attr, None)
            if label:
                return f'{label}()'
        # Expose the Cython bug in a friendly way instead of masking it.
        label = f'<{type(target).__name__} without __name__>'
        return f'{label}()'

    def running_flag(target):
        # Prefer the native-coroutine attribute, then the generator one.
        for attr in ('cr_running', 'gi_running'):
            try:
                return getattr(target, attr)
            except AttributeError:
                continue
        return False

    code = getattr(coro, 'cr_code', None) or getattr(coro, 'gi_code', None)
    name = describe_name(coro)

    if not code:
        # Built-in types might not have __qualname__ or __name__.
        if running_flag(coro):
            return f'{name} running'
        return name

    frame = (getattr(coro, 'gi_frame', None) or
             getattr(coro, 'cr_frame', None) or
             None)

    # A Cython coroutine may carry a fake code object without a proper
    # co_filename -- make that visible.
    filename = code.co_filename or '<empty co_filename>'

    if (wrapped and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # Report where the wrapped plain function was defined, when the
        # helper can tell; otherwise the line number stays at 0.
        lineno = 0
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if frame is None:
            return f'{name} done, defined at {filename}:{lineno}'
        return f'{name} running, defined at {filename}:{lineno}'

    if frame is not None:
        lineno = frame.f_lineno
        return f'{name} running at {filename}:{lineno}'

    lineno = code.co_firstlineno
    return f'{name} done, defined at {filename}:{lineno}'
270