• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2020 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import dataclasses as dc
16from urllib.parse import urlparse
17from typing import List, Optional
18
19from perfetto.trace_processor.http import TraceProcessorHttp
20from perfetto.trace_processor.platform import PlatformDelegate
21from perfetto.trace_processor.protos import ProtoFactory
22from perfetto.trace_processor.shell import load_shell
23from perfetto.trace_uri_resolver import registry
24from perfetto.trace_uri_resolver.registry import ResolverRegistry
25
26# Defining this field as a module variable means this can be changed by
27# implementations at startup and used for all TraceProcessor objects
28# without having to specify on each one.
# In Google3, this field is rewritten using Copybara to an implementation
# which can integrate with internal infra.
PLATFORM_DELEGATE = PlatformDelegate

# Module-level alias for registry.TraceReference; it is the type used for the
# |trace| arguments in the annotations of TraceProcessor below.
TraceReference = registry.TraceReference
34
35# Custom exception raised if any trace_processor functions return a
36# response with an error defined
class TraceProcessorException(Exception):
  """Error type raised by the trace processor Python API.

  It is raised when a trace_processor response carries an error, and for
  invalid arguments or malformed query results detected on the Python side.
  """

  def __init__(self, message):
    # |message| becomes the sole entry of |args| and the str() of the error.
    super().__init__(message)
41
42
@dc.dataclass
class TraceProcessorConfig:
  """Options which customize TraceProcessor and the shell it may start.

  The constructor is generated by the dataclass decorator from the fields
  below, so it accepts them positionally (in declaration order) or by keyword,
  each with the default shown here.
  """
  # Passed to load_shell as the trace processor binary path; may be None.
  bin_path: Optional[str] = None
  # Passed through to load_shell when starting a shell.
  unique_port: bool = True
  # Passed through to load_shell when starting a shell.
  verbose: bool = False
  # Passed through to load_shell when starting a shell.
  ingest_ftrace_in_raw: bool = False
  # Registry used to resolve trace references. When left as None,
  # TraceProcessor falls back to the platform delegate's default registry.
  resolver_registry: Optional['ResolverRegistry'] = None
