# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tool for processing and outputting Snapshot protos as text."""

import argparse
import functools
import logging
import sys
from pathlib import Path
from typing import BinaryIO, TextIO, Callable

import pw_tokenizer
import pw_cpu_exception_cortex_m
import pw_cpu_exception_risc_v
import pw_build_info.build_id
from pw_snapshot_metadata import metadata
from pw_snapshot_metadata_proto import snapshot_metadata_pb2
from pw_snapshot_protos import snapshot_pb2
from pw_symbolizer import LlvmSymbolizer, Symbolizer
from pw_thread import thread_analyzer
from pw_chrono import timestamp_analyzer

_LOG = logging.getLogger('snapshot_processor')

_BRANDING = """
    ____ _       __    _____ _   _____    ____  _____ __  ______  ______
   / __ \\ |     / /   / ___// | / /   |  / __ \\/ ___// / / / __ \\/_  __/
  / /_/ / | /| / /    \\__ \\/  |/ / /| | / /_/ /\\__ \\/ /_/ / / / /  / /
 / ____/| |/ |/ /    ___/ / /|  / ___ |/ ____/___/ / __  / /_/ /  / /
/_/     |__/|__/____/____/_/ |_/_/  |_/_/    /____/_/ /_/\\____/  /_/
             /_____/

"""

# Deprecated; use SymbolizerMatcher instead. Will be removed shortly.
ElfMatcher = Callable[[snapshot_pb2.Snapshot], Path | None]

# Symbolizers are useful for turning addresses into source code locations and
# function names. Because a single snapshot may contain embedded snapshots from
# multiple devices, each snapshot must be matched to the right ELF file for its
# addresses to be symbolized correctly.
#
# A SymbolizerMatcher is a function that takes a snapshot, inspects its
# metadata (often the build ID, device name, or version string), and returns a
# Symbolizer loaded with a suitable ELF file for symbolizing that snapshot.
SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]


def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    llvm_symbolizer_binary: Path | None = None,
) -> str:
    """Processes a single snapshot."""

    output = [_BRANDING]

    # Open a symbolizer.
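    # The most specific option wins: an explicit SymbolizerMatcher is used if
    # provided, then the deprecated ElfMatcher wrapped in an LlvmSymbolizer,
    # and finally a bare LlvmSymbolizer with no ELF to pull symbols from.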
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    if symbolizer_matcher is not None:
        symbolizer = symbolizer_matcher(snapshot)
    elif elf_matcher is not None:
        symbolizer = LlvmSymbolizer(
            elf_matcher(snapshot), llvm_symbolizer_binary=llvm_symbolizer_binary
        )
    else:
        symbolizer = LlvmSymbolizer(
            llvm_symbolizer_binary=llvm_symbolizer_binary
        )

    captured_metadata = metadata.process_snapshot(
        serialized_snapshot, detokenizer
    )
    if captured_metadata:
        output.append(captured_metadata)

    # Create MetadataProcessor
    snapshot_metadata = snapshot_metadata_pb2.SnapshotBasicInfo()
    snapshot_metadata.ParseFromString(serialized_snapshot)
    metadata_processor = metadata.MetadataProcessor(
        snapshot_metadata.metadata, detokenizer
    )

    # Check which CPU architecture to process the snapshot with.
    if metadata_processor.cpu_arch().startswith("RV"):
        risc_v_cpu_state = pw_cpu_exception_risc_v.process_snapshot(
            serialized_snapshot, symbolizer
        )
        if risc_v_cpu_state:
            output.append(risc_v_cpu_state)
    else:
        cortex_m_cpu_state = pw_cpu_exception_cortex_m.process_snapshot(
            serialized_snapshot, symbolizer
        )
        if cortex_m_cpu_state:
            output.append(cortex_m_cpu_state)

    thread_info = thread_analyzer.process_snapshot(
        serialized_snapshot, detokenizer, symbolizer
    )

    if thread_info:
        output.append(thread_info)

    timestamp_info = timestamp_analyzer.process_snapshot(serialized_snapshot)

    if timestamp_info:
        output.append(timestamp_info)

    # Check and emit the number of related snapshots embedded in this snapshot.
    if snapshot.related_snapshots:
        snapshot_count = len(snapshot.related_snapshots)
        plural = 's' if snapshot_count > 1 else ''
        output.append(
            f'This snapshot contains {snapshot_count} related snapshot{plural}'
        )
        output.append('')

    return '\n'.join(output)


def process_snapshots(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
) -> str:
    """Processes a snapshot that may have multiple embedded snapshots."""
    output = []
    # Process the top-level snapshot.
    output.append(
        process_snapshot(
            serialized_snapshot, detokenizer, elf_matcher, symbolizer_matcher
        )
    )

    # If the user provided a custom processing callback, call it on each
    # snapshot.
    if user_processing_callback is not None:
        output.append(user_processing_callback(serialized_snapshot))

    # Process any related snapshots that were embedded in this one.
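    # Each related snapshot is itself a serialized Snapshot proto, so it is
    # rendered recursively with the same detokenizer, matchers, and callback,
    # and separated from the previous output by a bracketed divider line.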
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)
    for nested_snapshot in snapshot.related_snapshots:
        output.append('\n[' + '=' * 78 + ']\n')
        output.append(
            str(
                process_snapshots(
                    nested_snapshot.SerializeToString(),
                    detokenizer,
                    elf_matcher,
                    user_processing_callback,
                    symbolizer_matcher,
                )
            )
        )

    return '\n'.join(output)


def _snapshot_symbolizer_matcher(
    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
) -> LlvmSymbolizer:
    matching_elf: Path | None = pw_build_info.build_id.find_matching_elf(
        snapshot.metadata.software_build_uuid, artifacts_dir
    )
    if not matching_elf:
        _LOG.error(
            'Error: No matching ELF found for GNU build ID %s.',
            snapshot.metadata.software_build_uuid.hex(),
        )
    return LlvmSymbolizer(matching_elf)


def _load_and_dump_snapshots(
    in_file: BinaryIO,
    out_file: TextIO,
    token_db: TextIO | None,
    artifacts_dir: Path | None,
):
    detokenizer = None
    if token_db:
        detokenizer = pw_tokenizer.Detokenizer(token_db)
    symbolizer_matcher: SymbolizerMatcher | None = None
    if artifacts_dir:
        symbolizer_matcher = functools.partial(
            _snapshot_symbolizer_matcher, artifacts_dir
        )
    out_file.write(
        process_snapshots(
            serialized_snapshot=in_file.read(),
            detokenizer=detokenizer,
            symbolizer_matcher=symbolizer_matcher,
        )
    )


def _parse_args():
    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
    parser.add_argument(
        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
    )
    parser.add_argument(
        '--out-file',
        '-o',
        default='-',
        type=argparse.FileType('w'),
        help='File to output decoded snapshots to. Defaults to stdout.',
    )
    parser.add_argument(
        '--token-db',
        type=argparse.FileType('r'),
        help='Token database or ELF file to use for detokenization.',
    )
    parser.add_argument(
        '--artifacts-dir',
        type=Path,
        help=(
            'Directory to recursively search for matching ELF files to use '
            'for symbolization.'
        ),
    )
    return parser.parse_args()


if __name__ == '__main__':
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    _load_and_dump_snapshots(**vars(_parse_args()))
    sys.exit(0)
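
# Example usage (an illustrative sketch, not part of this module's API). The
# module import path, file and script names, and token database below are
# assumptions for illustration only.
#
#   from pathlib import Path
#
#   import pw_tokenizer
#   from pw_symbolizer import LlvmSymbolizer
#   from pw_snapshot.processor import process_snapshots  # assumed import path
#
#   def match_symbolizer(snapshot):
#       # Hypothetical policy: always symbolize against one known build.
#       return LlvmSymbolizer(Path('out/device_firmware.elf'))
#
#   serialized = Path('crash_snapshot.bin').read_bytes()
#   print(
#       process_snapshots(
#           serialized,
#           detokenizer=pw_tokenizer.Detokenizer('tokens.csv'),
#           symbolizer_matcher=match_symbolizer,
#       )
#   )
#
# The same decode can be run from the command line by executing this file
# directly (flags are defined in _parse_args()), e.g.:
#
#   python snapshot_processor.py crash_snapshot.bin \
#       --token-db tokens.csv --artifacts-dir out/ -o decoded.txt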