• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import dataclasses as dc
16from urllib.parse import urlparse
17from typing import List, Optional
18
19from perfetto.trace_processor.http import TraceProcessorHttp
20from perfetto.trace_processor.platform import PlatformDelegate
21from perfetto.trace_processor.protos import ProtoFactory
22from perfetto.trace_processor.shell import load_shell
23from perfetto.trace_uri_resolver import registry
24from perfetto.trace_uri_resolver.registry import ResolverRegistry
25
# Defining this field as a module variable means this can be changed by
# implementations at startup and used for all TraceProcessor objects
# without having to specify it on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which integrates with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

# Alias of registry.TraceReference; used below to annotate the kinds of
# trace reference accepted by TraceProcessor.
TraceReference = registry.TraceReference
34
class TraceProcessorException(Exception):
  """Error raised by the trace processor Python API.

  Raised when a trace_processor call responds with an error, and also by
  the binding itself for invalid arguments or malformed query results.
  """

  def __init__(self, message):
    Exception.__init__(self, message)
41
42
@dc.dataclass
class TraceProcessorConfig:
  """Options customizing a TraceProcessor instance and the Python binding.

  The dataclass-generated __init__ accepts every field below, in order, as
  an optional argument with the shown default.

  Attributes:
    bin_path: path to a trace_processor shell binary; forwarded to
      load_shell when TraceProcessor starts its own shell.
    unique_port: forwarded to load_shell.
    verbose: forwarded to load_shell.
    ingest_ftrace_in_raw: forwarded to load_shell.
    enable_dev_features: forwarded to load_shell.
    resolver_registry: registry used to resolve trace references/URIs. When
      None, TraceProcessor falls back to the platform delegate's default
      registry.
  """
  bin_path: Optional[str] = None
  unique_port: bool = True
  verbose: bool = False
  ingest_ftrace_in_raw: bool = False
  enable_dev_features: bool = False
  resolver_registry: Optional[ResolverRegistry] = None