62
63
class TraceProcessor:
  """Python client for a trace processor instance reached over HTTP.

  The instance either talks to an already running trace processor (when
  |addr| is given) or starts a shell through load_shell and talks to that.
  It can be used as a context manager; leaving the context calls close().
  """

  # Values of these constants correspond to the QueryResponse message at
  # protos/perfetto/trace_processor/trace_processor.proto
  QUERY_CELL_INVALID_FIELD_ID = 0
  QUERY_CELL_NULL_FIELD_ID = 1
  QUERY_CELL_VARINT_FIELD_ID = 2
  QUERY_CELL_FLOAT64_FIELD_ID = 3
  QUERY_CELL_STRING_FIELD_ID = 4
  QUERY_CELL_BLOB_FIELD_ID = 5

  # This is the class returned to the user and contains one row of the
  # resultant query. Each column name is stored as an attribute of this
  # class, with the value corresponding to the column name and row in
  # the query results table.
  class Row(object):
    """One row of a query result; columns are exposed as attributes."""

    def __str__(self):
      return str(self.__dict__)

    def __repr__(self):
      # __repr__ has to return a str: handing back the dict itself makes
      # repr(row) raise TypeError.
      return repr(self.__dict__)

  class QueryResultIterator:
    """Single-pass iterator over the rows of a query response.

    The response stores the type of every cell in one flat list and the
    values in one list per type. A row is decoded by walking its cell types
    and taking the next unread value from the list of the matching type.
    """

    def __init__(self, column_names, batches):
      """Collects the contents of |batches| into per-type value lists.

      Args:
        column_names: names of the columns of the result, in order.
        batches: sequence of result batches. Batches are consumed in order up
          to and including the first one with |is_last_batch| set.

      Raises:
        TraceProcessorException: if the number of cells cannot be split into
          whole rows of len(column_names) cells.
      """
      self.__column_names = column_names
      self.__column_count = 0
      self.__count = 0
      self.__cells = []
      self.__data_lists = [[], [], [], [], [], []]
      self.__data_lists_index = [0, 0, 0, 0, 0, 0]
      self.__current_index = 0

      # Iterate over the batches and collect their contents into lists based
      # on the type of the cells. Iterating directly (rather than indexing
      # until a last batch is seen) also copes with an empty batch list.
      for batch in batches:
        # It's possible on some occasions that there are non UTF-8 characters
        # in the string_cells field. If this is the case, string_cells is
        # a bytestring which needs to be decoded (but passing ignore so that
        # we don't fail in decoding).
        strings_batch_str = batch.string_cells
        try:
          strings_batch_str = strings_batch_str.decode('utf-8', 'ignore')
        except AttributeError:
          # AttributeError can occur when |strings_batch_str| is an str which
          # happens when everything in it is UTF-8 (protobuf automatically
          # does the conversion if it can).
          pass

        # Null-terminated strings in a batch are concatenated
        # into a single large byte array, so we split on the
        # null-terminator to get the individual strings. The final element
        # of the split is the empty remainder after the last terminator.
        strings_batch = strings_batch_str.split('\0')[:-1]
        self.__data_lists[TraceProcessor.QUERY_CELL_STRING_FIELD_ID].extend(
            strings_batch)
        self.__data_lists[TraceProcessor.QUERY_CELL_VARINT_FIELD_ID].extend(
            batch.varint_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_FLOAT64_FIELD_ID].extend(
            batch.float64_cells)
        self.__data_lists[TraceProcessor.QUERY_CELL_BLOB_FIELD_ID].extend(
            batch.blob_cells)
        self.__cells.extend(batch.cells)

        if batch.is_last_batch:
          break

      # If there are no rows in the query result, leave the counts at zero.
      if not self.__cells:
        return

      self.__column_count = len(self.__column_names)

      # Data integrity check - see that we have the expected amount of cells
      # for the number of rows that we need to return. This is done before
      # dividing so that cells without any column are reported as a
      # TraceProcessorException instead of a ZeroDivisionError.
      cell_count = len(self.__cells)
      if self.__column_count == 0 or cell_count % self.__column_count != 0:
        raise TraceProcessorException("Cell count " + str(cell_count) +
                                      " is not a multiple of column count " +
                                      str(self.__column_count))

      # The cells are a flat list of all individual columns of all rows, so
      # we divide by the number of columns in a row to get the number of rows.
      self.__count = cell_count // self.__column_count

    def _read_row(self, row_index, cursors):
      """Decodes the values of one row.

      Args:
        row_index: index of the row to decode.
        cursors: list holding, per cell type, the position of the next unread
          value. It is advanced in place, so rows must be decoded in order.

      Returns:
        The values of the row as a list, with None for null cells.

      Raises:
        TraceProcessorException: if a cell has the invalid type.
      """
      first_cell = row_index * self.__column_count
      values = []
      for offset in range(self.__column_count):
        col_type = self.__cells[first_cell + offset]
        if col_type == TraceProcessor.QUERY_CELL_INVALID_FIELD_ID:
          raise TraceProcessorException('Invalid cell type')

        if col_type == TraceProcessor.QUERY_CELL_NULL_FIELD_ID:
          values.append(None)
          continue

        values.append(self.__data_lists[col_type][cursors[col_type]])
        cursors[col_type] += 1
      return values

    def as_pandas_dataframe(self):
      """Returns all rows of the query result as a pandas dataframe.

      The rows are decoded with cursors private to this call, so the result
      does not depend on how far the iterator has been advanced, and calling
      this does not affect iteration.

      Raises:
        TraceProcessorException: if pandas cannot be imported or a cell has
          the invalid type.
      """
      # Only the import is guarded so that errors raised while building the
      # dataframe are not mistaken for a missing dependency.
      try:
        import pandas as pd
      except ModuleNotFoundError as err:
        raise TraceProcessorException(
            'Python dependencies missing. Please pip3 install pandas numpy'
        ) from err

      # Populate the dataframe with the query results
      cursors = [0] * len(self.__data_lists)
      rows = [self._read_row(i, cursors) for i in range(self.__count)]

      df = pd.DataFrame(rows, columns=self.__column_names)
      return df.astype(object).where(df.notnull(),
                                     None).reset_index(drop=True)

    def __len__(self):
      return self.__count

    def __iter__(self):
      return self

    def __next__(self):
      if self.__current_index == self.__count:
        raise StopIteration

      values = self._read_row(self.__current_index, self.__data_lists_index)
      result = TraceProcessor.Row()
      for column_name, value in zip(self.__column_names, values):
        setattr(result, column_name, value)

      self.__current_index += 1
      return result

  def __init__(self,
               trace: Optional['TraceReference'] = None,
               addr: Optional[str] = None,
               config: Optional['TraceProcessorConfig'] = None,
               file_path: Optional[str] = None):
    """Create a trace processor instance.

    Args:
      trace: reference to a trace to be loaded into the trace
        processor instance.

        One of several types is supported:
        1) path to a trace file to open and read
        2) a file like object (file, io.BytesIO or similar) to read
        3) a generator yielding bytes
        4) a trace URI which resolves to one of the above types
        5) a trace URI resolver; this is a subclass of
        resolver.TraceUriResolver which generates a reference to a
        trace when the |resolve| method is called on it.

        An URI is similar to a connection string (e.g. for a web
        address or SQL database) which specifies where to lookup traces
        and which traces to pick from this data source. The format of a
        string should be as follows:
        resolver_name:key_1=list,of,values;key_2=value

        Custom resolvers can be provided to handle URIs via
        |config.resolver_registry|.
      addr: address of a running trace processor instance. Useful to query an
        already loaded trace.
      config: configuration options which customize functionality of trace
        processor and the Python binding. When omitted, a new
        TraceProcessorConfig with default values is created for this
        instance.
      file_path (deprecated): path to a trace file to load. Use
        |trace| instead of this field: specifying both will cause
        an exception to be thrown.

    Raises:
      TraceProcessorException: if both |trace| and |file_path| are given, or
        if the trace cannot be resolved or parsed.
    """

    if trace and file_path:
      raise TraceProcessorException(
          "trace and file_path cannot both be specified.")

    # The default config is built per instance: a default value in the
    # signature would be created once and shared by all instances.
    self.config = config if config is not None else TraceProcessorConfig()
    self.platform_delegate = PLATFORM_DELEGATE()
    self.protos = ProtoFactory(self.platform_delegate)
    self.resolver_registry = self.config.resolver_registry or \
      self.platform_delegate.default_resolver_registry()
    self.http = self._create_tp_http(addr)

    if trace or file_path:
      try:
        self._parse_trace(trace if trace else file_path)
      except BaseException:
        # The caller never receives this object when __init__ raises, so
        # release the shell subprocess and connection here before re-raising.
        self.close()
        raise

  def query(self, sql: str):
    """Executes passed in SQL query using class defined HTTP API, and returns
    the response as a QueryResultIterator. Raises TraceProcessorException if
    the response returns with an error.

    Args:
      sql: SQL query written as a String

    Returns:
      A class which can iterate through each row of the results table. This
      can also be converted to a pandas dataframe by calling the
      as_pandas_dataframe() function on it.
    """
    response = self.http.execute_query(sql)
    if response.error:
      raise TraceProcessorException(response.error)

    return TraceProcessor.QueryResultIterator(response.column_names,
                                              response.batch)

  def metric(self, metrics: List[str]):
    """Returns the metrics data corresponding to the passed in trace metric.
    Raises TraceProcessorException if the response returns with an error.

    Args:
      metrics: A list of valid metrics as defined in TraceMetrics

    Returns:
      The metrics data as a proto message
    """
    response = self.http.compute_metric(metrics)
    if response.error:
      raise TraceProcessorException(response.error)

    # Kept in its own name so the |metrics| argument is not overwritten.
    trace_metrics = self.protos.TraceMetrics()
    trace_metrics.ParseFromString(response.metrics)
    return trace_metrics

  def enable_metatrace(self):
    """Enable metatrace for the currently running trace_processor.
    """
    return self.http.enable_metatrace()

  def disable_and_read_metatrace(self):
    """Disable and return the metatrace formed from the currently running
    trace_processor. This must be enabled before attempting to disable. This
    returns the serialized bytes of the metatrace data directly. Raises
    TraceProcessorException if the response returns with an error.
    """
    response = self.http.disable_and_read_metatrace()
    if response.error:
      raise TraceProcessorException(response.error)

    return response.metatrace

  def _create_tp_http(self, addr: Optional[str]) -> 'TraceProcessorHttp':
    """Returns the HTTP client used to talk to the trace processor.

    Args:
      addr: address of a running instance. When empty or None, a shell is
        started with load_shell using the options in |self.config| and its
        process handle is stored in |self.subprocess|.
    """
    if addr:
      # Use the network location when urlparse finds one, otherwise fall back
      # to the path component.
      p = urlparse(addr)
      parsed = p.netloc or p.path
      return TraceProcessorHttp(parsed, protos=self.protos)

    url, self.subprocess = load_shell(self.config.bin_path,
                                      self.config.unique_port,
                                      self.config.verbose,
                                      self.config.ingest_ftrace_in_raw,
                                      self.platform_delegate)
    return TraceProcessorHttp(url, protos=self.protos)

  def _parse_trace(self, trace: 'TraceReference'):
    """Resolves |trace| and sends its contents to the trace processor.

    Raises:
      TraceProcessorException: if |trace| resolves to no trace or to more
        than one trace, or if parsing a chunk returns an error.
    """
    resolved_lst = self.resolver_registry.resolve(trace)
    if not resolved_lst:
      raise TraceProcessorException(
          'trace argument did not resolve to a trace.')

    if len(resolved_lst) > 1:
      raise TraceProcessorException(
          'trace argument resolved to more than one trace. Trace processor '
          'only supports loading a single trace; please use '
          'BatchTraceProcessor to operate on multiple traces.')

    resolved = resolved_lst[0]
    for chunk in resolved.generator:
      result = self.http.parse(chunk)
      if result.error:
        raise TraceProcessorException(
            f'Failed while parsing trace. Error message: {result.error}')
    self.http.notify_eof()

  def __enter__(self):
    return self

  def __exit__(self, a, b, c):
    del a, b, c  # Unused.
    self.close()
    # Returning False lets an exception raised inside the with block
    # propagate.
    return False

  def close(self):
    """Kills the shell subprocess, if one was started, and closes the
    HTTP connection."""
    # |subprocess| only exists when this instance started its own shell.
    if hasattr(self, 'subprocess'):
      self.subprocess.kill()
    self.http.conn.close()
360