• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import functools
2import inspect
3import reprlib
4import sys
5import traceback
6
7from . import constants
8
9
def _get_function_source(func):
    """Locate the definition of *func*.

    Decorator wrappers are stripped with inspect.unwrap() first.  For a
    plain Python function the result is a ``(filename, first_line_number)``
    tuple taken from its code object.  functools.partial and
    functools.partialmethod objects are resolved through their ``func``
    attribute.  Anything else yields None.
    """
    target = inspect.unwrap(func)

    if inspect.isfunction(target):
        code_obj = target.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # Both partial flavours expose the wrapped callable as ``.func``.
    partial_types = (functools.partial, functools.partialmethod)
    if isinstance(target, partial_types):
        return _get_function_source(target.func)

    return None
20
21
def _format_callback_source(func, args, *, debug=False):
    """Describe a callback, appending its source location when known.

    The description comes from _format_callback() (called without keyword
    arguments).  When _get_function_source() can locate *func*, the text
    ' at <filename>:<lineno>' is appended to it.
    """
    description = _format_callback(func, args, None, debug=debug)
    location = _get_function_source(func)
    if not location:
        return description
    filename, lineno = location
    return description + f' at {filename}:{lineno}'
28
29
def _format_args_and_kwargs(args, kwargs, *, debug=False):
    """Render positional and keyword arguments as a call-style string.

    A lone positional argument is rendered without a trailing comma, so
    ('hello',) becomes ('hello').

    Argument values are included only when debug=True, because they may
    carry sensitive information; otherwise the result is always '()'.
    """
    if not debug:
        return '()'

    # reprlib keeps each rendered value to a bounded length.
    positional = [reprlib.repr(value) for value in args] if args else []
    if kwargs:
        named = [f'{name}={reprlib.repr(value)}'
                 for name, value in kwargs.items()]
    else:
        named = []
    return '(' + ', '.join(positional + named) + ')'
49
50
def _format_callback(func, args, kwargs, *, debug=False, suffix=''):
    """Build a call-like description of *func* applied to its arguments.

    functools.partial objects are peeled off layer by layer: the arguments
    supplied at each outer layer are formatted and prepended to *suffix*,
    and the partial's own func/args/keywords become the next layer.  The
    innermost callable is named by __qualname__, falling back to __name__
    and then repr().  Argument values appear only when debug=True.
    """
    while isinstance(func, functools.partial):
        outer_call = _format_args_and_kwargs(args, kwargs, debug=debug)
        suffix = outer_call + suffix
        func, args, kwargs = func.func, func.args, func.keywords

    # Prefer the most specific non-empty name available.
    label = (getattr(func, '__qualname__', None)
             or getattr(func, '__name__', None)
             or repr(func))

    pieces = [label, _format_args_and_kwargs(args, kwargs, debug=debug)]
    if suffix:
        pieces.append(suffix)
    return ''.join(pieces)
68
69
def extract_stack(f=None, limit=None):
    """Lightweight stand-in for traceback.extract_stack().

    Does only the work needed by asyncio debug mode: source lines are not
    looked up, and the number of frames is capped.

    *f* is the frame to start from; when omitted, the caller's frame is
    used.  *limit* caps the number of frames and defaults to
    constants.DEBUG_STACK_DEPTH.  The summary is reversed in place before
    being returned.
    """
    # The frame lookup must stay in this function body so that f_back
    # refers to whoever called extract_stack().
    start_frame = sys._getframe().f_back if f is None else f

    # Keep the work bounded: this can run for every coroutine and future
    # when debug mode is on.
    depth = constants.DEBUG_STACK_DEPTH if limit is None else limit

    frames = traceback.walk_stack(start_frame)
    summary = traceback.StackSummary.extract(frames,
                                             limit=depth,
                                             lookup_lines=False)
    summary.reverse()
    return summary
85