# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tool for processing and outputting Snapshot protos as text."""

import argparse
import functools
import logging
import sys
from pathlib import Path
from typing import Optional, BinaryIO, TextIO, Callable

import pw_tokenizer
import pw_cpu_exception_cortex_m
import pw_build_info.build_id
from pw_snapshot_metadata import metadata
from pw_snapshot_protos import snapshot_pb2
from pw_symbolizer import LlvmSymbolizer, Symbolizer
from pw_thread import thread_analyzer
from pw_chrono import timestamp_analyzer

_LOG = logging.getLogger('snapshot_processor')

# Banner emitted at the top of every processed snapshot ("PW_SNAPSHOT" in
# figlet "slant" style).
_BRANDING = """
        ____ _       __    _____ _   _____    ____  _____ __  ______  ______
       / __ \\ |     / /   / ___//  | / /   |  / __ \\/ ___// / / / __ \\/_  __/
      / /_/ / | /| / /    \\__ \\/  |/ / /| | / /_/ /\\__ \\/ /_/ / / / / / /
     / ____/| |/ |/ /    ___/ / /|  / ___ |/ ____/___/ / __  / /_/ / / /
    /_/     |__/|__/____/____/_/ |_/_/  |_/_/    /____/_/ /_/\\____/ /_/
                  /_____/

"""

# Deprecated, use SymbolizerMatcher. Will be removed shortly.
ElfMatcher = Callable[[snapshot_pb2.Snapshot], Optional[Path]]

# Symbolizers are useful for turning addresses into source code locations and
# function names. As a single snapshot may contain embedded snapshots from
# multiple devices, there's a need to match ELF files to the correct snapshot to
# correctly symbolize addresses.
#
# A SymbolizerMatcher is a function that takes a snapshot and investigates its
# metadata (often build ID, device name, or the version string) to determine
# whether a Symbolizer may be loaded with a suitable ELF file for symbolization.
SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]


def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: Optional[pw_tokenizer.Detokenizer] = None,
    elf_matcher: Optional[ElfMatcher] = None,
    symbolizer_matcher: Optional[SymbolizerMatcher] = None,
) -> str:
    """Processes a single snapshot.

    Args:
      serialized_snapshot: A serialized Snapshot proto.
      detokenizer: Optional detokenizer used by the metadata and thread
        analyzers to expand tokenized strings.
      elf_matcher: Deprecated; prefer symbolizer_matcher.
      symbolizer_matcher: Optional callback that selects a Symbolizer for
        this snapshot (takes precedence over elf_matcher).

    Returns:
      The snapshot rendered as human-readable text.
    """
    output = [_BRANDING]

    captured_metadata = metadata.process_snapshot(
        serialized_snapshot, detokenizer
    )
    if captured_metadata:
        output.append(captured_metadata)

    # Open a symbolizer. A SymbolizerMatcher wins over the deprecated
    # ElfMatcher; with neither, fall back to a symbolizer with no ELF loaded.
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    if symbolizer_matcher is not None:
        symbolizer = symbolizer_matcher(snapshot)
    elif elf_matcher is not None:
        symbolizer = LlvmSymbolizer(elf_matcher(snapshot))
    else:
        symbolizer = LlvmSymbolizer()

    cortex_m_cpu_state = pw_cpu_exception_cortex_m.process_snapshot(
        serialized_snapshot, symbolizer
    )
    if cortex_m_cpu_state:
        output.append(cortex_m_cpu_state)

    thread_info = thread_analyzer.process_snapshot(
        serialized_snapshot, detokenizer, symbolizer
    )
    if thread_info:
        output.append(thread_info)

    timestamp_info = timestamp_analyzer.process_snapshot(serialized_snapshot)
    if timestamp_info:
        output.append(timestamp_info)

    # Check and emit the number of related snapshots embedded in this snapshot.
    if snapshot.related_snapshots:
        snapshot_count = len(snapshot.related_snapshots)
        plural = 's' if snapshot_count > 1 else ''
        output.append(
            f'This snapshot contains {snapshot_count} related snapshot{plural}'
        )
        output.append('')

    return '\n'.join(output)


