• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import linecache
2import reprlib
3import traceback
4
5from . import base_futures
6from . import coroutines
7
8
def _task_repr_info(task):
    """Return the list of text fragments used to build the repr of *task*.

    The list starts out as ``base_futures._future_repr_info(task)`` and is
    then extended in place.  The resulting order is: the status (slot 0),
    ``name=...``, ``coro=<...>`` when the task has a coroutine,
    ``wait_for=...`` when the task is waiting on a future, and finally the
    remaining fragments produced by the base helper.
    """
    info = base_futures._future_repr_info(task)

    if task.cancelling() and not task.done():
        # replace status
        info[0] = 'cancelling'

    # Use an explicit !r conversion rather than "'name=%r' % name": the
    # %-operator would unpack a tuple-valued name instead of showing it.
    info.insert(1, f'name={task.get_name()!r}')

    if task._fut_waiter is not None:
        info.insert(2, f'wait_for={task._fut_waiter!r}')

    if task._coro:
        # Inserted at the same index as wait_for, so the coroutine ends up
        # listed in front of it.
        coro = coroutines._format_coroutine(task._coro)
        info.insert(2, f'coro=<{coro}>')

    return info
26
27
@reprlib.recursive_repr()
def _task_repr(task):
    """Return the repr of *task* in the form ``<ClassName fragment ...>``.

    The fragments come from ``_task_repr_info(task)`` and are joined with
    single spaces.  ``reprlib.recursive_repr()`` supplies a placeholder if
    this function is re-entered for the same task while its repr is still
    being built.
    """
    info = ' '.join(_task_repr_info(task))
    return f'<{task.__class__.__name__} {info}>'
32
33
34def _task_get_stack(task, limit):
35    frames = []
36    if hasattr(task._coro, 'cr_frame'):
37        # case 1: 'async def' coroutines
38        f = task._coro.cr_frame
39    elif hasattr(task._coro, 'gi_frame'):
40        # case 2: legacy coroutines
41        f = task._coro.gi_frame
42    elif hasattr(task._coro, 'ag_frame'):
43        # case 3: async generators
44        f = task._coro.ag_frame
45    else:
46        # case 4: unknown objects
47        f = None
48    if f is not None:
49        while f is not None:
50            if limit is not None:
51                if limit <= 0:
52                    break
53                limit -= 1
54            frames.append(f)
55            f = f.f_back
56        frames.reverse()
57    elif task._exception is not None:
58        tb = task._exception.__traceback__
59        while tb is not None:
60            if limit is not None:
61                if limit <= 0:
62                    break
63                limit -= 1
64            frames.append(tb.tb_frame)
65            tb = tb.tb_next
66    return frames
67
68
69def _task_print_stack(task, limit, file):
70    extracted_list = []
71    checked = set()
72    for f in task.get_stack(limit=limit):
73        lineno = f.f_lineno
74        co = f.f_code
75        filename = co.co_filename
76        name = co.co_name
77        if filename not in checked:
78            checked.add(filename)
79            linecache.checkcache(filename)
80        line = linecache.getline(filename, lineno, f.f_globals)
81        extracted_list.append((filename, lineno, name, line))
82
83    exc = task._exception
84    if not extracted_list:
85        print(f'No stack for {task!r}', file=file)
86    elif exc is not None:
87        print(f'Traceback for {task!r} (most recent call last):', file=file)
88    else:
89        print(f'Stack for {task!r} (most recent call last):', file=file)
90
91    traceback.print_list(extracted_list, file=file)
92    if exc is not None:
93        for line in traceback.format_exception_only(exc.__class__, exc):
94            print(line, file=file, end='')
95