# Copyright (C) 2020 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses as dc
from urllib.parse import urlparse
from typing import List, Optional

from perfetto.trace_processor.http import TraceProcessorHttp
from perfetto.trace_processor.platform import PlatformDelegate
from perfetto.trace_processor.protos import ProtoFactory
from perfetto.trace_processor.shell import load_shell
from perfetto.trace_uri_resolver import registry
from perfetto.trace_uri_resolver.registry import ResolverRegistry

# Defining this field as a module variable means it can be changed by
# implementations at startup and used for all TraceProcessor objects
# without having to specify it on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which can integrate with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

TraceReference = registry.TraceReference


# Custom exception raised if any trace_processor functions return a
# response with an error defined
class TraceProcessorException(Exception):
  """Error raised by the trace processor Python API."""

  def __init__(self, message):
    super().__init__(message)


@dc.dataclass
class TraceProcessorConfig:
  """Options customizing a TraceProcessor instance.

  All fields are optional and may be passed positionally (in declaration
  order) or by keyword.
  """
  # Forwarded to load_shell() when a trace processor shell is spawned.
  bin_path: Optional[str] = None
  # Forwarded to load_shell() when a trace processor shell is spawned.
  unique_port: bool = True
  # Forwarded to load_shell() when a trace processor shell is spawned.
  verbose: bool = False
  # Forwarded to load_shell() when a trace processor shell is spawned.
  ingest_ftrace_in_raw: bool = False
  # Registry used to resolve trace references; when unset, TraceProcessor
  # falls back to the platform delegate's default registry.
  resolver_registry: Optional[ResolverRegistry] = None


