1# Copyright (C) 2020 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import dataclasses as dc 16from urllib.parse import urlparse 17from typing import List, Optional 18 19from perfetto.trace_processor.http import TraceProcessorHttp 20from perfetto.trace_processor.platform import PlatformDelegate 21from perfetto.trace_processor.protos import ProtoFactory 22from perfetto.trace_processor.shell import load_shell 23from perfetto.trace_uri_resolver import registry 24from perfetto.trace_uri_resolver.registry import ResolverRegistry 25 26# Defining this field as a module variable means this can be changed by 27# implementations at startup and used for all TraceProcessor objects 28# without having to specify on each one. 29# In Google3, this field is rewritten using Copybara to a implementation 30# which can integrates with internal infra. 31PLATFORM_DELEGATE = PlatformDelegate 32 33TraceReference = registry.TraceReference 34 35# Custom exception raised if any trace_processor functions return a 36# response with an error defined 37class TraceProcessorException(Exception): 38 39 def __init__(self, message): 40 super().__init__(message) 41 42 43@dc.dataclass 44class TraceProcessorConfig: 45 bin_path: Optional[str] 46 unique_port: bool 47 verbose: bool 48 ingest_ftrace_in_raw: bool 49 enable_dev_features: bool 50 resolver_registry: Optional[ResolverRegistry] 51 52 def __init__(self, 53 bin_path: Optional[str] = None, 54 unique_port: bool = True, 55 verbose: bool = False, 56 ingest_ftrace_in_raw: bool = False, 57 enable_dev_features=False, 58 resolver_registry: Optional[ResolverRegistry] = None): 59 self.bin_path = bin_path 60 self.unique_port = unique_port 61 self.verbose = verbose 62 self.ingest_ftrace_in_raw = ingest_ftrace_in_raw 63 self.enable_dev_features = enable_dev_features 64 self.resolver_registry = resolver_registry 65 66 67class TraceProcessor: 68 69 # Values of these constants correspond to the QueryResponse message at 70 # protos/perfetto/trace_processor/trace_processor.proto 71 QUERY_CELL_INVALID_FIELD_ID = 0 72 QUERY_CELL_NULL_FIELD_ID = 1 73 QUERY_CELL_VARINT_FIELD_ID = 2 74 QUERY_CELL_FLOAT64_FIELD_ID = 3 75 QUERY_CELL_STRING_FIELD_ID = 4 76 QUERY_CELL_BLOB_FIELD_ID = 5 77 78 # This is the class returned to the user and contains one row of the 79 # resultant query. Each column name is stored as an attribute of this 80 # class, with the value corresponding to the column name and row in 81 # the query results table. 82 class Row(object): 83 # Required for pytype to correctly infer attributes from Row objects 84 _HAS_DYNAMIC_ATTRIBUTES = True 85 86 def __str__(self): 87 return str(self.__dict__) 88 89 def __repr__(self): 90 return self.__dict__ 91 92 class QueryResultIterator: 93 94 def __init__(self, column_names, batches): 95 self.__column_names = list(column_names) 96 self.__column_count = 0 97 self.__count = 0 98 self.__cells = [] 99 self.__data_lists = [[], [], [], [], [], []] 100 self.__data_lists_index = [0, 0, 0, 0, 0, 0] 101 self.__current_index = 0 102 103 # Iterate over all the batches and collect their 104 # contents into lists based on the type of the batch 105 batch_index = 0 106 while True: 107 # It's possible on some occasions that there are non UTF-8 characters 108 # in the string_cells field. If this is the case, string_cells is 109 # a bytestring which needs to be decoded (but passing ignore so that 110 # we don't fail in decoding). 111 strings_batch_str = batches[batch_index].string_cells 112 try: 113 strings_batch_str = strings_batch_str.decode('utf-8', 'ignore') 114 except AttributeError: 115 # AttributeError can occur when |strings_batch_str| is an str which 116 # happens when everything in it is UTF-8 (protobuf automatically 117 # does the conversion if it can). 118 pass 119 120 # Null-terminated strings in a batch are concatenated 121 # into a single large byte array, so we split on the 122 # null-terminator to get the individual strings 123 strings_batch = strings_batch_str.split('\0')[:-1] 124 self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend( 125 strings_batch) 126 self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend( 127 batches[batch_index].varint_cells) 128 self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend( 129 batches[batch_index].float64_cells) 130 self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend( 131 batches[batch_index].blob_cells) 132 self.__cells.extend(batches[batch_index].cells) 133 134 if batches[batch_index].is_last_batch: 135 break 136 batch_index += 1 137 138 # If there are no rows in the query result, don't bother updating the 139 # counts to avoid dealing with / 0 errors. 140 if len(self.__cells) == 0: 141 return 142 143 # The count we collected so far was a count of all individual columns 144 # in the query result, so we divide by the number of columns in a row 145 # to get the number of rows 146 self.__column_count = len(self.__column_names) 147 self.__count = int(len(self.__cells) / self.__column_count) 148 149 # Data integrity check - see that we have the expected amount of cells 150 # for the number of rows that we need to return 151 if len(self.__cells) % self.__column_count != 0: 152 raise TraceProcessorException("Cell count " + str(len(self.__cells)) + 153 " is not a multiple of column count " + 154 str(len(self.__column_names))) 155 156 # To use the query result as a populated Pandas dataframe, this 157 # function must be called directly after calling query inside 158 # TraceProcesor. 159 def as_pandas_dataframe(self): 160 try: 161 import pandas as pd 162 163 # Populate the dataframe with the query results 164 rows = [] 165 for i in range(0, self.__count): 166 row = [] 167 base_cell_index = i * self.__column_count 168 for num in range(len(self.__column_names)): 169 col_type = self.__cells[base_cell_index + num] 170 if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID: 171 raise TraceProcessorException('Invalid cell type') 172 173 if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID: 174 row.append(None) 175 else: 176 col_index = self.__data_lists_index[col_type] 177 self.__data_lists_index[col_type] += 1 178 row.append(self.__data_lists[col_type][col_index]) 179 rows.append(row) 180 181 df = pd.DataFrame(rows, columns=self.__column_names) 182 return df.astype(object).where(df.notnull(), 183 None).reset_index(drop=True) 184 185 except ModuleNotFoundError: 186 raise TraceProcessorException( 187 'Python dependencies missing. Please pip3 install pandas numpy') 188 189 def __len__(self): 190 return self.__count 191 192 def __iter__(self): 193 return self 194 195 def __next__(self): 196 if self.__current_index == self.__count: 197 raise StopIteration 198 result = TraceProcessor.Row() 199 base_cell_index = self.__current_index * self.__column_count 200 for num, column_name in enumerate(self.__column_names): 201 col_type = self.__cells[base_cell_index + num] 202 if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID: 203 raise TraceProcessorException('Invalid cell type') 204 if col_type != TraceProcessor.QUERY_CELL_NULL_FIELD_ID: 205 col_index = self.__data_lists_index[col_type] 206 self.__data_lists_index[col_type] += 1 207 setattr(result, column_name, self.__data_lists[col_type][col_index]) 208 else: 209 setattr(result, column_name, None) 210 211 self.__current_index += 1 212 return result 213 214 def __init__(self, 215 trace: Optional[TraceReference] = None, 216 addr: Optional[str] = None, 217 config: TraceProcessorConfig = TraceProcessorConfig(), 218 file_path: Optional[str] = None): 219 """Create a trace processor instance. 220 221 Args: 222 trace: reference to a trace to be loaded into the trace 223 processor instance. 224 225 One of several types is supported: 226 1) path to a trace file to open and read 227 2) a file like object (file, io.BytesIO or similar) to read 228 3) a generator yielding bytes 229 4) a trace URI which resolves to one of the above types 230 5) a trace URI resolver; this is a subclass of 231 resolver.TraceUriResolver which generates a reference to a 232 trace when the |resolve| method is called on it. 233 234 An URI is similar to a connection string (e.g. for a web 235 address or SQL database) which specifies where to lookup traces 236 and which traces to pick from this data source. The format of a 237 string should be as follows: 238 resolver_name:key_1=list,of,values;key_2=value 239 240 Custom resolvers can be provided to handle URIs via 241 |config.resolver_registry|. 242 addr: address of a running trace processor instance. Useful to query an 243 already loaded trace. 244 config: configuration options which customize functionality of trace 245 processor and the Python binding. 246 file_path (deprecated): path to a trace file to load. Use 247 |trace| instead of this field: specifying both will cause 248 an exception to be thrown. 249 """ 250 251 if trace and file_path: 252 raise TraceProcessorException( 253 "trace and file_path cannot both be specified.") 254 255 self.config = config 256 self.platform_delegate = PLATFORM_DELEGATE() 257 self.protos = ProtoFactory(self.platform_delegate) 258 self.resolver_registry = config.resolver_registry or \ 259 self.platform_delegate.default_resolver_registry() 260 self.http = self._create_tp_http(addr) 261 262 if trace or file_path: 263 try: 264 self._parse_trace(trace if trace else file_path) 265 except TraceProcessorException as ex: 266 self.close() 267 raise ex 268 269 def query(self, sql: str): 270 """Executes passed in SQL query using class defined HTTP API, and returns 271 the response as a QueryResultIterator. Raises TraceProcessorException if 272 the response returns with an error. 273 274 Args: 275 sql: SQL query written as a String 276 277 Returns: 278 A class which can iterate through each row of the results table. This 279 can also be converted to a pandas dataframe by calling the 280 as_pandas_dataframe() function after calling query. 281 """ 282 response = self.http.execute_query(sql) 283 if response.error: 284 raise TraceProcessorException(response.error) 285 286 return TraceProcessor.QueryResultIterator(response.column_names, 287 response.batch) 288 289 def metric(self, metrics: List[str]): 290 """Returns the metrics data corresponding to the passed in trace metric. 291 Raises TraceProcessorException if the response returns with an error. 292 293 Args: 294 metrics: A list of valid metrics as defined in TraceMetrics 295 296 Returns: 297 The metrics data as a proto message 298 """ 299 response = self.http.compute_metric(metrics) 300 if response.error: 301 raise TraceProcessorException(response.error) 302 303 metrics = self.protos.TraceMetrics() 304 metrics.ParseFromString(response.metrics) 305 return metrics 306 307 def enable_metatrace(self): 308 """Enable metatrace for the currently running trace_processor. 309 """ 310 return self.http.enable_metatrace() 311 312 def disable_and_read_metatrace(self): 313 """Disable and return the metatrace formed from the currently running 314 trace_processor. This must be enabled before attempting to disable. This 315 returns the serialized bytes of the metatrace data directly. Raises 316 TraceProcessorException if the response returns with an error. 317 """ 318 response = self.http.disable_and_read_metatrace() 319 if response.error: 320 raise TraceProcessorException(response.error) 321 322 return response.metatrace 323 324 def _create_tp_http(self, addr: str) -> TraceProcessorHttp: 325 if addr: 326 p = urlparse(addr) 327 parsed = p.netloc if p.netloc else p.path 328 return TraceProcessorHttp(parsed, protos=self.protos) 329 330 url, self.subprocess = load_shell(self.config.bin_path, 331 self.config.unique_port, 332 self.config.verbose, 333 self.config.ingest_ftrace_in_raw, 334 self.config.enable_dev_features, 335 self.platform_delegate) 336 return TraceProcessorHttp(url, protos=self.protos) 337 338 def _parse_trace(self, trace: TraceReference): 339 resolved_lst = self.resolver_registry.resolve(trace) 340 if not resolved_lst: 341 raise TraceProcessorException( 342 'trace argument did not resolve to a trace.') 343 344 if len(resolved_lst) > 1: 345 raise TraceProcessorException( 346 'trace argument resolved to more than one trace. Trace processor ' 347 'only supports loading a single trace; please use ' 348 'BatchTraceProcessor to operate on multiple traces.') 349 350 resolved = resolved_lst[0] 351 for chunk in resolved.generator: 352 result = self.http.parse(chunk) 353 if result.error: 354 raise TraceProcessorException( 355 f'Failed while parsing trace. Error message: {result.error}') 356 self.http.notify_eof() 357 358 def __enter__(self): 359 return self 360 361 def __exit__(self, a, b, c): 362 del a, b, c # Unused. 363 self.close() 364 return False 365 366 def close(self): 367 if hasattr(self, 'subprocess'): 368 self.subprocess.kill() 369 self.subprocess.wait() 370 371 if hasattr(self, 'http'): 372 self.http.conn.close() 373 374 def __del__(self): 375 self.close() 376