• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tool for processing and outputting Snapshot protos as text"""
15
16import argparse
17import functools
18import logging
19import sys
20from pathlib import Path
21from typing import BinaryIO, TextIO, Callable
22import pw_tokenizer
23import pw_cpu_exception_cortex_m
24import pw_cpu_exception_risc_v
25import pw_build_info.build_id
26from pw_snapshot_metadata import metadata
27from pw_snapshot_metadata_proto import snapshot_metadata_pb2
28from pw_snapshot_protos import snapshot_pb2
29from pw_symbolizer import LlvmSymbolizer, Symbolizer
30from pw_thread import thread_analyzer
31from pw_chrono import timestamp_analyzer
32
# Module-level logger; used below to report a missing ELF match.
_LOG = logging.getLogger('snapshot_processor')

# ASCII-art banner that process_snapshot() places first in its output. The
# backslashes are doubled because this is a regular (non-raw) string literal.
_BRANDING = """
        ____ _       __    _____ _   _____    ____  _____ __  ______  ______
       / __ \\ |     / /   / ___// | / /   |  / __ \\/ ___// / / / __ \\/_  __/
      / /_/ / | /| / /    \\__ \\/  |/ / /| | / /_/ /\\__ \\/ /_/ / / / / / /
     / ____/| |/ |/ /    ___/ / /|  / ___ |/ ____/___/ / __  / /_/ / / /
    /_/     |__/|__/____/____/_/ |_/_/  |_/_/    /____/_/ /_/\\____/ /_/
                  /_____/