def process_snapshots(
    serialized_snapshot: bytes,
    detokenizer: Optional[pw_tokenizer.Detokenizer] = None,
    elf_matcher: Optional[ElfMatcher] = None,
    user_processing_callback: Optional[Callable[[bytes], str]] = None,
    symbolizer_matcher: Optional[SymbolizerMatcher] = None,
) -> str:
    """Processes a snapshot that may have multiple embedded snapshots.

    Recurses into each related (embedded) snapshot, separating the outputs
    with a horizontal rule.

    Args:
      serialized_snapshot: A serialized Snapshot proto.
      detokenizer: Optional detokenizer for tokenized strings.
      elf_matcher: Deprecated; prefer symbolizer_matcher.
      user_processing_callback: Optional extra processing applied to each
        snapshot's serialized bytes; its string output is appended.
      symbolizer_matcher: Optional callback that selects a Symbolizer per
        snapshot.

    Returns:
      All snapshots rendered as human-readable text.
    """
    output = []
    # Process the top-level snapshot.
    output.append(
        process_snapshot(
            serialized_snapshot, detokenizer, elf_matcher, symbolizer_matcher
        )
    )

    # If the user provided a custom processing callback, call it on each
    # snapshot.
    if user_processing_callback is not None:
        output.append(user_processing_callback(serialized_snapshot))

    # Process any related snapshots that were embedded in this one.
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)
    for nested_snapshot in snapshot.related_snapshots:
        output.append('\n[' + '=' * 78 + ']\n')
        output.append(
            str(
                process_snapshots(
                    nested_snapshot.SerializeToString(),
                    detokenizer,
                    elf_matcher,
                    user_processing_callback,
                    symbolizer_matcher,
                )
            )
        )

    return '\n'.join(output)


def _snapshot_symbolizer_matcher(
    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
) -> LlvmSymbolizer:
    """Finds a matching ELF by GNU build ID and wraps it in a symbolizer.

    Searches artifacts_dir recursively for an ELF whose build ID matches the
    snapshot's software_build_uuid. On no match, logs an error and returns a
    symbolizer with no ELF loaded (symbolization becomes a no-op).
    """
    matching_elf: Optional[Path] = pw_build_info.build_id.find_matching_elf(
        snapshot.metadata.software_build_uuid, artifacts_dir
    )
    if not matching_elf:
        _LOG.error(
            'Error: No matching ELF found for GNU build ID %s.',
            snapshot.metadata.software_build_uuid.hex(),
        )
    return LlvmSymbolizer(matching_elf)


def _load_and_dump_snapshots(
    in_file: BinaryIO,
    out_file: TextIO,
    token_db: Optional[TextIO],
    artifacts_dir: Optional[Path],
) -> None:
    """Reads a serialized snapshot and writes its textual form to out_file."""
    detokenizer = None
    if token_db:
        detokenizer = pw_tokenizer.Detokenizer(token_db)
    symbolizer_matcher: Optional[SymbolizerMatcher] = None
    if artifacts_dir:
        # Bind the artifacts directory so the matcher fits the
        # SymbolizerMatcher signature (snapshot -> Symbolizer).
        symbolizer_matcher = functools.partial(
            _snapshot_symbolizer_matcher, artifacts_dir
        )
    out_file.write(
        process_snapshots(
            serialized_snapshot=in_file.read(),
            detokenizer=detokenizer,
            symbolizer_matcher=symbolizer_matcher,
        )
    )


def _parse_args() -> argparse.Namespace:
    """Parses command-line arguments for the snapshot decoder tool."""
    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
    parser.add_argument(
        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
    )
    parser.add_argument(
        '--out-file',
        '-o',
        default='-',
        # Text mode ('w'), not binary ('wb'): _load_and_dump_snapshots writes
        # str to this TextIO stream, so a binary file would raise TypeError.
        type=argparse.FileType('w'),
        help='File to output decoded snapshots to. Defaults to stdout.',
    )
    parser.add_argument(
        '--token-db',
        type=argparse.FileType('r'),
        help='Token database or ELF file to use for detokenization.',
    )
    parser.add_argument(
        '--artifacts-dir',
        type=Path,
        help=(
            'Directory to recursively search for matching ELF files to use '
            'for symbolization.'
        ),
    )
    return parser.parse_args()


if __name__ == '__main__':
    logging.basicConfig(format='%(message)s', level=logging.INFO)
    _load_and_dump_snapshots(**vars(_parse_args()))
    sys.exit(0)