1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Device tracing classes to interact with targets via RPC.""" 15 16import os 17import logging 18import tempfile 19 20# from pathlib import Path 21# from types import ModuleType 22# from typing import Callable, List 23 24import pw_transfer 25from pw_file import file_pb2 26from pw_rpc.callback_client.errors import RpcError 27from pw_system.device import Device 28from pw_trace import trace 29from pw_trace_tokenized import trace_tokenized 30 31_LOG = logging.getLogger('tracing') 32DEFAULT_TICKS_PER_SECOND = 1000 33 34 35class DeviceWithTracing(Device): 36 """Represents an RPC Client for a device running a Pigweed target with 37 tracing. 38 39 The target must have RPC support for the following services: 40 - logging 41 - file 42 - transfer 43 - tracing 44 Note: use this class as a base for specialized device representations. 45 """ 46 47 def __init__( 48 self, 49 *device_args, 50 ticks_per_second: int | None = None, 51 time_offset: int = 0, 52 **device_kwargs, 53 ): 54 super().__init__(*device_args, **device_kwargs) 55 56 # Create the transfer manager 57 self.transfer_service = self.rpcs.pw.transfer.Transfer 58 self.transfer_manager = pw_transfer.Manager( 59 self.transfer_service, 60 default_response_timeout_s=self.rpc_timeout_s, 61 initial_response_timeout_s=self.rpc_timeout_s, 62 default_protocol_version=pw_transfer.ProtocolVersion.LATEST, 63 ) 64 self.time_offset = time_offset 65 66 if ticks_per_second: 67 self.ticks_per_second = ticks_per_second 68 else: 69 self.ticks_per_second = self.get_ticks_per_second() 70 _LOG.info('ticks_per_second set to %i', self.ticks_per_second) 71 72 def get_ticks_per_second(self) -> int: 73 trace_service = self.rpcs.pw.trace.proto.TraceService 74 try: 75 resp = trace_service.GetClockParameters() 76 if not resp.status.ok(): 77 _LOG.error( 78 'Failed to get clock parameters: %s. Using default value', 79 resp.status, 80 ) 81 return DEFAULT_TICKS_PER_SECOND 82 except RpcError as rpc_err: 83 _LOG.exception('%s. Using default value', rpc_err) 84 return DEFAULT_TICKS_PER_SECOND 85 86 return resp.response.clock_parameters.tick_period_seconds_denominator 87 88 def list_files(self) -> list: 89 """Lists all files on this device.""" 90 fs_service = self.rpcs.pw.file.FileSystem 91 stream_response = fs_service.List() 92 93 if not stream_response.status.ok(): 94 _LOG.error('Failed to list files %s', stream_response.status) 95 return [] 96 97 return stream_response.responses 98 99 def delete_file(self, path: str) -> bool: 100 """Delete a file on this device.""" 101 fs_service = self.rpcs.pw.file.FileSystem 102 req = file_pb2.DeleteRequest(path=path) 103 stream_response = fs_service.Delete(req) 104 if not stream_response.status.ok(): 105 _LOG.error( 106 'Failed to delete file %s file: %s', 107 path, 108 stream_response.status, 109 ) 110 return False 111 112 return True 113 114 def transfer_file(self, file_id: int, dest_path: str) -> bool: 115 """Transfer a file on this device to the host.""" 116 try: 117 data = self.transfer_manager.read(file_id) 118 with open(dest_path, "wb") as bin_file: 119 bin_file.write(data) 120 except pw_transfer.Error: 121 _LOG.exception('Failed to transfer file_id %i', file_id) 122 return False 123 124 return True 125 126 def start_tracing(self) -> None: 127 """Turns on tracing on this device.""" 128 trace_service = self.rpcs.pw.trace.proto.TraceService 129 trace_service.Start() 130 131 def stop_tracing(self, trace_output_path: str = "trace.json") -> None: 132 """Turns off tracing on this device and downloads the trace file.""" 133 trace_service = self.rpcs.pw.trace.proto.TraceService 134 resp = trace_service.Stop() 135 136 # If there's no tokenizer, there's no need to transfer the trace 137 # file from the device after stopping tracing, as there's not much 138 # that can be done with it. 139 if not self.detokenizer: 140 _LOG.error('No tokenizer specified. Not transfering trace') 141 return 142 143 trace_bin_path = tempfile.NamedTemporaryFile(delete=False) 144 trace_bin_path.close() 145 try: 146 if not self.transfer_file( 147 resp.response.file_id, trace_bin_path.name 148 ): 149 return 150 151 with open(trace_bin_path.name, 'rb') as bin_file: 152 trace_data = bin_file.read() 153 events = trace_tokenized.get_trace_events( 154 [self.detokenizer.database], 155 trace_data, 156 self.ticks_per_second, 157 self.time_offset, 158 ) 159 json_lines = trace.generate_trace_json(events) 160 trace_tokenized.save_trace_file(json_lines, trace_output_path) 161 162 _LOG.info( 163 'Wrote trace file %s', 164 trace_output_path, 165 ) 166 finally: 167 os.remove(trace_bin_path.name) 168