"""
44
# Deprecated, use SymbolizerMatcher. Will be removed shortly.
#
# An ElfMatcher takes a snapshot and returns the path of the ELF file to
# symbolize it with, or None. process_snapshot() only consults it when no
# SymbolizerMatcher is supplied, and hands its result to an LlvmSymbolizer.
ElfMatcher = Callable[[snapshot_pb2.Snapshot], Path | None]

# Symbolizers are useful for turning addresses into source code locations and
# function names. As a single snapshot may contain embedded snapshots from
# multiple devices, there's a need to match ELF files to the correct snapshot
# to correctly symbolize addresses.
#
# A SymbolizerMatcher is a function that takes a snapshot and investigates its
# metadata (often build ID, device name, or the version string) to determine
# whether a Symbolizer may be loaded with a suitable ELF file for
# symbolization. It returns the Symbolizer to use for that snapshot.
SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]
57
58
def process_snapshot(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    llvm_symbolizer_binary: Path | None = None,
) -> str:
    """Renders one serialized snapshot as text.

    Embedded (related) snapshots are only counted here, not processed.

    Args:
        serialized_snapshot: Serialized snapshot_pb2.Snapshot message.
        detokenizer: Optional detokenizer handed to the metadata and thread
            processors.
        elf_matcher: Deprecated. Consulted only when symbolizer_matcher is
            None; its result is passed to an LlvmSymbolizer.
        symbolizer_matcher: Optional callable that returns the Symbolizer to
            use for this snapshot. Takes precedence over elf_matcher.
        llvm_symbolizer_binary: Forwarded to LlvmSymbolizer when this function
            constructs one itself; unused when symbolizer_matcher is given.

    Returns:
        The branding banner followed by every non-empty section (metadata,
        CPU state, threads, timestamps, related-snapshot count), joined with
        newlines.
    """
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)

    # Choose the symbolizer: an explicit matcher wins, then the deprecated
    # ELF matcher, and finally a symbolizer with no ELF file.
    if symbolizer_matcher is not None:
        symbolizer = symbolizer_matcher(snapshot)
    elif elf_matcher is not None:
        symbolizer = LlvmSymbolizer(
            elf_matcher(snapshot), llvm_symbolizer_binary=llvm_symbolizer_binary
        )
    else:
        symbolizer = LlvmSymbolizer(
            llvm_symbolizer_binary=llvm_symbolizer_binary
        )

    sections = [_BRANDING]

    metadata_text = metadata.process_snapshot(serialized_snapshot, detokenizer)
    if metadata_text:
        sections.append(metadata_text)

    # The reported CPU architecture decides which exception-state processor
    # handles this snapshot: "RV..." goes to RISC-V, anything else to Cortex-M.
    basic_info = snapshot_metadata_pb2.SnapshotBasicInfo()
    basic_info.ParseFromString(serialized_snapshot)
    cpu_arch = metadata.MetadataProcessor(
        basic_info.metadata, detokenizer
    ).cpu_arch()
    if cpu_arch.startswith("RV"):
        cpu_exception = pw_cpu_exception_risc_v
    else:
        cpu_exception = pw_cpu_exception_cortex_m

    cpu_state_text = cpu_exception.process_snapshot(
        serialized_snapshot, symbolizer
    )
    thread_text = thread_analyzer.process_snapshot(
        serialized_snapshot, detokenizer, symbolizer
    )
    timestamp_text = timestamp_analyzer.process_snapshot(serialized_snapshot)

    # Only sections that produced some text are emitted.
    sections.extend(
        text for text in (cpu_state_text, thread_text, timestamp_text) if text
    )

    # Report how many related snapshots are embedded in this one.
    total = len(snapshot.related_snapshots)
    if total:
        suffix = '' if total <= 1 else 's'
        sections.extend(
            [f'This snapshot contains {total} related snapshot{suffix}', '']
        )

    return '\n'.join(sections)
134
135
def process_snapshots(
    serialized_snapshot: bytes,
    detokenizer: pw_tokenizer.Detokenizer | None = None,
    elf_matcher: ElfMatcher | None = None,
    user_processing_callback: Callable[[bytes], str] | None = None,
    symbolizer_matcher: SymbolizerMatcher | None = None,
    llvm_symbolizer_binary: Path | None = None,
) -> str:
    """Processes a snapshot that may have multiple embedded snapshots.

    The top-level snapshot is processed first, then every entry of its
    related_snapshots field is processed recursively with the same arguments.

    Args:
        serialized_snapshot: Serialized snapshot_pb2.Snapshot message.
        detokenizer: Optional detokenizer forwarded to process_snapshot().
        elf_matcher: Deprecated; forwarded to process_snapshot().
        user_processing_callback: Optional callable invoked with the
            serialized bytes of each snapshot (top-level and nested). The
            string it returns is added to the output right after that
            snapshot's own text.
        symbolizer_matcher: Optional matcher forwarded to process_snapshot().
        llvm_symbolizer_binary: Optional llvm-symbolizer path forwarded to
            process_snapshot() for every snapshot, including nested ones.

    Returns:
        The text of all processed snapshots joined with newlines. Each nested
        snapshot is preceded by a bracketed separator line.
    """
    output = [
        process_snapshot(
            serialized_snapshot,
            detokenizer,
            elf_matcher,
            symbolizer_matcher,
            llvm_symbolizer_binary,
        )
    ]

    # If the user provided a custom processing callback, call it on each
    # snapshot.
    if user_processing_callback is not None:
        output.append(user_processing_callback(serialized_snapshot))

    # Process any related snapshots that were embedded in this one.
    snapshot = snapshot_pb2.Snapshot()
    snapshot.ParseFromString(serialized_snapshot)
    for nested_snapshot in snapshot.related_snapshots:
        output.append('\n[' + '=' * 78 + ']\n')
        # The recursive call already returns a str, so no conversion is needed.
        output.append(
            process_snapshots(
                nested_snapshot.SerializeToString(),
                detokenizer,
                elf_matcher,
                user_processing_callback,
                symbolizer_matcher,
                llvm_symbolizer_binary,
            )
        )

    return '\n'.join(output)
175
176
def _snapshot_symbolizer_matcher(
    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
) -> LlvmSymbolizer:
    """Creates an LlvmSymbolizer for the ELF matching a snapshot's build ID.

    Args:
        artifacts_dir: Directory handed to find_matching_elf() to search.
        snapshot: Snapshot whose metadata.software_build_uuid is looked up.

    Returns:
        An LlvmSymbolizer constructed from the matching ELF path. When no ELF
        matches, an error is logged and the symbolizer is still constructed
        from the empty result.
    """
    build_id = snapshot.metadata.software_build_uuid
    elf_path: Path | None = pw_build_info.build_id.find_matching_elf(
        build_id, artifacts_dir
    )
    if elf_path:
        return LlvmSymbolizer(elf_path)

    _LOG.error(
        'Error: No matching ELF found for GNU build ID %s.',
        build_id.hex(),
    )
    return LlvmSymbolizer(elf_path)
189
190
def _load_and_dump_snapshots(
    in_file: BinaryIO,
    out_file: TextIO,
    token_db: TextIO | None,
    artifacts_dir: Path | None,
):
    """Decodes the snapshot read from in_file and writes the text to out_file.

    Args:
        in_file: Binary stream whose entire contents are the serialized
            snapshot.
        out_file: Text stream that receives the decoded output.
        token_db: When provided, used to construct a pw_tokenizer.Detokenizer.
        artifacts_dir: When provided, ELF files for symbolization are looked
            up in this directory via _snapshot_symbolizer_matcher().
    """
    detokenizer = pw_tokenizer.Detokenizer(token_db) if token_db else None

    # Bind the artifacts directory so the matcher only needs the snapshot.
    symbolizer_matcher: SymbolizerMatcher | None = (
        functools.partial(_snapshot_symbolizer_matcher, artifacts_dir)
        if artifacts_dir
        else None
    )

    decoded_text = process_snapshots(
        serialized_snapshot=in_file.read(),
        detokenizer=detokenizer,
        symbolizer_matcher=symbolizer_matcher,
    )
    out_file.write(decoded_text)
212
213
def _parse_args() -> argparse.Namespace:
    """Parses the command-line arguments of the snapshot decoder.

    Returns:
        A namespace with the attributes in_file (binary input stream),
        out_file (text output stream, stdout by default), token_db (text
        stream or None) and artifacts_dir (Path or None). The attribute names
        match the parameters of _load_and_dump_snapshots().
    """
    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
    parser.add_argument(
        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
    )
    parser.add_argument(
        '--out-file',
        '-o',
        default='-',
        # The decoded snapshot is a str that is passed straight to
        # out_file.write(), so the stream must be opened in text mode; a
        # binary-mode stream raises TypeError when given a str.
        type=argparse.FileType('w'),
        help='File to output decoded snapshots to. Defaults to stdout.',
    )
    parser.add_argument(
        '--token-db',
        # NOTE(review): the help text also allows ELF files, which are binary,
        # while this opens the file in text mode -- confirm that the
        # detokenizer handles a text-mode stream for ELF input.
        type=argparse.FileType('r'),
        help='Token database or ELF file to use for detokenization.',
    )
    parser.add_argument(
        '--artifacts-dir',
        type=Path,
        help=(
            'Directory to recursively search for matching ELF files to use '
            'for symbolization.'
        ),
    )
    return parser.parse_args()
240
241
if __name__ == '__main__':
    # Log bare messages (no level or logger prefix) at INFO and above.
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    # The parsed argument names map one-to-one onto the keyword parameters of
    # _load_and_dump_snapshots().
    cli_arguments = vars(_parse_args())
    _load_and_dump_snapshots(**cli_arguments)
    sys.exit(0)
246