1#!/usr/bin/env python3 2# Copyright (C) 2021 The Android Open Source Project 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Contains classes for BatchTraceProcessor API.""" 16import abc 17import concurrent.futures as cf 18import dataclasses as dc 19import time 20from enum import Enum 21import multiprocessing 22from typing import Any, Callable, Dict, Tuple, List, Optional 23 24import pandas as pd 25 26from perfetto.batch_trace_processor.platform import PlatformDelegate 27from perfetto.trace_processor.api import PLATFORM_DELEGATE as TP_PLATFORM_DELEGATE 28from perfetto.trace_processor.api import TraceProcessor 29from perfetto.trace_processor.api import TraceProcessorException 30from perfetto.trace_processor.api import TraceProcessorConfig 31from perfetto.trace_uri_resolver import registry 32from perfetto.trace_uri_resolver.registry import ResolverRegistry 33 34# Defining this field as a module variable means this can be changed by 35# implementations at startup and used for all BatchTraceProcessor objects 36# without having to specify on each one. 37# In Google3, this field is rewritten using Copybara to a implementation 38# which can integrates with internal infra. 39PLATFORM_DELEGATE = PlatformDelegate 40 41TraceListReference = registry.TraceListReference 42Metadata = Dict[str, str] 43 44 45# Enum encoding how errors while loading/querying traces in BatchTraceProcessor 46# should be handled. 
class FailureHandling(Enum):
  """Encodes how errors while loading/querying traces in BatchTraceProcessor
  should be handled."""

  # If any trace fails to load or be queried, raises an exception causing the
  # entire batch trace processor to fail.
  # This is the default behaviour and the method which should be preferred for
  # any interactive use of BatchTraceProcessor.
  RAISE_EXCEPTION = 0

  # If a trace fails to load or be queried, the trace processor for that trace
  # is dropped but loading of other traces is unaffected. A failure integer is
  # incremented in the Stats class for the batch trace processor instance.
  INCREMENT_STAT = 1


@dc.dataclass
class BatchTraceProcessorConfig:
  """Configuration for a BatchTraceProcessor instance."""

  tp_config: TraceProcessorConfig
  load_failure_handling: FailureHandling
  # NOTE: this field was previously declared as |query_failure_handling| but
  # __init__ only ever assigned |execute_failure_handling| (and the rest of
  # this file reads |execute_failure_handling|). The mismatch made the
  # dataclass-generated __repr__/__eq__ raise AttributeError; the declaration
  # now matches the attribute actually set.
  execute_failure_handling: FailureHandling

  def __init__(
      self,
      tp_config: Optional[TraceProcessorConfig] = None,
      load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
      execute_failure_handling: FailureHandling = FailureHandling
      .RAISE_EXCEPTION,
  ):
    """Initializes a configuration object.

    Args:
      tp_config: configuration passed to every underlying TraceProcessor. A
        fresh TraceProcessorConfig is created when not specified: a shared
        default instance (the previous behaviour) is unsafe because
        BatchTraceProcessor mutates |tp_config.resolver_registry|.
      load_failure_handling: how a failure to load a trace is handled.
      execute_failure_handling: how a failure while executing (query, metric
        or arbitrary function) on a trace is handled.
    """
    self.tp_config = TraceProcessorConfig() if tp_config is None else tp_config
    self.load_failure_handling = load_failure_handling
    self.execute_failure_handling = execute_failure_handling


# Contains stats about the events which happened during the use of
# BatchTraceProcessor.
@dc.dataclass
class Stats:
  # The number of traces which failed to load; only non-zero if
  # FailureHandling.INCREMENT_STAT is chosen as the load failure handling
  # type.
  load_failures: int = 0

  # The number of traces which failed while executing (query, metric or
  # arbitrary function); only non-zero if FailureHandling.INCREMENT_STAT is
  # chosen as the execute failure handling type.
  execute_failures: int = 0


