• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Public names of this module (what ``from ... import *`` exposes).
__all__ = 'coroutine', 'iscoroutinefunction', 'iscoroutine'
2
3import collections.abc
4import functools
5import inspect
6import os
7import sys
8import traceback
9import types
10
11from . import base_futures
12from . import constants
13from . import format_helpers
14from .log import logger
15
16
def _is_debug_mode():
    """Tell whether asyncio debug mode is requested for this process.

    Debug mode is on when the interpreter runs in dev mode, or when the
    PYTHONASYNCIODEBUG environment variable is set to a non-empty value
    and the interpreter was not told to ignore the environment.

    The result feeds the module-level _DEBUG flag.  When that flag is
    true, @coroutine wraps the generator objects it produces in a
    CoroWrapper instance (defined below), which logs a message if the
    generator is never iterated over -- typically because an "await" or
    "yield from" was forgotten.  The flag is consulted when the decorator
    is applied, so it must be set before the coroutines are defined.  The
    price of the feature is an extra CoroWrapper.__next__ entry in
    tracebacks.
    """
    flags = sys.flags
    if flags.dev_mode:
        return flags.dev_mode
    if flags.ignore_environment:
        # The environment must not influence the interpreter.
        return False
    return bool(os.environ.get('PYTHONASYNCIODEBUG'))
30
31
# Computed once, when this module is imported.  coroutine() reads the flag
# at decoration time to decide whether to wrap results in CoroWrapper.
_DEBUG = _is_debug_mode()
33
34
class CoroWrapper:
    """Proxy placed around a generator/coroutine object in _DEBUG mode.

    The wrapper forwards the generator protocol (send, throw, close,
    iteration and the gi_* attributes) to the wrapped object and records
    the stack at creation time.  When the wrapper is destroyed while the
    wrapped object has not executed yet, an error is logged.
    """

    def __init__(self, gen, func=None):
        assert inspect.isgenerator(gen) or inspect.iscoroutine(gen), gen
        self.gen = gen
        # The decorated callable; used to unwrap the @coroutine decorator.
        self.func = func
        # Stack of the caller, kept for the "created at" diagnostics.
        self._source_traceback = format_helpers.extract_stack(sys._getframe(1))
        self.__name__ = getattr(gen, '__name__', None)
        self.__qualname__ = getattr(gen, '__qualname__', None)

    def __repr__(self):
        coro_repr = _format_coroutine(self)
        stack = self._source_traceback
        if stack:
            # Innermost recorded entry: (filename, lineno, ...).
            frame = stack[-1]
            coro_repr += f', created at {frame[0]}:{frame[1]}'
        return f'<{self.__class__.__name__} {coro_repr}>'

    # -- iterator / awaitable protocol: the wrapper is its own iterator --

    def __iter__(self):
        return self

    def __await__(self):
        return self

    def __next__(self):
        return self.gen.send(None)

    # -- generator methods, forwarded verbatim to the wrapped object --

    def send(self, value):
        return self.gen.send(value)

    def throw(self, type, value=None, traceback=None):
        return self.gen.throw(type, value, traceback)

    def close(self):
        return self.gen.close()

    # -- read-only views of the wrapped generator's introspection data --

    @property
    def gi_frame(self):
        return self.gen.gi_frame

    @property
    def gi_running(self):
        return self.gen.gi_running

    @property
    def gi_code(self):
        return self.gen.gi_code

    @property
    def gi_yieldfrom(self):
        return self.gen.gi_yieldfrom

    def __del__(self):
        # self.gen may be missing (e.g. __init__ did not complete), so go
        # through getattr() for every step.
        wrapped = getattr(self, 'gen', None)
        frame = getattr(wrapped, 'gi_frame', None)
        if frame is None or frame.f_lasti != -1:
            # Either finished/unknown, or it has already started running.
            return

        msg = f'{self!r} was never yielded from'
        tb = getattr(self, '_source_traceback', ())
        if tb:
            formatted = ''.join(traceback.format_list(tb))
            msg += (f'\nCoroutine object created at '
                    f'(most recent call last, truncated to '
                    f'{constants.DEBUG_STACK_DEPTH} last lines):\n')
            msg += formatted.rstrip()
        logger.error(msg)
