• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tool for processing and outputting Snapshot protos as text"""
15
16import argparse
17import functools
18import logging
19import sys
20from pathlib import Path
21from typing import Optional, BinaryIO, TextIO, Callable
22import pw_tokenizer
23import pw_cpu_exception_cortex_m
24import pw_build_info.build_id
25from pw_snapshot_metadata import metadata
26from pw_snapshot_protos import snapshot_pb2
27from pw_symbolizer import LlvmSymbolizer, Symbolizer
28from pw_thread import thread_analyzer
29from pw_chrono import timestamp_analyzer
30
31_LOG = logging.getLogger('snapshot_processor')
32
33_BRANDING = """
34        ____ _       __    _____ _   _____    ____  _____ __  ______  ______
35       / __ \\ |     / /   / ___// | / /   |  / __ \\/ ___// / / / __ \\/_  __/
36      / /_/ / | /| / /    \\__ \\/  |/ / /| | / /_/ /\\__ \\/ /_/ / / / / / /
37     / ____/| |/ |/ /    ___/ / /|  / ___ |/ ____/___/ / __  / /_/ / / /
38    /_/     |__/|__/____/____/_/ |_/_/  |_/_/    /____/_/ /_/\\____/ /_/
39                  /_____/
40
41"""
42
43# Deprecated, use SymbolizerMatcher. Will be removed shortly.
44ElfMatcher = Callable[[snapshot_pb2.Snapshot], Optional[Path]]
45
46# Symbolizers are useful for turning addresses into source code locations and
47# function names. As a single snapshot may contain embedded snapshots from
48# multiple devices, there's a need to match ELF files to the correct snapshot to
49# correctly symbolize addresses.
50#
51# A SymbolizerMatcher is a function that takes a snapshot and investigates its
52# metadata (often build ID, device name, or the version string) to determine
53# whether a Symbolizer may be loaded with a suitable ELF file for symbolization.
54SymbolizerMatcher = Callable[[snapshot_pb2.Snapshot], Symbolizer]
55
56
57def process_snapshot(
58    serialized_snapshot: bytes,
59    detokenizer: Optional[pw_tokenizer.Detokenizer] = None,
60    elf_matcher: Optional[ElfMatcher] = None,
61    symbolizer_matcher: Optional[SymbolizerMatcher] = None,
62) -> str:
63    """Processes a single snapshot."""
64
65    output = [_BRANDING]
66
67    captured_metadata = metadata.process_snapshot(
68        serialized_snapshot, detokenizer
69    )
70    if captured_metadata:
71        output.append(captured_metadata)
72
73    # Open a symbolizer.
74    snapshot = snapshot_pb2.Snapshot()
75    snapshot.ParseFromString(serialized_snapshot)
76
77    if symbolizer_matcher is not None:
78        symbolizer = symbolizer_matcher(snapshot)
79    elif elf_matcher is not None:
80        symbolizer = LlvmSymbolizer(elf_matcher(snapshot))
81    else:
82        symbolizer = LlvmSymbolizer()
83
84    cortex_m_cpu_state = pw_cpu_exception_cortex_m.process_snapshot(
85        serialized_snapshot, symbolizer
86    )
87    if cortex_m_cpu_state:
88        output.append(cortex_m_cpu_state)
89
90    thread_info = thread_analyzer.process_snapshot(
91        serialized_snapshot, detokenizer, symbolizer
92    )
93
94    if thread_info:
95        output.append(thread_info)
96
97    timestamp_info = timestamp_analyzer.process_snapshot(serialized_snapshot)
98
99    if timestamp_info:
100        output.append(timestamp_info)
101
102    # Check and emit the number of related snapshots embedded in this snapshot.
103    if snapshot.related_snapshots:
104        snapshot_count = len(snapshot.related_snapshots)
105        plural = 's' if snapshot_count > 1 else ''
106        output.append(
107            f'This snapshot contains {snapshot_count} related snapshot{plural}'
108        )
109        output.append('')
110
111    return '\n'.join(output)
112
113
114def process_snapshots(
115    serialized_snapshot: bytes,
116    detokenizer: Optional[pw_tokenizer.Detokenizer] = None,
117    elf_matcher: Optional[ElfMatcher] = None,
118    user_processing_callback: Optional[Callable[[bytes], str]] = None,
119    symbolizer_matcher: Optional[SymbolizerMatcher] = None,
120) -> str:
121    """Processes a snapshot that may have multiple embedded snapshots."""
122    output = []
123    # Process the top-level snapshot.
124    output.append(
125        process_snapshot(
126            serialized_snapshot, detokenizer, elf_matcher, symbolizer_matcher
127        )
128    )
129
130    # If the user provided a custom processing callback, call it on each
131    # snapshot.
132    if user_processing_callback is not None:
133        output.append(user_processing_callback(serialized_snapshot))
134
135    # Process any related snapshots that were embedded in this one.
136    snapshot = snapshot_pb2.Snapshot()
137    snapshot.ParseFromString(serialized_snapshot)
138    for nested_snapshot in snapshot.related_snapshots:
139        output.append('\n[' + '=' * 78 + ']\n')
140        output.append(
141            str(
142                process_snapshots(
143                    nested_snapshot.SerializeToString(),
144                    detokenizer,
145                    elf_matcher,
146                    user_processing_callback,
147                    symbolizer_matcher,
148                )
149            )
150        )
151
152    return '\n'.join(output)
153
154
155def _snapshot_symbolizer_matcher(
156    artifacts_dir: Path, snapshot: snapshot_pb2.Snapshot
157) -> LlvmSymbolizer:
158    matching_elf: Optional[Path] = pw_build_info.build_id.find_matching_elf(
159        snapshot.metadata.software_build_uuid, artifacts_dir
160    )
161    if not matching_elf:
162        _LOG.error(
163            'Error: No matching ELF found for GNU build ID %s.',
164            snapshot.metadata.software_build_uuid.hex(),
165        )
166    return LlvmSymbolizer(matching_elf)
167
168
169def _load_and_dump_snapshots(
170    in_file: BinaryIO,
171    out_file: TextIO,
172    token_db: Optional[TextIO],
173    artifacts_dir: Optional[Path],
174):
175    detokenizer = None
176    if token_db:
177        detokenizer = pw_tokenizer.Detokenizer(token_db)
178    symbolizer_matcher: Optional[SymbolizerMatcher] = None
179    if artifacts_dir:
180        symbolizer_matcher = functools.partial(
181            _snapshot_symbolizer_matcher, artifacts_dir
182        )
183    out_file.write(
184        process_snapshots(
185            serialized_snapshot=in_file.read(),
186            detokenizer=detokenizer,
187            symbolizer_matcher=symbolizer_matcher,
188        )
189    )
190
191
192def _parse_args():
193    parser = argparse.ArgumentParser(description='Decode Pigweed snapshots')
194    parser.add_argument(
195        'in_file', type=argparse.FileType('rb'), help='Binary snapshot file'
196    )
197    parser.add_argument(
198        '--out-file',
199        '-o',
200        default='-',
201        type=argparse.FileType('wb'),
202        help='File to output decoded snapshots to. Defaults to stdout.',
203    )
204    parser.add_argument(
205        '--token-db',
206        type=argparse.FileType('r'),
207        help='Token database or ELF file to use for detokenization.',
208    )
209    parser.add_argument(
210        '--artifacts-dir',
211        type=Path,
212        help=(
213            'Directory to recursively search for matching ELF files to use '
214            'for symbolization.'
215        ),
216    )
217    return parser.parse_args()
218
219
220if __name__ == '__main__':
221    logging.basicConfig(format='%(message)s', level=logging.INFO)
222    _load_and_dump_snapshots(**vars(_parse_args()))
223    sys.exit(0)
224