class BatchTraceProcessor:
  """Run ad-hoc SQL queries across many Perfetto traces.

  Usage:
    with BatchTraceProcessor(traces) as btp:
      dfs = btp.query('select * from slice')
      for df in dfs:
        print(df)
  """

  class Observer(abc.ABC):
    """Observer that can be used to provide side-channel information about
    processed traces.
    """

    @abc.abstractmethod
    def trace_processed(self, metadata: Metadata,
                        execution_time_seconds: float):
      """Invoked every time query has been executed on a trace.

      Args:
        metadata: Metadata provided by trace resolver, can be used to identify
          the trace.

        execution_time_seconds: Query execution time, in seconds.
      """
      raise NotImplementedError

  def __init__(self,
               traces: TraceListReference,
               config: Optional[BatchTraceProcessorConfig] = None,
               observer: Optional[Observer] = None):
    """Creates a batch trace processor instance.

    BatchTraceProcessor is the blessed way of running ad-hoc queries in
    Python across many traces.

    Args:
      traces: A list of traces, a trace URI resolver or a URI which
        can be resolved to a list of traces.

        If a list, each of items must be one of the following types:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) an URI which resolves to a trace

        A trace URI resolver is a subclass of resolver.TraceUriResolver
        which generates a list of trace references when the |resolve|
        method is called on it.

        A URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      config: configuration options which customize functionality of batch
        trace processor and underlying trace processors. A fresh
        BatchTraceProcessorConfig is created when not specified: this class
        writes to |config.tp_config| below so sharing a module-level default
        instance across calls would leak state between instances.
      observer: an optional observer for side-channel information, e.g.
        running time of queries.
    """
    config = BatchTraceProcessorConfig() if config is None else config

    self.tps_and_metadata = None
    self.closed = False
    self._stats = Stats()

    self.platform_delegate = PLATFORM_DELEGATE()
    self.tp_platform_delegate = TP_PLATFORM_DELEGATE()
    self.config = config

    self.observer = observer

    # Make sure the descendent trace processors are using the same resolver
    # registry (even though they won't actually use it as we will resolve
    # everything fully in this class).
    self.resolver_registry = config.tp_config.resolver_registry or \
        self.tp_platform_delegate.default_resolver_registry()
    self.config.tp_config.resolver_registry = self.resolver_registry

    # Resolve all the traces to their final form.
    resolved = self.resolver_registry.resolve(traces)

    # As trace processor is completely CPU bound, it makes sense to just
    # max out the CPUs available.
    query_executor = self.platform_delegate.create_query_executor(
        len(resolved)) or cf.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count())
    load_executor = self.platform_delegate.create_load_executor(
        len(resolved)) or query_executor

    self.query_executor = query_executor
    # Traces which failed to load (with INCREMENT_STAT handling) come back as
    # None from _create_tp and are dropped here.
    self.tps_and_metadata = [
        x for x in load_executor.map(self._create_tp, resolved)
        if x is not None
    ]

  def metric(self, metrics: List[str]):
    """Computes the provided metrics.

    The computation happens in parallel across all the traces.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      A list of TraceMetric protos (one for each trace).
    """
    return self.execute(lambda tp: tp.metric(metrics))

  def query(self, sql: str):
    """Executes the provided SQL statement.

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A list of Pandas dataframes with the result of executing the query (one
      per trace).

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute(lambda tp: tp.query(sql).as_pandas_dataframe())

  def query_and_flatten(self, sql: str):
    """Executes the provided SQL statement and flattens the result.

    The execution happens in parallel across all the traces and the
    resulting Pandas dataframes are flattened into a single dataframe.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A concatenated Pandas dataframe containing the result of executing the
      query across all the traces.

      If an URI or a trace resolver was passed to the constructor, the
      contents of the |metadata| dictionary emitted by the resolver will also
      be emitted as extra columns (key being column name, value being the
      value in the dataframe).

      For example:
        class CustomResolver(TraceUriResolver):
          def resolve(self):
            return [TraceUriResolver.Result(trace='/tmp/path',
                                            metadata={
                                              'path': '/tmp/path'
                                              'foo': 'bar'
                                            })]

        with BatchTraceProcessor(CustomResolver()) as btp:
          df = btp.query_and_flatten('select count(1) as cnt from slice')

      Then df will look like this:
        cnt       path        foo
        100       /tmp/path   bar

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute_and_flatten(lambda tp: tp.query(sql).
                                    as_pandas_dataframe())

  def query_single_result(self, sql: str):
    """Executes the provided SQL statement (returning a single row).

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute. This statement should return exactly
        one row on any trace.

    Returns:
      A list of values with the result of executing the query (one per trace).

    Raises:
      TraceProcessorException: An error occurred running the query or more
        than one result was returned.
    """

    def query_single_result_inner(tp):
      df = tp.query(sql).as_pandas_dataframe()
      if len(df.index) != 1:
        raise TraceProcessorException("Query should only return a single row")

      if len(df.columns) != 1:
        raise TraceProcessorException(
            "Query should only return a single column")

      return df.iloc[0, 0]

    return self.execute(query_single_result_inner)

  def execute(self, fn: Callable[[TraceProcessor], Any]) -> List[Any]:
    """Executes the provided function.

    The execution happens in parallel across all the trace processor instances
    owned by this object.

    Args:
      fn: The function to execute.

    Returns:
      A list of values with the result of executing the function (one per
      trace).
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      return self._execute_handling_failure(fn, tp, metadata)

    return list(self.query_executor.map(wrapped, self.tps_and_metadata))

  def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
                         ) -> pd.DataFrame:
    """Executes the provided function and flattens the result.

    The execution happens in parallel across all the trace processor
    instances owned by this object and the returned Pandas dataframes are
    flattened into a single dataframe.

    Args:
      fn: The function to execute which returns a Pandas dataframe.

    Returns:
      A Pandas dataframe containing the result of executing the query across
      all the traces. Extra columns containing the file path and args will
      be added to the dataframe (see |query_and_flatten| for details).
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      start = time.time()
      df = self._execute_handling_failure(fn, tp, metadata)
      end = time.time()
      if self.observer:
        self.observer.trace_processed(metadata, end - start)
      # Attach the resolver-provided metadata as extra columns so rows can
      # be traced back to their originating trace after concatenation.
      for key, value in metadata.items():
        df[key] = value
      return df

    df = pd.concat(
        list(self.query_executor.map(wrapped, self.tps_and_metadata)))
    return df.reset_index(drop=True)

  def close(self):
    """Closes this batch trace processor instance.

    This closes all spawned trace processor instances, releasing all the
    memory and resources those instances take.

    No further calls to other methods in this class should be made after
    calling this method.
    """
    # getattr guards against __del__ running on a partially-constructed
    # object (i.e. if __init__ raised before |closed| was assigned).
    if getattr(self, 'closed', True):
      return
    self.closed = True

    if self.tps_and_metadata:
      for tp, _ in self.tps_and_metadata:
        tp.close()

  def stats(self):
    """Statistics about the operation of this batch trace processor instance.

    See |Stats| class definition for the list of the statistics available."""
    return self._stats

  def _create_tp(self, trace: ResolverRegistry.Result
                ) -> Optional[Tuple[TraceProcessor, Metadata]]:
    # Creates the trace processor for a single resolved trace; returns None
    # (and bumps the load failure stat) when INCREMENT_STAT handling is
    # configured and the trace fails to load.
    try:
      return TraceProcessor(
          trace=trace.generator, config=self.config.tp_config), trace.metadata
    except TraceProcessorException as ex:
      if self.config.load_failure_handling == FailureHandling.RAISE_EXCEPTION:
        raise ex
      self._stats.load_failures += 1
      return None

  def _execute_handling_failure(self, fn: Callable[[TraceProcessor], Any],
                                tp: TraceProcessor, metadata: Metadata):
    # Runs |fn| on a single trace processor, applying the configured execute
    # failure handling; on a swallowed failure an empty dataframe is returned
    # so execute_and_flatten can still concatenate the results.
    try:
      return fn(tp)
    except TraceProcessorException as ex:
      if self.config.execute_failure_handling == \
          FailureHandling.RAISE_EXCEPTION:
        raise TraceProcessorException(f'{metadata} {ex}') from None
      self._stats.execute_failures += 1
      return pd.DataFrame()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    return False

  def __del__(self):
    self.close()