• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import functools
2import inspect
3import reprlib
4import sys
5import traceback
6
7from . import constants
8
9
10def _get_function_source(func):
11    func = inspect.unwrap(func)
12    if inspect.isfunction(func):
13        code = func.__code__
14        return (code.co_filename, code.co_firstlineno)
15    if isinstance(func, functools.partial):
16        return _get_function_source(func.func)
17    if isinstance(func, functools.partialmethod):
18        return _get_function_source(func.func)
19    return None
20
21
22def _format_callback_source(func, args):
23    func_repr = _format_callback(func, args, None)
24    source = _get_function_source(func)
25    if source:
26        func_repr += f' at {source[0]}:{source[1]}'
27    return func_repr
28
29
30def _format_args_and_kwargs(args, kwargs):
31    """Format function arguments and keyword arguments.
32
33    Special case for a single parameter: ('hello',) is formatted as ('hello').
34    """
35    # use reprlib to limit the length of the output
36    items = []
37    if args:
38        items.extend(reprlib.repr(arg) for arg in args)
39    if kwargs:
40        items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
41    return '({})'.format(', '.join(items))
42
43
44def _format_callback(func, args, kwargs, suffix=''):
45    if isinstance(func, functools.partial):
46        suffix = _format_args_and_kwargs(args, kwargs) + suffix
47        return _format_callback(func.func, func.args, func.keywords, suffix)
48
49    if hasattr(func, '__qualname__') and func.__qualname__:
50        func_repr = func.__qualname__
51    elif hasattr(func, '__name__') and func.__name__:
52        func_repr = func.__name__
53    else:
54        func_repr = repr(func)
55
56    func_repr += _format_args_and_kwargs(args, kwargs)
57    if suffix:
58        func_repr += suffix
59    return func_repr
60
61
62def extract_stack(f=None, limit=None):
63    """Replacement for traceback.extract_stack() that only does the
64    necessary work for asyncio debug mode.
65    """
66    if f is None:
67        f = sys._getframe().f_back
68    if limit is None:
69        # Limit the amount of work to a reasonable amount, as extract_stack()
70        # can be called for each coroutine and future in debug mode.
71        limit = constants.DEBUG_STACK_DEPTH
72    stack = traceback.StackSummary.extract(traceback.walk_stack(f),
73                                           limit=limit,
74                                           lookup_lines=False)
75    stack.reverse()
76    return stack
77