• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright (C) 2021 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#      http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Contains classes for BatchTraceProcessor API."""
16import abc
17import concurrent.futures as cf
18import dataclasses as dc
19import time
20from enum import Enum
21import multiprocessing
22from typing import Any, Callable, Dict, Tuple, List, Optional
23
24import pandas as pd
25
26from perfetto.batch_trace_processor.platform import PlatformDelegate
27from perfetto.trace_processor.api import PLATFORM_DELEGATE as TP_PLATFORM_DELEGATE
28from perfetto.trace_processor.api import TraceProcessor
29from perfetto.trace_processor.api import TraceProcessorException
30from perfetto.trace_processor.api import TraceProcessorConfig
31from perfetto.trace_uri_resolver import registry
32from perfetto.trace_uri_resolver.registry import ResolverRegistry
33
# Defining this field as a module variable means this can be changed by
# implementations at startup and used for all BatchTraceProcessor objects
# without having to specify on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which can integrate with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

# Convenience aliases: the accepted "traces" argument type and the per-trace
# metadata dictionary emitted by trace URI resolvers.
TraceListReference = registry.TraceListReference
Metadata = Dict[str, str]
43
44
class FailureHandling(Enum):
  """Encodes how errors while loading/querying traces should be handled.

  RAISE_EXCEPTION: any trace which fails to load or be queried raises an
    exception, failing the whole batch trace processor. This is the default
    and the preferred mode for any interactive use of BatchTraceProcessor.

  INCREMENT_STAT: a trace which fails to load or be queried is dropped while
    the remaining traces continue to be processed; a failure counter in the
    Stats class for the batch trace processor instance is incremented
    instead.
  """
  RAISE_EXCEPTION = 0
  INCREMENT_STAT = 1
58
59
@dc.dataclass
class BatchTraceProcessorConfig:
  """Configuration for a BatchTraceProcessor instance.

  Attributes:
    tp_config: configuration forwarded to every underlying TraceProcessor.
    load_failure_handling: how a failure to load a trace is handled.
    execute_failure_handling: how a failure while executing (query, metric
      or arbitrary function) on a trace is handled.
  """
  tp_config: TraceProcessorConfig
  load_failure_handling: FailureHandling
  # NOTE: this field was previously (incorrectly) declared as
  # |query_failure_handling| which matched neither the attribute set by
  # __init__ below nor the attribute read by BatchTraceProcessor.
  execute_failure_handling: FailureHandling

  # Explicitly defined so callers can rely on the keyword defaults; as this
  # is present in the class body, @dc.dataclass does not generate its own.
  def __init__(
      self,
      tp_config: TraceProcessorConfig = TraceProcessorConfig(),
      load_failure_handling: FailureHandling = FailureHandling.RAISE_EXCEPTION,
      execute_failure_handling: FailureHandling = FailureHandling
      .RAISE_EXCEPTION,
  ):
    self.tp_config = tp_config
    self.load_failure_handling = load_failure_handling
    self.execute_failure_handling = execute_failure_handling
76
77
@dc.dataclass
class Stats:
  """Counters for notable events during the use of BatchTraceProcessor.

  Each counter only becomes non-zero when FailureHandling.INCREMENT_STAT is
  chosen as the handling type for the corresponding operation.
  """

  # Count of traces which failed to load.
  load_failures: int = 0

  # Count of traces for which executing (a query, metric or arbitrary
  # function) failed.
  execute_failures: int = 0
