import functools
import inspect
import reprlib
import sys
import traceback

from . import constants


def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for *func*, or None.

    Wrapper chains are stripped with inspect.unwrap(); functools.partial
    and functools.partialmethod objects are followed through their
    ``func`` attribute until a plain Python function is reached.  None is
    returned when no such function can be found.
    """
    partial_types = (functools.partial, functools.partialmethod)
    target = inspect.unwrap(func)
    while not inspect.isfunction(target):
        if not isinstance(target, partial_types):
            return None
        # Step into the wrapped callable and strip its wrappers as well.
        target = inspect.unwrap(target.func)
    code_obj = target.__code__
    return (code_obj.co_filename, code_obj.co_firstlineno)


def _format_callback_source(func, args, *, debug=False):
    """Describe the call ``func(*args)`` together with its source location.

    The description comes from _format_callback() (no keyword arguments
    are passed); when the definition site of *func* is known,
    `` at <filename>:<lineno>`` is appended.
    """
    description = _format_callback(func, args, None, debug=debug)
    location = _get_function_source(func)
    if not location:
        return description
    filename, lineno = location
    return f'{description} at {filename}:{lineno}'


def _format_args_and_kwargs(args, kwargs, *, debug=False):
    """Render positional and keyword arguments as a parenthesized list.

    A one-element args tuple such as ('hello',) is rendered as ('hello'),
    i.e. without the trailing comma.

    Argument values are only included when debug=True, because they may
    hold sensitive information; otherwise the result is always '()'.
    """
    if not debug:
        return '()'

    # reprlib keeps the representation of each value short
    rendered = [reprlib.repr(value) for value in (args or ())]
    rendered += [
        f'{key}={reprlib.repr(value)}'
        for key, value in (kwargs or {}).items()
    ]
    return '(' + ', '.join(rendered) + ')'


def _format_callback(func, args, kwargs, *, debug=False, suffix=''):
    """Return a printable ``name(arguments)`` description of a callback.

    functools.partial objects are peeled off: the wrapped function is
    shown with the arguments stored in the partial, followed by the
    arguments of the outer call (for example ``f(1)(2)`` in debug mode).
    The name is __qualname__ if non-empty, else __name__ if non-empty,
    else repr(func).  *suffix* is appended verbatim at the end.
    """
    while isinstance(func, functools.partial):
        # The arguments of the current call go in front of whatever has
        # been accumulated so far; then continue with the wrapped callable
        # and the arguments stored in the partial.
        suffix = _format_args_and_kwargs(args, kwargs, debug=debug) + suffix
        func, args, kwargs = func.func, func.args, func.keywords

    label = getattr(func, '__qualname__', None)
    if not label:
        label = getattr(func, '__name__', None)
    if not label:
        label = repr(func)

    description = label + _format_args_and_kwargs(args, kwargs, debug=debug)
    if suffix:
        description += suffix
    return description


def extract_stack(f=None, limit=None):
    """Lightweight replacement for traceback.extract_stack().

    Only does the work needed by asyncio debug mode.  *f* defaults to the
    caller's frame and *limit* to constants.DEBUG_STACK_DEPTH.  Source
    lines are not looked up, and the summary is reversed before being
    returned.
    """
    # sys._getframe() must be evaluated right here so that f_back is the
    # frame of whoever called extract_stack().
    start_frame = sys._getframe().f_back if f is None else f
    # Limit the amount of work to a reasonable amount, as extract_stack()
    # can be called for each coroutine and future in debug mode.
    depth = constants.DEBUG_STACK_DEPTH if limit is None else limit
    summary = traceback.StackSummary.extract(
        traceback.walk_stack(start_frame),
        limit=depth,
        lookup_lines=False,
    )
    summary.reverse()
    return summary