102
103
def coroutine(func):
    """Decorator to mark generator-based coroutines.

    Functions defined with "async def" are returned untouched.  Generator
    functions are turned into coroutines via types.coroutine(); plain
    functions are first wrapped in a generator that runs whatever
    future, generator or awaitable they return.

    In debug mode the returned callable produces CoroWrapper objects, so
    that an error message is logged if the coroutine is destroyed before
    it was ever yielded from.
    """
    if inspect.iscoroutinefunction(func):
        # Native coroutine functions need no further treatment.
        return func

    if inspect.isgeneratorfunction(func):
        coro = func
    else:
        @functools.wraps(func)
        def coro(*args, **kw):
            result = func(*args, **kw)
            delegate = (base_futures.isfuture(result) or
                        inspect.isgenerator(result) or
                        isinstance(result, CoroWrapper))
            if delegate:
                return (yield from result)

            # Otherwise, run 'result' only if it is a genuine awaitable.
            try:
                await_meth = result.__await__
            except AttributeError:
                return result
            if isinstance(result, collections.abc.Awaitable):
                result = yield from await_meth()
            return result

    coro = types.coroutine(coro)
    if _DEBUG:
        @functools.wraps(func)
        def wrapper(*args, **kwds):
            wrapped = CoroWrapper(coro(*args, **kwds), func=func)
            if wrapped._source_traceback:
                # Drop the last recorded stack entry.
                del wrapped._source_traceback[-1]
            # Name the wrapper after the decorated callable.  getattr()
            # is used because some callables (functools.partial, for
            # instance) may lack __name__ / __qualname__.
            wrapped.__name__ = getattr(func, '__name__', None)
            wrapped.__qualname__ = getattr(func, '__qualname__', None)
            return wrapped
    else:
        wrapper = coro

    wrapper._is_coroutine = _is_coroutine  # For iscoroutinefunction().
    return wrapper
154
155
# A marker for iscoroutinefunction(): coroutine() stores this unique object
# on the functions it returns, and iscoroutinefunction() compares the
# function's ``_is_coroutine`` attribute against it by identity.
_is_coroutine = object()
158
159
def iscoroutinefunction(func):
    """Return True if func is a decorated coroutine function.

    Both native "async def" functions and functions carrying the
    _is_coroutine marker are recognized.
    """
    if inspect.iscoroutinefunction(func):
        return True
    marker = getattr(func, '_is_coroutine', None)
    return marker is _is_coroutine
164
165
# Types that iscoroutine() accepts.  The native coroutine type is listed
# first to speed up asyncio.iscoroutine() for "async def" coroutines.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine, CoroWrapper)
# Concrete types iscoroutine() has already accepted; it is consulted
# before the isinstance() check.
_iscoroutine_typecache = set()
171
172
def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    obj_type = type(obj)
    if obj_type in _iscoroutine_typecache:
        # Fast path: this exact type was accepted before.
        return True

    if not isinstance(obj, _COROUTINE_TYPES):
        return False

    # Remember the type for next time, but never keep more than 100
    # positive entries.  Hitting that limit shouldn't ever happen unless
    # someone is stressing the system on purpose.
    if len(_iscoroutine_typecache) < 100:
        _iscoroutine_typecache.add(obj_type)
    return True
187
188
def _format_coroutine(coro):
    """Build the textual description of a coroutine used in reprs.

    The result names the coroutine and, when a code object is available,
    says whether it is running or done and where it is located.
    """
    assert iscoroutine(coro)

    wrapped = isinstance(coro, CoroWrapper)

    def get_name(coro):
        # Coroutines compiled with Cython sometimes lack a proper
        # __qualname__ or __name__.  That is a Cython bug, but asyncio
        # must not fail with an AttributeError inside a __repr__.
        if wrapped:
            return format_helpers._format_callback(coro.func, (), {})

        # Prefer __qualname__, then __name__; otherwise make the missing
        # name visible instead of hiding the problem.
        coro_name = (getattr(coro, '__qualname__', None)
                     or getattr(coro, '__name__', None)
                     or f'<{type(coro).__name__} without __name__>')
        return f'{coro_name}()'

    def is_running(coro):
        # Native coroutines expose cr_running, generators gi_running.
        for attr in ('cr_running', 'gi_running'):
            try:
                return getattr(coro, attr)
            except AttributeError:
                continue
        return False

    code_obj = (getattr(coro, 'cr_code', None)
                or getattr(coro, 'gi_code', None)
                or None)

    coro_name = get_name(coro)

    if not code_obj:
        # No code object to inspect: report only the name and state.
        if is_running(coro):
            return f'{coro_name} running'
        return coro_name

    frame = (getattr(coro, 'gi_frame', None)
             or getattr(coro, 'cr_frame', None)
             or None)

    # A Cython coroutine may carry a fake code object without a proper
    # co_filename -- make that visible.
    filename = code_obj.co_filename or '<empty co_filename>'

    if (wrapped and
            coro.func is not None and
            not inspect.isgeneratorfunction(coro.func)):
        # Report the location of the decorated function when it is known.
        lineno = 0
        source = format_helpers._get_function_source(coro.func)
        if source is not None:
            filename, lineno = source
        if frame is None:
            return f'{coro_name} done, defined at {filename}:{lineno}'
        return f'{coro_name} running, defined at {filename}:{lineno}'

    if frame is not None:
        lineno = frame.f_lineno
        return f'{coro_name} running at {filename}:{lineno}'

    lineno = code_obj.co_firstlineno
    return f'{coro_name} done, defined at {filename}:{lineno}'
266