65
66
class TraceProcessor:
  """Python binding for a trace_processor instance reached over HTTP.

  Either connects to an already running instance (|addr|) or starts a
  trace_processor shell subprocess via load_shell, then optionally loads a
  trace into it. Use as a context manager (or call close()) to release the
  subprocess and HTTP connection deterministically.
  """

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  QUERY_CELL_INVALID_FIELD_ID = 0
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):
    # Required for pytype to correctly infer attributes from Row objects
    _HAS_DYNAMIC_ATTRIBUTES = True

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # __repr__ must return a str: returning the dict itself makes repr()
      # raise TypeError.
      return repr(self.__dict__)

  class QueryResultIterator:
    """Iterates over the rows of a query result.

    All batches are flattened on construction: the type of every cell is
    kept in a flat list and the values of each non-null cell type are kept
    in a per-type list which is consumed in order as rows are decoded.
    """

    def __init__(self, column_names, batches):
      self.__column_names = list(column_names)
      self.__column_count = 0
      self.__count = 0
      self.__cells = []
      # Both lists are indexed by the QUERY_CELL_*_FIELD_ID constants: the
      # values of each cell type, and (for __next__) the index of the next
      # unread value of that type.
      self.__data_lists = [[], [], [], [], [], []]
      self.__data_lists_index = [0, 0, 0, 0, 0, 0]
      self.__current_index = 0

      # Collect the contents of every batch, up to and including the one
      # flagged as the last batch, into lists based on cell type.
      for batch in batches:
        # It's possible on some occasions that there are non UTF-8
        # characters in the string_cells field. If this is the case,
        # string_cells is a bytestring which needs to be decoded (passing
        # 'ignore' so that we don't fail in decoding).
        strings_batch_str = batch.string_cells
        try:
          strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
        except AttributeError:
          # AttributeError occurs when |strings_batch_str| is already a str,
          # which happens when everything in it is UTF-8 (protobuf does the
          # conversion automatically if it can).
          pass

        # Null-terminated strings in a batch are concatenated into a single
        # large string, so split on the terminator to get each string.
        strings_batch = strings_batch_str.split('\0')[:-1]
        self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
            strings_batch)
        self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
            batch.varint_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
            batch.float64_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
            batch.blob_cells)
        self.__cells.extend(batch.cells)

        if batch.is_last_batch:
          break

      # If there are no rows in the query result, leave the counts at zero.
      if not self.__cells:
        return

      # Data integrity check - the cells must form whole rows. Checked
      # before dividing so that zero columns cannot cause a division error.
      self.__column_count = len(self.__column_names)
      if (self.__column_count == 0 or
          len(self.__cells) % self.__column_count != 0):
        raise TraceProcessorException("Cell count " + str(len(self.__cells)) +
                                      " is not a multiple of column count " +
                                      str(len(self.__column_names)))

      # The cells collected are all individual columns of the result, so
      # dividing by the number of columns in a row gives the row count.
      self.__count = len(self.__cells) // self.__column_count

    def _read_row(self, row_index, cursors):
      """Decodes row |row_index| into a list of values (None for nulls).

      |cursors| holds, per cell type, the index of the next unread value in
      the matching data list and is advanced in place; rows must therefore
      be read in increasing order with the same |cursors| list.

      Raises:
        TraceProcessorException: if a cell has the invalid cell type.
      """
      values = []
      base_cell_index = row_index * self.__column_count
      for col in range(self.__column_count):
        cell_type = self.__cells[base_cell_index + col]
        if cell_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
          raise TraceProcessorException('Invalid cell type')
        if cell_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
          values.append(None)
          continue
        values.append(self.__data_lists[cell_type][cursors[cell_type]])
        cursors[cell_type] += 1
      return values

    def as_pandas_dataframe(self):
      """Returns the full query result as a pandas DataFrame.

      Uses its own read cursors, so it always returns every row and does not
      interfere with iteration via __next__; it may be called any number of
      times. Null cells are represented as None.

      Raises:
        TraceProcessorException: if pandas is not installed or a cell has
          the invalid cell type.
      """
      try:
        import pandas as pd
      except ModuleNotFoundError as err:
        raise TraceProcessorException(
            'Python dependencies missing. Please pip3 install pandas numpy'
        ) from err

      cursors = [0] * len(self.__data_lists)
      rows = [self._read_row(i, cursors) for i in range(self.__count)]
      df = pd.DataFrame(rows, columns=self.__column_names)
      return df.astype(object).where(df.notnull(),
                                     None).reset_index(drop=True)

    def __len__(self):
      return self.__count

    def __iter__(self):
      return self

    def __next__(self):
      if self.__current_index >= self.__count:
        raise StopIteration
      values = self._read_row(self.__current_index, self.__data_lists_index)
      result = TraceProcessor.Row()
      for column_name, value in zip(self.__column_names, values):
        setattr(result, column_name, value)
      self.__current_index += 1
      return result

  def __init__(self,
               trace: Optional[TraceReference] = None,
               addr: Optional[str] = None,
               config: Optional[TraceProcessorConfig] = None,
               file_path: Optional[str] = None):
    """Create a trace processor instance.

    Args:
      trace: reference to a trace to be loaded into the trace
        processor instance.

        One of several types is supported:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a trace URI which resolves to one of the above types
        5) a trace URI resolver; this is a subclass of
        resolver.TraceUriResolver which generates a reference to a
        trace when the |resolve| method is called on it.

        An URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      addr: address of a running trace processor instance. Useful to query an
        already loaded trace.
      config: configuration options which customize functionality of trace
        processor and the Python binding. When None, a fresh default
        TraceProcessorConfig is created for this instance (a new one each
        time, so instances never share a mutable default).
      file_path (deprecated): path to a trace file to load. Use
        |trace| instead of this field: specifying both will cause
        an exception to be thrown.

    Raises:
      TraceProcessorException: if both |trace| and |file_path| are given, or
        if the trace fails to resolve or parse.
    """

    if trace and file_path:
      raise TraceProcessorException(
          "trace and file_path cannot both be specified.")

    if config is None:
      config = TraceProcessorConfig()

    self.config = config
    self.platform_delegate = PLATFORM_DELEGATE()
    self.protos = ProtoFactory(self.platform_delegate)
    self.resolver_registry = config.resolver_registry or \
      self.platform_delegate.default_resolver_registry()
    self.http = self._create_tp_http(addr)

    if trace or file_path:
      try:
        self._parse_trace(trace if trace else file_path)
      except Exception:
        # Release the subprocess/connection on any failure while loading,
        # then re-raise with the original traceback.
        self.close()
        raise

  def query(self, sql: str):
    """Executes passed in SQL query using class defined HTTP API, and returns
    the response as a QueryResultIterator. Raises TraceProcessorException if
    the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling its
      as_pandas_dataframe() function.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)

    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics: List[str]):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)

    trace_metrics = self.protos.TraceMetrics()
    trace_metrics.ParseFromString(response.metrics)
    return trace_metrics

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)

    return response.metatrace

  def _create_tp_http(self, addr: Optional[str]) -> TraceProcessorHttp:
    """Returns an HTTP client for |addr|, or for a newly started shell.

    When |addr| is empty, a shell is launched via load_shell and its process
    handle is stored in self.subprocess so close() can terminate it.
    """
    if addr:
      # Accept both 'host:port' and URL forms such as 'http://host:port'.
      p = urlparse(addr)
      parsed = p.netloc if p.netloc else p.path
      return TraceProcessorHttp(parsed, protos=self.protos)

    url, self.subprocess = load_shell(self.config.bin_path,
                                      self.config.unique_port,
                                      self.config.verbose,
                                      self.config.ingest_ftrace_in_raw,
                                      self.config.enable_dev_features,
                                      self.platform_delegate)
    return TraceProcessorHttp(url, protos=self.protos)

  def _parse_trace(self, trace: TraceReference):
    """Resolves |trace| to exactly one trace and streams it to the server.

    Raises:
      TraceProcessorException: if |trace| resolves to zero or several
        traces, or if the server reports an error for any chunk.
    """
    resolved_lst = self.resolver_registry.resolve(trace)
    if not resolved_lst:
      raise TraceProcessorException(
          'trace argument did not resolve to a trace.')

    if len(resolved_lst) > 1:
      raise TraceProcessorException(
          'trace argument resolved to more than one trace. Trace processor '
          'only supports loading a single trace; please use '
          'BatchTraceProcessor to operate on multiple traces.')

    resolved = resolved_lst[0]
    for chunk in resolved.generator:
      result = self.http.parse(chunk)
      if result.error:
        raise TraceProcessorException(
            f'Failed while parsing trace. Error message: {result.error}')
    self.http.notify_eof()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    return False

  def close(self):
    """Kills the shell subprocess (if one was started) and closes the HTTP
    connection. Attributes are checked first because close() also runs from
    __del__, possibly on a partially constructed instance."""
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
      self.subprocess.wait()

    if hasattr(self, 'http'):
      self.http.conn.close()

  def __del__(self):
    self.close()
376