# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Library to analyze and dump Thread protos and Thread snapshots into text."""

import binascii
from typing import Callable, Mapping
import pw_tokenizer
from pw_symbolizer import LlvmSymbolizer, Symbolizer
from pw_tokenizer import proto as proto_detokenizer
from pw_thread_protos import thread_pb2

# Human-readable names for the thread states this tool knows about.
THREAD_STATE_TO_STRING: Mapping[int, str] = {
    thread_pb2.ThreadState.Enum.UNKNOWN: 'UNKNOWN',
    thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER: 'INTERRUPT_HANDLER',
    thread_pb2.ThreadState.Enum.RUNNING: 'RUNNING',
    thread_pb2.ThreadState.Enum.READY: 'READY',
    thread_pb2.ThreadState.Enum.SUSPENDED: 'SUSPENDED',
    thread_pb2.ThreadState.Enum.BLOCKED: 'BLOCKED',
    thread_pb2.ThreadState.Enum.INACTIVE: 'INACTIVE',
}


def process_snapshot(
    serialized_snapshot: bytes,
    tokenizer_db: pw_tokenizer.Detokenizer | None = None,
    symbolizer: Symbolizer | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
) -> str:
    """Processes snapshot threads, producing a multi-line string.

    Args:
        serialized_snapshot: Serialized bytes parsed as a SnapshotThreadInfo.
        tokenizer_db: Detokenizer applied to the thread fields. An empty
            detokenizer is used when omitted.
        symbolizer: Symbolizer used to dump raw backtraces. An LlvmSymbolizer
            is created when omitted.
        user_processing_callback: Optional callable that receives each
            serialized Thread and returns extra text to append for it.

    Returns:
        The text dump of all captured threads, or an empty string if the
        snapshot contains no threads.
    """
    captured_threads = thread_pb2.SnapshotThreadInfo()
    captured_threads.ParseFromString(serialized_snapshot)

    # ThreadSnapshotAnalyzer supplies the default detokenizer and symbolizer,
    # so optional arguments are forwarded untouched.
    return str(
        ThreadSnapshotAnalyzer(
            captured_threads, tokenizer_db, symbolizer, user_processing_callback
        )
    )


class ThreadInfo:
    """Provides CPU and stack information that can be inferred from a Thread."""

    # Placeholder printed in place of a pointer that is not in the proto.
    _UNKNOWN_VALUE_STR = '0x' + '?' * 8

    def __init__(self, thread: thread_pb2.Thread):
        self._thread = thread

    def _cpu_used_str(self) -> str:
        """Returns the CPU usage as a percentage, or 'unknown' if unset."""
        if not self._thread.HasField('cpu_usage_hundredths'):
            return 'unknown'
        cpu_last_percent = self._thread.cpu_usage_hundredths / 100
        return f'{cpu_last_percent:.2f}%'

    def _pointer_str(self, field_name: str) -> str:
        """Formats a pointer field as hex, or a placeholder if it is unset."""
        if not self._thread.HasField(field_name):
            return ThreadInfo._UNKNOWN_VALUE_STR
        return f'0x{getattr(self._thread, field_name):08x}'

    def _usage_str(self, used_bytes: int) -> str:
        """Formats a byte count, plus its share of the stack when known."""
        usage = f'{used_bytes} bytes'
        if not self.has_stack_size_limit():
            return usage
        limit = self.stack_size_limit()
        if limit == 0:
            # A zero-sized stack would divide by zero; report NaN instead.
            return usage + ', NaN%'
        return usage + f', {100 * used_bytes / limit:.2f}%'

    def _stack_size_limit_limit_str(self) -> str:
        """Describes the total stack size, warning when it is zero."""
        if not self.has_stack_size_limit():
            return 'size unknown'
        if self.stack_size_limit() == 0:
            return 'WARNING: total stack size is 0 bytes'

        return f'{self.stack_size_limit()} bytes'

    def _stack_used_str(self) -> str:
        """Describes the current stack usage in bytes and percent."""
        if not self.has_stack_used():
            return 'size unknown'
        return self._usage_str(self.stack_used())

    def _stack_pointer_est_peak_str(self) -> str:
        """Describes the estimated peak stack usage in bytes and percent."""
        if not self.has_stack_pointer_est_peak():
            return 'size unknown'
        return self._usage_str(self.stack_pointer_est_peak())

    def _stack_used_range_str(self) -> str:
        """Describes the address range from stack start to stack pointer."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_used_str()})'

    def _stack_limit_range_str(self) -> str:
        """Describes the address range from stack start to stack end."""
        start_str = self._pointer_str('stack_start_pointer')
        end_str = self._pointer_str('stack_end_pointer')

        # TODO(amontanez): Would be nice to represent stack growth direction.
        return f'{start_str} - {end_str} ({self._stack_size_limit_limit_str()})'

    def _stack_pointer_str(self) -> str:
        """Formats the current stack pointer, or a placeholder if unset."""
        # Format the same field that is checked for presence; formatting
        # stack_end_pointer here would print the wrong address.
        return self._pointer_str('stack_pointer')

    def has_stack_size_limit(self) -> bool:
        """Returns true if there's enough info to calculate stack size."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_end_pointer')

    def stack_size_limit(self) -> int:
        """Returns the stack size limit in bytes.

        Precondition:
            has_stack_size_limit() must be true.
        """
        assert self.has_stack_size_limit(), 'Missing stack size information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_end_pointer
        )

    def has_stack_used(self) -> bool:
        """Returns true if there's enough info to calculate stack usage."""
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer')

    def stack_used(self) -> int:
        """Returns the stack usage in bytes.

        Precondition:
            has_stack_used() must be true.
        """
        assert self.has_stack_used(), 'Missing stack usage information'
        return abs(
            self._thread.stack_start_pointer - self._thread.stack_pointer
        )

    def has_stack_pointer_est_peak(self) -> bool:
        """Returns true if there's enough info to calculate estimate
        used stack.
        """
        return self._thread.HasField(
            'stack_start_pointer'
        ) and self._thread.HasField('stack_pointer_est_peak')

    def stack_pointer_est_peak(self) -> int:
        """Returns the max estimated used stack usage in bytes.

        Precondition:
            has_stack_pointer_est_peak() must be true.
        """
        assert self.has_stack_pointer_est_peak(), 'Missing stack est. peak'
        return abs(
            self._thread.stack_start_pointer
            - self._thread.stack_pointer_est_peak
        )

    def __str__(self) -> str:
        output = [
            f'Est CPU usage: {self._cpu_used_str()}',
            'Stack info',
            f' Current usage: {self._stack_used_range_str()}',
            f' Est peak usage: {self._stack_pointer_est_peak_str()}',
            f' Stack limits: {self._stack_limit_range_str()}',
        ]
        return '\n'.join(output)


