• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Device tracing classes to interact with targets via RPC."""
15
16import os
17import logging
18import tempfile
19
20# from pathlib import Path
21# from types import ModuleType
22# from typing import Callable, List
23
24import pw_transfer
25from pw_file import file_pb2
26from pw_rpc.callback_client.errors import RpcError
27from pw_system.device import Device
28from pw_trace import trace
29from pw_trace_tokenized import trace_tokenized
30
31_LOG = logging.getLogger('tracing')
32DEFAULT_TICKS_PER_SECOND = 1000
33
34
class DeviceWithTracing(Device):
    """Represents an RPC Client for a device running a Pigweed target with
    tracing.

    The target must have RPC support for the following services:
     - logging
     - file
     - transfer
     - tracing
    Note: use this class as a base for specialized device representations.
    """

    def __init__(
        self,
        *device_args,
        ticks_per_second: int | None = None,
        time_offset: int = 0,
        **device_kwargs,
    ):
        """Sets up the transfer manager and the trace clock parameters.

        Args:
          *device_args: Positional arguments forwarded unchanged to
            ``Device.__init__``.
          ticks_per_second: Tick rate used when converting trace events. When
            falsy (``None`` or ``0``) the value is queried from the device
            through ``get_ticks_per_second()``.
          time_offset: Offset handed to
            ``trace_tokenized.get_trace_events`` when a trace is decoded.
          **device_kwargs: Keyword arguments forwarded unchanged to
            ``Device.__init__``.
        """
        super().__init__(*device_args, **device_kwargs)

        # Create the transfer manager. Both the initial and the default
        # response timeouts reuse the device's RPC timeout.
        self.transfer_service = self.rpcs.pw.transfer.Transfer
        self.transfer_manager = pw_transfer.Manager(
            self.transfer_service,
            default_response_timeout_s=self.rpc_timeout_s,
            initial_response_timeout_s=self.rpc_timeout_s,
            default_protocol_version=pw_transfer.ProtocolVersion.LATEST,
        )
        self.time_offset = time_offset

        # A caller-supplied tick rate wins; otherwise ask the device (which
        # itself falls back to DEFAULT_TICKS_PER_SECOND on failure).
        if ticks_per_second:
            self.ticks_per_second = ticks_per_second
        else:
            self.ticks_per_second = self.get_ticks_per_second()
        _LOG.info('ticks_per_second set to %i', self.ticks_per_second)

    def get_ticks_per_second(self) -> int:
        """Queries the device's trace service for its tick rate.

        Returns:
          The ``tick_period_seconds_denominator`` reported by the
          ``GetClockParameters`` RPC, or ``DEFAULT_TICKS_PER_SECOND`` when the
          RPC raises ``RpcError`` or completes with a non-OK status.
        """
        trace_service = self.rpcs.pw.trace.proto.TraceService
        try:
            resp = trace_service.GetClockParameters()
            if not resp.status.ok():
                _LOG.error(
                    'Failed to get clock parameters: %s. Using default value',
                    resp.status,
                )
                return DEFAULT_TICKS_PER_SECOND
        except RpcError as rpc_err:
            _LOG.exception('%s. Using default value', rpc_err)
            return DEFAULT_TICKS_PER_SECOND

        # NOTE(review): only the denominator is used here; this presumably
        # assumes the tick period numerator is 1 -- confirm against the
        # target's clock parameters.
        return resp.response.clock_parameters.tick_period_seconds_denominator

    def list_files(self) -> list:
        """Lists all files on this device.

        Returns:
          The responses of the ``FileSystem.List`` RPC, or an empty list when
          the RPC completes with a non-OK status.
        """
        fs_service = self.rpcs.pw.file.FileSystem
        stream_response = fs_service.List()

        if not stream_response.status.ok():
            _LOG.error('Failed to list files %s', stream_response.status)
            return []

        return stream_response.responses

    def delete_file(self, path: str) -> bool:
        """Delete a file on this device.

        Args:
          path: Path of the file, as placed in the ``DeleteRequest``.

        Returns:
          True when the ``FileSystem.Delete`` RPC completes with an OK
          status, False otherwise.
        """
        fs_service = self.rpcs.pw.file.FileSystem
        req = file_pb2.DeleteRequest(path=path)
        stream_response = fs_service.Delete(req)
        if not stream_response.status.ok():
            _LOG.error(
                'Failed to delete file %s file: %s',
                path,
                stream_response.status,
            )
            return False

        return True

    def transfer_file(self, file_id: int, dest_path: str) -> bool:
        """Transfer a file on this device to the host.

        Args:
          file_id: Identifier passed to the transfer manager's ``read``.
          dest_path: Host path the received bytes are written to.

        Returns:
          True when the data was read and written, False when the transfer
          raised ``pw_transfer.Error``. Errors from opening or writing
          ``dest_path`` are not caught here.
        """
        try:
            data = self.transfer_manager.read(file_id)
            with open(dest_path, "wb") as bin_file:
                bin_file.write(data)
        except pw_transfer.Error:
            _LOG.exception('Failed to transfer file_id %i', file_id)
            return False

        return True

    def start_tracing(self) -> None:
        """Turns on tracing on this device.

        A non-OK status from the ``Start`` RPC is logged rather than silently
        dropped.
        """
        trace_service = self.rpcs.pw.trace.proto.TraceService
        resp = trace_service.Start()
        if not resp.status.ok():
            _LOG.error('Failed to start tracing: %s', resp.status)

    def stop_tracing(self, trace_output_path: str = "trace.json") -> None:
        """Turns off tracing on this device and downloads the trace file.

        The binary trace is transferred into a temporary file, decoded with
        the device's detokenizer database and written as trace JSON. The
        temporary file is always removed afterwards.

        Args:
          trace_output_path: Host path the generated trace JSON is saved to.
        """
        trace_service = self.rpcs.pw.trace.proto.TraceService
        resp = trace_service.Stop()

        # Like the other RPC helpers in this class, bail out on a failed
        # call: without an OK status the response's file_id cannot be trusted.
        if not resp.status.ok():
            _LOG.error('Failed to stop tracing: %s', resp.status)
            return

        # If there's no tokenizer, there's no need to transfer the trace
        # file from the device after stopping tracing, as there's not much
        # that can be done with it.
        if not self.detokenizer:
            _LOG.error('No tokenizer specified. Not transfering trace')
            return

        # Only the name of the temporary file is needed: it is closed right
        # away so transfer_file() can reopen it by path, and it is removed
        # explicitly in the finally block below.
        trace_bin_file = tempfile.NamedTemporaryFile(delete=False)
        trace_bin_file.close()
        try:
            if not self.transfer_file(
                resp.response.file_id, trace_bin_file.name
            ):
                return

            with open(trace_bin_file.name, 'rb') as bin_file:
                trace_data = bin_file.read()

            events = trace_tokenized.get_trace_events(
                [self.detokenizer.database],
                trace_data,
                self.ticks_per_second,
                self.time_offset,
            )
            json_lines = trace.generate_trace_json(events)
            trace_tokenized.save_trace_file(json_lines, trace_output_path)

            _LOG.info(
                'Wrote trace file %s',
                trace_output_path,
            )
        finally:
            os.remove(trace_bin_file.name)
168