90
91
class BatchTraceProcessor:
  """Run ad-hoc SQL queries across many Perfetto traces.

  Usage:
    with BatchTraceProcessor(traces) as btp:
      dfs = btp.query('select * from slice')
      for df in dfs:
        print(df)
  """

  class Observer(abc.ABC):
    """Observer that can be used to provide side-channel information about
    processed traces.
    """

    @abc.abstractmethod
    def trace_processed(self, metadata: Metadata,
                        execution_time_seconds: float):
      """Invoked every time query has been executed on a trace.

      Args:
        metadata: Metadata provided by trace resolver, can be used to identify
          the trace.

        execution_time_seconds: Query execution time, in seconds.
      """
      raise NotImplementedError

  def __init__(self,
               traces: TraceListReference,
               config: BatchTraceProcessorConfig = BatchTraceProcessorConfig(),
               observer: Optional[Observer] = None):
    """Creates a batch trace processor instance.

    BatchTraceProcessor is the blessed way of running ad-hoc queries in
    Python across many traces.

    Args:
      traces: A list of traces, a trace URI resolver or a URI which
        can be resolved to a list of traces.

        If a list, each of items must be one of the following types:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) an URI which resolves to a trace

        A trace URI resolver is a subclass of resolver.TraceUriResolver
        which generates a list of trace references when the |resolve|
        method is called on it.

        A URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      config: configuration options which customize functionality of batch
        trace processor and underlying trace processors.
      observer: an optional observer for side-channel information, e.g.
        running time of queries.
    """

    # Assign the fields used by |close| / |__del__| first so that teardown is
    # safe even if a later step of construction raises.
    self.tps_and_metadata = None
    self.closed = False
    self._stats = Stats()

    self.platform_delegate = PLATFORM_DELEGATE()
    self.tp_platform_delegate = TP_PLATFORM_DELEGATE()
    self.config = config

    self.observer = observer

    # Make sure the descendent trace processors are using the same resolver
    # registry (even though they won't actually use it as we will resolve
    # everything fully in this class).
    self.resolver_registry = config.tp_config.resolver_registry or \
      self.tp_platform_delegate.default_resolver_registry()
    self.config.tp_config.resolver_registry = self.resolver_registry

    # Resolve all the traces to their final form.
    resolved = self.resolver_registry.resolve(traces)

    # As trace processor is completely CPU bound, it makes sense to just
    # max out the CPUs available.
    query_executor = self.platform_delegate.create_query_executor(
        len(resolved)) or cf.ThreadPoolExecutor(
            max_workers=multiprocessing.cpu_count())
    load_executor = self.platform_delegate.create_load_executor(
        len(resolved)) or query_executor

    self.query_executor = query_executor
    # Traces which fail to load under INCREMENT_STAT handling yield None from
    # |_create_tp| and are filtered out here.
    self.tps_and_metadata = [
        x for x in load_executor.map(self._create_tp, resolved) if x is not None
    ]

  def metric(self, metrics: List[str]):
    """Computes the provided metrics.

    The computation happens in parallel across all the traces.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      A list of TraceMetric protos (one for each trace).
    """
    return self.execute(lambda tp: tp.metric(metrics))

  def query(self, sql: str):
    """Executes the provided SQL statement.

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A list of Pandas dataframes with the result of executing the query (one
      per trace).

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute(lambda tp: tp.query(sql).as_pandas_dataframe())

  def query_and_flatten(self, sql: str):
    """Executes the provided SQL statement and flattens the result.

    The execution happens in parallel across all the traces and the
    resulting Pandas dataframes are flattened into a single dataframe.

    Args:
      sql: The SQL statement to execute.

    Returns:
      A concatenated Pandas dataframe containing the result of executing the
      query across all the traces.

      If an URI or a trace resolver was passed to the constructor, the
      contents of the |metadata| dictionary emitted by the resolver will also
      be emitted as extra columns (key being column name, value being the
      value in the dataframe).

      For example:
        class CustomResolver(TraceUriResolver):
          def resolve(self):
            return [TraceUriResolver.Result(trace='/tmp/path',
                                            metadata={
                                              'path': '/tmp/path'
                                              'foo': 'bar'
                                            })]

        with BatchTraceProcessor(CustomResolver()) as btp:
          df = btp.query_and_flatten('select count(1) as cnt from slice')

      Then df will look like this:
        cnt       path              foo
        100       /tmp/path         bar

    Raises:
      TraceProcessorException: An error occurred running the query.
    """
    return self.execute_and_flatten(lambda tp: tp.query(sql).
                                    as_pandas_dataframe())

  def query_single_result(self, sql: str):
    """Executes the provided SQL statement (returning a single row).

    The execution happens in parallel across all the traces.

    Args:
      sql: The SQL statement to execute. This statement should return exactly
        one row on any trace.

    Returns:
      A list of values with the result of executing the query (one per trace).

    Raises:
      TraceProcessorException: An error occurred running the query or more than
        one result was returned.
    """

    def query_single_result_inner(tp):
      df = tp.query(sql).as_pandas_dataframe()
      if len(df.index) != 1:
        raise TraceProcessorException("Query should only return a single row")

      if len(df.columns) != 1:
        raise TraceProcessorException(
            "Query should only return a single column")

      return df.iloc[0, 0]

    return self.execute(query_single_result_inner)

  def execute(self, fn: Callable[[TraceProcessor], Any]) -> List[Any]:
    """Executes the provided function.

    The execution happens in parallel across all the trace processor instances
    owned by this object.

    Args:
      fn: The function to execute.

    Returns:
      A list of values with the result of executing the function (one per
      trace).
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      return self._execute_handling_failure(fn, tp, metadata)

    return list(self.query_executor.map(wrapped, self.tps_and_metadata))

  def execute_and_flatten(self, fn: Callable[[TraceProcessor], pd.DataFrame]
                         ) -> pd.DataFrame:
    """Executes the provided function and flattens the result.

    The execution happens in parallel across all the trace processor
    instances owned by this object and the returned Pandas dataframes are
    flattened into a single dataframe.

    Args:
      fn: The function to execute which returns a Pandas dataframe.

    Returns:
      A Pandas dataframe containing the result of executing the query across all
      the traces. Extra columns containing the file path and args will
      be added to the dataframe (see |query_and_flatten| for details).
    """

    def wrapped(pair: Tuple[TraceProcessor, Metadata]):
      (tp, metadata) = pair
      start = time.time()
      df = self._execute_handling_failure(fn, tp, metadata)
      end = time.time()
      if self.observer:
        self.observer.trace_processed(metadata, end - start)
      # Surface the resolver-provided metadata as extra columns so traces can
      # be told apart after concatenation.
      for key, value in metadata.items():
        df[key] = value
      return df

    df = pd.concat(
        list(self.query_executor.map(wrapped, self.tps_and_metadata)))
    return df.reset_index(drop=True)

  def close(self):
    """Closes this batch trace processor instance.

    This closes all spawned trace processor instances, releasing all the memory
    and resources those instances take.

    No further calls to other methods in this class should be made after
    calling this method.
    """
    if self.closed:
      return
    self.closed = True

    if self.tps_and_metadata:
      for tp, _ in self.tps_and_metadata:
        tp.close()

  def stats(self):
    """Statistics about the operation of this batch trace processor instance.

    See |Stats| class definition for the list of the statistics available."""
    return self._stats

  def _create_tp(self, trace: ResolverRegistry.Result
                ) -> Optional[Tuple[TraceProcessor, Metadata]]:
    # Creates the trace processor for a single resolved trace. Returns None
    # (instead of raising) when the trace fails to load and the config asks
    # for INCREMENT_STAT handling.
    try:
      return TraceProcessor(
          trace=trace.generator, config=self.config.tp_config), trace.metadata
    except TraceProcessorException as ex:
      if self.config.load_failure_handling == FailureHandling.RAISE_EXCEPTION:
        raise ex
      self._stats.load_failures += 1
      return None

  def _execute_handling_failure(self, fn: Callable[[TraceProcessor], Any],
                                tp: TraceProcessor, metadata: Metadata):
    # Runs |fn| on one trace processor, applying the configured execute
    # failure handling; on a swallowed failure an empty dataframe stands in
    # for the result.
    try:
      return fn(tp)
    except TraceProcessorException as ex:
      if self.config.execute_failure_handling == \
          FailureHandling.RAISE_EXCEPTION:
        raise TraceProcessorException(f'{metadata} {ex}') from None
      self._stats.execute_failures += 1
      return pd.DataFrame()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    return False

  def __del__(self):
    # Guard against __init__ having raised before |closed| was assigned;
    # |close| itself is idempotent.
    if not getattr(self, 'closed', True):
      self.close()
397