class ThreadSnapshotAnalyzer:
    """This class simplifies dumping contents of a SnapshotThreadInfo message.

    Constructing an analyzer detokenizes the fields of every thread in place
    on the supplied proto.
    """

    def __init__(
        self,
        threads: thread_pb2.SnapshotThreadInfo,
        tokenizer_db: pw_tokenizer.Detokenizer | None = None,
        symbolizer: Symbolizer | None = None,
        user_processing_callback: Callable[[bytes], str] | None = None,
    ):
        self._threads = threads.threads
        self._tokenizer_db = (
            tokenizer_db
            if tokenizer_db is not None
            else pw_tokenizer.Detokenizer(None)
        )
        self._symbolizer = (
            symbolizer if symbolizer is not None else LlvmSymbolizer()
        )
        self._user_processing_callback = user_processing_callback

        for thread in self._threads:
            proto_detokenizer.detokenize_fields(self._tokenizer_db, thread)

    def _active_thread_index(self) -> int | None:
        """Returns the position of the active thread, or None if there is none."""
        # First check if an interrupt handler was active.
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.INTERRUPT_HANDLER:
                return index
            if thread.active:  # The deprecated legacy way to report this.
                return index

        # If not, search for a running thread.
        for index, thread in enumerate(self._threads):
            if thread.state == thread_pb2.ThreadState.Enum.RUNNING:
                return index

        return None

    def active_thread(self) -> thread_pb2.Thread | None:
        """The thread that requested the snapshot capture."""
        index = self._active_thread_index()
        return None if index is None else self._threads[index]

    def __str__(self) -> str:
        """Outputs the captured threads as a multi-line string.

        The active thread, if any, is listed first and marked as active.
        Returns an empty string when there are no threads.
        """
        if not self._threads:
            return ''

        # Resolve the active thread once instead of re-scanning per thread.
        active_index = self._active_thread_index()

        # Reorder a list copy so that rendering never mutates the proto.
        threads = list(self._threads)
        if active_index is not None:
            # Put the active thread at the front.
            threads.insert(0, threads.pop(active_index))

        output: list[str] = ['Thread State']
        plural = '' if len(threads) == 1 else 's'
        thread_state_overview = f' {len(threads)} thread{plural} running'
        if active_index is None:
            output.append(thread_state_overview + '.')
        else:
            active_name = threads[0].name.decode()
            thread_state_overview += ', '
            # Underline only the thread name within the overview line.
            underline = ' ' * len(thread_state_overview) + '~' * len(
                active_name
            )
            output.append(
                f'{thread_state_overview}{active_name}'
                ' active at the time of capture.'
            )
            output.append(underline)

        output.append('')

        for position, thread in enumerate(threads):
            thread_name = thread.name.decode() or '[unnamed thread]'
            # A state value missing from the table must not abort the dump.
            state_name = THREAD_STATE_TO_STRING.get(
                thread.state, f'UNKNOWN ({thread.state})'
            )
            thread_headline = f'Thread ({state_name}): {thread_name}'
            if active_index is not None and position == 0:
                thread_headline += ' <-- [ACTIVE]'
            output.append(thread_headline)
            output.append(str(ThreadInfo(thread)))
            if thread.raw_backtrace:
                output.append(
                    self._symbolizer.dump_stack_trace(thread.raw_backtrace)
                )
            if thread.raw_stack:
                output.append('Raw Stack')
                # Hex dump with a line break after every 32 bytes.
                output.append(
                    binascii.hexlify(thread.raw_stack, b'\n', 32).decode(
                        'utf-8'
                    )
                )
            if self._user_processing_callback is not None:
                output.append(
                    self._user_processing_callback(thread.SerializeToString())
                )

            # Blank line between threads for nicer formatting.
            output.append('')

        return '\n'.join(output)