class TraceProcessor:
  """Python API for loading a trace and querying it over HTTP.

  Instances can be used as context managers; leaving the `with` block calls
  close().
  """

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  QUERY_CELL_INVALID_FIELD_ID = 0
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # __repr__ must return a str; returning the dict itself makes
      # repr(row) raise TypeError.
      return str(self.__dict__)

  class QueryResultIterator:
    """Single-pass iterator over the rows of a query result.

    Each call to next() yields a TraceProcessor.Row whose attributes are named
    after the result columns.
    """

    def __init__(self, column_names, batches):
      """Collects the cells of every batch into per-type value lists.

      Args:
        column_names: names of the columns of the result table.
        batches: sequence of result batches; batches after the first one
          flagged with is_last_batch are ignored.

      Raises:
        TraceProcessorException: if the number of cells is not a multiple of
          the number of columns.
      """
      self.__column_names = column_names
      self.__column_count = 0
      self.__count = 0
      self.__cells = []
      # One value list (and one read cursor) per cell type, indexed by the
      # QUERY_CELL_*_FIELD_ID constants.
      self.__data_lists = [[], [], [], [], [], []]
      self.__data_lists_index = [0, 0, 0, 0, 0, 0]
      self.__current_index = 0

      # Iterate over all the batches and collect their contents into lists
      # based on the type of the cell. Iterating (rather than indexing until
      # is_last_batch is seen) means an empty or unterminated batch list does
      # not raise IndexError.
      for batch in batches:
        # It's possible on some occasions that there are non UTF-8 characters
        # in the string_cells field. If this is the case, string_cells is
        # a bytestring which needs to be decoded (but passing ignore so that
        # we don't fail in decoding).
        strings_batch_str = batch.string_cells
        try:
          strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
        except AttributeError:
          # AttributeError can occur when |strings_batch_str| is an str which
          # happens when everything in it is UTF-8 (protobuf automatically
          # does the conversion if it can).
          pass

        # Null-terminated strings in a batch are concatenated
        # into a single large byte array, so we split on the
        # null-terminator to get the individual strings
        strings_batch = strings_batch_str.split('\0')[:-1]
        self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
            strings_batch)
        self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
            batch.varint_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
            batch.float64_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
            batch.blob_cells)
        self.__cells.extend(batch.cells)

        if batch.is_last_batch:
          break

      # If there are no rows in the query result, leave the counts at zero.
      cell_count = len(self.__cells)
      if cell_count == 0:
        return

      # Data integrity check - see that we have the expected amount of cells
      # for the number of rows that we need to return. Done before dividing
      # so that a result without columns reports a clear error instead of a
      # ZeroDivisionError.
      column_count = len(self.__column_names)
      if column_count == 0 or cell_count % column_count != 0:
        raise TraceProcessorException("Cell count " + str(cell_count) +
                                      " is not a multiple of column count " +
                                      str(column_count))

      # The cells collected so far are all individual cells of the result,
      # so we divide by the number of columns in a row to get the number of
      # rows (integer division keeps this exact for any size).
      self.__column_count = column_count
      self.__count = cell_count // column_count

    def as_pandas_dataframe(self):
      """Returns the whole query result as a pandas DataFrame.

      The conversion uses its own read cursors, so it always covers every row
      and can be called repeatedly, before or after iterating, without
      disturbing iteration.

      Raises:
        TraceProcessorException: if pandas cannot be imported or the result
          contains a cell of invalid type.
      """
      try:
        import pandas as pd
      except ModuleNotFoundError as err:
        raise TraceProcessorException(
            'Python dependencies missing. Please pip3 install pandas numpy'
        ) from err

      # Local cursors into the per-type value lists; the ones stored on the
      # instance belong to __next__.
      cursors = [0] * len(self.__data_lists)

      # Populate the dataframe with the query results
      rows = []
      cell_index = 0
      for _ in range(self.__count):
        row = []
        for _ in range(self.__column_count):
          col_type = self.__cells[cell_index]
          cell_index += 1
          if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
            raise TraceProcessorException('Invalid cell type')

          if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
            row.append(None)
          else:
            row.append(self.__data_lists[col_type][cursors[col_type]])
            cursors[col_type] += 1
        rows.append(row)

      df = pd.DataFrame(rows, columns=self.__column_names)
      return df.astype(object).where(df.notnull(),
                                     None).reset_index(drop=True)

    def __len__(self):
      return self.__count

    def __iter__(self):
      return self

    def __next__(self):
      """Returns the next row of the result as a TraceProcessor.Row.

      Raises:
        StopIteration: once every row has been returned.
        TraceProcessorException: if the row contains a cell of invalid type.
      """
      if self.__current_index == self.__count:
        raise StopIteration
      result = TraceProcessor.Row()
      base_cell_index = self.__current_index * self.__column_count
      for num, column_name in enumerate(self.__column_names):
        col_type = self.__cells[base_cell_index + num]
        if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
          raise TraceProcessorException('Invalid cell type')
        if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
          col_index = self.__data_lists_index[col_type]
          self.__data_lists_index[col_type] += 1
          setattr(result, column_name, self.__data_lists[col_type][col_index])
        else:
          setattr(result, column_name, None)

      self.__current_index += 1
      return result

  def __init__(self,
               trace: Optional[TraceReference] = None,
               addr: Optional[str] = None,
               config: Optional[TraceProcessorConfig] = None,
               file_path: Optional[str] = None):
    """Create a trace processor instance.

    Args:
      trace: reference to a trace to be loaded into the trace
        processor instance.

        One of several types is supported:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a trace URI which resolves to one of the above types
        5) a trace URI resolver; this is a subclass of
        resolver.TraceUriResolver which generates a reference to a
        trace when the |resolve| method is called on it.

        An URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      addr: address of a running trace processor instance. Useful to query an
        already loaded trace.
      config: configuration options which customize functionality of trace
        processor and the Python binding. When omitted, a new default
        TraceProcessorConfig is created for this instance.
      file_path (deprecated): path to a trace file to load. Use
        |trace| instead of this field: specifying both will cause
        an exception to be thrown.

    Raises:
      TraceProcessorException: if both |trace| and |file_path| are given, or
        if the trace cannot be resolved or parsed.
    """

    if trace and file_path:
      raise TraceProcessorException(
          "trace and file_path cannot both be specified.")

    # Build the default config per instance: a default argument value would
    # be one mutable object shared by every TraceProcessor.
    self.config = config if config is not None else TraceProcessorConfig()
    self.platform_delegate = PLATFORM_DELEGATE()
    self.protos = ProtoFactory(self.platform_delegate)
    self.resolver_registry = self.config.resolver_registry or \
        self.platform_delegate.default_resolver_registry()
    self.http = self._create_tp_http(addr)

    if trace or file_path:
      try:
        self._parse_trace(trace if trace else file_path)
      except BaseException:
        # The caller never receives this object when the constructor raises,
        # so release the subprocess/connection here before propagating.
        self.close()
        raise

  def query(self, sql: str):
    """Executes passed in SQL query using class defined HTTP API, and returns
    the response as a QueryResultIterator. Raises TraceProcessorException if
    the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling the
      as_pandas_dataframe() function after calling query.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)

    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics: List[str]):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)

    # Separate name so the |metrics| argument is not overwritten.
    metrics_proto = self.protos.TraceMetrics()
    metrics_proto.ParseFromString(response.metrics)
    return metrics_proto

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)

    return response.metatrace

  def _create_tp_http(self, addr: Optional[str]) -> TraceProcessorHttp:
    """Returns the HTTP client used to talk to trace processor.

    With |addr| set, connects to that address. Otherwise a shell is started
    through load_shell() using the options in |self.config|, and the handle
    it returns is stored in |self.subprocess| so close() can kill it.
    """
    if addr:
      p = urlparse(addr)
      # An address without a scheme (e.g. "localhost:9001") may not be parsed
      # into netloc, so fall back to the path component.
      parsed = p.netloc if p.netloc else p.path
      return TraceProcessorHttp(parsed, protos=self.protos)

    url, self.subprocess = load_shell(self.config.bin_path,
                                      self.config.unique_port,
                                      self.config.verbose,
                                      self.config.ingest_ftrace_in_raw,
                                      self.platform_delegate)
    return TraceProcessorHttp(url, protos=self.protos)

  def _parse_trace(self, trace: TraceReference):
    """Resolves |trace| and streams its contents into trace processor.

    Raises:
      TraceProcessorException: if |trace| resolves to zero or to more than one
        trace, or if trace processor reports an error for any chunk.
    """
    resolved_lst = self.resolver_registry.resolve(trace)
    if not resolved_lst:
      raise TraceProcessorException(
          'trace argument did not resolve to a trace.')

    if len(resolved_lst) > 1:
      raise TraceProcessorException(
          'trace argument resolved to more than one trace. Trace processor '
          'only supports loading a single trace; please use '
          'BatchTraceProcessor to operate on multiple traces.')

    resolved = resolved_lst[0]
    for chunk in resolved.generator:
      result = self.http.parse(chunk)
      if result.error:
        raise TraceProcessorException(
            f'Failed while parsing trace. Error message: {result.error}')
    self.http.notify_eof()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    # Never suppress an exception raised inside the with block.
    return False

  def close(self):
    """Kills the shell subprocess (if this instance started one) and closes
    the HTTP connection."""
    # |subprocess| only exists when _create_tp_http() spawned a shell.
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
    self.http.conn.close()