• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Library to analyze and dump Thread protos and Thread snapshots into text."""
15
16import binascii
17from typing import Optional, List, Mapping
18import pw_tokenizer
19from pw_symbolizer import LlvmSymbolizer, Symbolizer
20from pw_tokenizer import proto as proto_detokenizer
21from pw_thread_protos import thread_pb2
22
# Display label for each ThreadState enum number. Every label is spelled
# exactly like the enum member it is looked up from.
THREAD_STATE_TO_STRING: Mapping[int, str] = {
    getattr(thread_pb2.ThreadState.Enum, state_name): state_name
    for state_name in (
        'UNKNOWN',
        'INTERRUPT_HANDLER',
        'RUNNING',
        'READY',
        'SUSPENDED',
        'BLOCKED',
        'INACTIVE',
    )
}
32
33
def process_snapshot(
    serialized_snapshot: bytes,
    tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
    symbolizer: Optional[Symbolizer] = None,
) -> str:
    """Renders the threads of a serialized snapshot as a multi-line string.

    Args:
        serialized_snapshot: Bytes parsed as a SnapshotThreadInfo message.
        tokenizer_db: Optional detokenizer passed through to the analyzer.
        symbolizer: Optional symbolizer passed through to the analyzer; an
            LlvmSymbolizer is created when this is None.

    Returns:
        The string form of a ThreadSnapshotAnalyzer built from the snapshot.
    """
    thread_info = thread_pb2.SnapshotThreadInfo()
    thread_info.ParseFromString(serialized_snapshot)

    active_symbolizer = LlvmSymbolizer() if symbolizer is None else symbolizer
    analyzer = ThreadSnapshotAnalyzer(
        thread_info, tokenizer_db, active_symbolizer
    )
    return str(analyzer)
48
49
class ThreadInfo:
    """Provides CPU and stack information that can be inferred from a Thread.

    Wraps a single Thread proto and derives display strings and sizes from
    its fields. Each field is checked with HasField() before it is read, so
    missing data is reported as unknown instead of as a default value.
    """

    _UNKNOWN_VALUE_STR = '0x' + '?' * 8

    def __init__(self, thread: 'thread_pb2.Thread'):
        self._thread = thread

    def _pointer_str(self, field_name: str) -> str:
        """Formats a pointer field as hex, or as unknown when it is unset."""
        if not self._thread.HasField(field_name):
            return ThreadInfo._UNKNOWN_VALUE_STR
        return f'0x{getattr(self._thread, field_name):08x}'

    def _bytes_and_percent_str(self, size_bytes: int) -> str:
        """Formats a byte count, plus its share of the stack limit if known."""
        size_str = f'{size_bytes} bytes'
        if not self.has_stack_size_limit():
            return size_str
        limit = self.stack_size_limit()
        if limit == 0:
            # Start and end pointers are equal, so a percentage is undefined.
            return size_str
        return f'{size_str}, {100 * size_bytes / limit:.2f}%'

    def _cpu_used_str(self) -> str:
        """Returns CPU usage as a percentage string, or 'unknown'."""
        if not self._thread.HasField('cpu_usage_hundredths'):
            return 'unknown'
        cpu_last_percent = self._thread.cpu_usage_hundredths / 100
        return f'{cpu_last_percent:.2f}%'

    def _stack_size_limit_limit_str(self) -> str:
        """Returns the stack size limit in bytes, or 'size unknown'."""
        if not self.has_stack_size_limit():
            return 'size unknown'

        return f'{self.stack_size_limit()} bytes'

    def _stack_used_str(self) -> str:
        """Returns current stack usage in bytes, with a percentage if known."""
        if not self.has_stack_used():
            return 'size unknown'

        return self._bytes_and_percent_str(self.stack_used())

    def _stack_pointer_est_peak_str(self) -> str:
        """Returns est. peak stack usage in bytes, with a percentage if known."""
        if not self.has_stack_pointer_est_peak():
            return 'size unknown'

        return self._bytes_and_percent_str(self.stack_pointer_est_peak())

    def _stack_used_range_str(self) -> str:
        """Returns 'start - current stack pointer (usage)' for display."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_used_str()})'

    def _stack_limit_range_str(self) -> str:
        """Returns 'start - end (size limit)' for display."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_end_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})'

    def _stack_pointer_str(self) -> str:
        """Returns the current stack pointer as hex, or unknown if unset."""
        # Format the same field that is checked for presence.
        return self._pointer_str('stack_pointer')

    def has_stack_size_limit(self) -> bool:
        """Returns true if there's enough info to calculate stack size."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_end_pointer')

    def stack_size_limit(self) -> int:
        """Returns the stack size limit in bytes.

        Precondition:
            has_stack_size_limit() must be true.
        """
        assert self.has_stack_size_limit(), 'Missing stack size information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_end_pointer
        )

    def has_stack_used(self) -> bool:
        """Returns true if there's enough info to calculate stack usage."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer')

    def stack_used(self) -> int:
        """Returns the stack usage in bytes.

        Precondition:
            has_stack_used() must be true.
        """
        assert self.has_stack_used(), 'Missing stack usage information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_pointer
        )

    def has_stack_pointer_est_peak(self) -> bool:
        """Returns true if there's enough info to calculate estimate
        used stack.
        """
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer_est_peak')

    def stack_pointer_est_peak(self) -> int:
        """Returns the max estimated used stack usage in bytes.

        Precondition:
            has_stack_pointer_est_peak() must be true.
        """
        assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak'
        return abs(
            self._thread.stack_start_pointer
            - self._thread.stack_pointer_est_peak
        )

    def __str__(self) -> str:
        """Returns the CPU and stack summary as a multi-line string."""
        output = [
            f'Est CPU usage: {self._cpu_used_str()}',
            'Stack info',
            f'  Current usage:   {self._stack_used_range_str()}',
            f'  Est peak usage:  {self._stack_pointer_est_peak_str()}',
            f'  Stack limits:    {self._stack_limit_range_str()}',
        ]
        return '\n'.join(output)
193
194
class ThreadSnapshotAnalyzer:
    """Simplifies dumping the threads of a SnapshotThreadInfo message as text."""

    _UNNAMED_THREAD_STR = '[unnamed thread]'

    def __init__(
        self,
        threads: thread_pb2.SnapshotThreadInfo,
        tokenizer_db: Optional[pw_tokenizer.Detokenizer] = None,
        symbolizer: Optional[Symbolizer] = None,
    ):
        """Stores the snapshot's threads and detokenizes their fields.

        Args:
            threads: Message whose `threads` field is analyzed. Its thread
                entries are passed to detokenize_fields().
            tokenizer_db: Detokenizer to use; a Detokenizer(None) is created
                when this is None.
            symbolizer: Symbolizer used for backtraces; an LlvmSymbolizer is
                created when this is None.
        """
        self._threads = threads.threads
        self._tokenizer_db = (
            tokenizer_db
            if tokenizer_db is not None
            else pw_tokenizer.Detokenizer(None)
        )
        if symbolizer is not None:
            self._symbolizer = symbolizer
        else:
            self._symbolizer = LlvmSymbolizer()

        for thread in self._threads:
            proto_detokenizer.detokenize_fields(self._tokenizer_db, thread)

    @classmethod
    def _thread_name(cls, thread: thread_pb2.Thread) -> str:
        """Returns a printable thread name, or a placeholder if it is empty."""
        # errors='replace' keeps an undecodable name from aborting the dump.
        return thread.name.decode(errors='replace') or cls._UNNAMED_THREAD_STR

    def _active_thread_index(self) -> Optional[int]:
        """Returns the index of the active thread, or None if there is none."""
        # First check if an interrupt handler was active.
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER:
                return index
            if thread.active:  # The deprecated legacy way to report this.
                return index

        # If not, search for a running thread.
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.RUNNING:
                return index

        return None

    def active_thread(self) -> Optional[thread_pb2.Thread]:
        """The thread that requested the snapshot capture."""
        index = self._active_thread_index()
        return None if index is None else self._threads[index]

    def __str__(self) -> str:
        """Returns the captured threads as a multi-line string.

        The active thread, if any, is listed first and flagged. An empty
        string is returned when there are no threads.
        """
        if not self._threads:
            return ''

        # Resolve the active thread once instead of once per listed thread.
        active_index = self._active_thread_index()

        output: List[str] = ['Thread State']
        plural = '' if len(self._threads) == 1 else 's'
        thread_state_overview = f'  {len(self._threads)} thread{plural} running'
        if active_index is None:
            output.append(thread_state_overview + '.')
        else:
            active_name = self._thread_name(self._threads[active_index])
            thread_state_overview += ', '
            # Underline the active thread's name within the overview line.
            underline = ' ' * len(thread_state_overview) + '~' * len(
                active_name
            )
            output.append(
                f'{thread_state_overview}{active_name}'
                ' active at the time of capture.'
            )
            output.append(underline)

        output.append('')

        # Put the active thread at the front. A local list is reordered so
        # that producing the string does not modify the snapshot message.
        ordered = list(enumerate(self._threads))
        if active_index is not None:
            ordered.insert(0, ordered.pop(active_index))

        for index, thread in ordered:
            # Fall back to a label for enum numbers missing from the table.
            state = THREAD_STATE_TO_STRING.get(thread.state, 'UNKNOWN')
            thread_headline = f'Thread ({state}): {self._thread_name(thread)}'
            if index == active_index:
                thread_headline += ' <-- [ACTIVE]'
            output.append(thread_headline)
            output.append(str(ThreadInfo(thread)))
            if thread.raw_backtrace:
                output.append(
                    self._symbolizer.dump_stack_trace(thread.raw_backtrace)
                )
            if thread.raw_stack:
                output.append('Raw Stack')
                output.append(
                    binascii.hexlify(thread.raw_stack, b'\n', 32).decode(
                        'utf-8'
                    )
                )
            # Blank line between threads for nicer formatting.
            output.append('')

        return '\n'.join(output)
295