• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import collections
8import json
9import logging
10import zipfile
11from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
12
13import pandas as pd
14from google.protobuf.json_format import MessageToJson
15from perfetto.batch_trace_processor.api import (BatchTraceProcessor,
16                                                BatchTraceProcessorConfig)
17from perfetto.trace_processor.api import TraceProcessor, TraceProcessorConfig
18from perfetto.trace_uri_resolver.path import PathUriResolver
19from perfetto.trace_uri_resolver.registry import ResolverRegistry
20from perfetto.trace_uri_resolver.resolver import TraceUriResolver
21
22from crossbench import path as pth
23from crossbench.parse import PathParser
24from crossbench.probes.metric import MetricsMerger
25from crossbench.probes.probe import Probe, ProbeConfigParser, ProbeContext
26from crossbench.probes.results import LocalProbeResult, ProbeResult
27
28if TYPE_CHECKING:
29  from crossbench.env import HostEnvironment
30  from crossbench.runner.groups.browsers import BrowsersRunGroup
31  from crossbench.runner.run import Run
32  from crossbench.types import JsonDict
33
# Directory of bundled .sql query files shipped next to this module.
_QUERIES_DIR = pth.LocalPath(__file__).parent / "queries"
# Directory of Perfetto SQL extension modules passed via --add-sql-module.
_MODULES_DIR = pth.LocalPath(__file__).parent / "modules/ext"
36
37
class CrossbenchTraceUriResolver(TraceUriResolver):
  """Resolves "crossbench:" trace URIs to local trace files.

  Accepts either a single TraceProcessorProbeContext (resolving to its one
  merged trace) or an iterable of Runs (one trace per run), attaching
  crossbench metadata (browser, story, temperature, repetition) to each.
  """
  PREFIX = "crossbench"

  def __init__(self, traces: Union[Iterable[Run], TraceProcessorProbeContext]):

    def run_metadata(run: Run) -> Dict[str, str]:
      # Metadata that trace_processor attaches to results from this trace.
      return {
          "cb_browser": run.browser.unique_name,
          "cb_story": run.story.name,
          "cb_temperature": run.temperature,
          "cb_run": str(run.repetition)
      }

    if isinstance(traces, TraceProcessorProbeContext):
      # Single-run case: point at the context's merged trace archive.
      self._resolved = [
          TraceUriResolver.Result(
              trace=str(traces.merged_trace_path.absolute()),
              metadata=run_metadata(traces.run))
      ]
    else:
      # Multi-run case: one Result per run's recorded probe trace.
      self._resolved = []
      for run in traces:
        trace_path = run.results.get_by_name(
            TraceProcessorProbe.NAME).trace.absolute()
        self._resolved.append(
            TraceUriResolver.Result(
                trace=str(trace_path), metadata=run_metadata(run)))

  def resolve(self) -> List["TraceUriResolver.Result"]:
    """Returns the Result list pre-computed in __init__."""
    return self._resolved
68
69
class TraceProcessorProbe(Probe):
  """
  Trace processor probe.

  Runs Perfetto trace_processor SQL queries (CSV output) and Perfetto
  metrics (JSON output) over the traces recorded for each run — either
  per run (default) or, in batch mode, once over all runs of a group.
  """

  NAME = "trace_processor"

  @classmethod
  def config_parser(cls) -> ProbeConfigParser:
    """Declares the probe config options: batch, metrics, queries, binary."""
    parser = super().config_parser()
    parser.add_argument(
        "batch",
        type=bool,
        required=False,
        default=False,
        help="Run queries in batch mode when all the test runs are done. This "
        "can considerably reduce the run time at the expense of higher "
        "memory usage (all traces will be loaded into memory at the same "
        "time)")
    parser.add_argument(
        "metrics",
        type=str,
        is_list=True,
        default=tuple(),
        help="Name of metric to be run (can be any metric from Perfetto)")
    parser.add_argument(
        "queries",
        type=str,
        is_list=True,
        default=tuple(),
        help="Name of query to be run (under probes/trace_processor/queries)")
    parser.add_argument(
        "trace_processor_bin",
        type=PathParser.local_binary_path,
        required=False,
        help="Path to the trace_processor binary")
    return parser

  def __init__(self,
               batch: bool,
               metrics: Iterable[str],
               queries: Iterable[str],
               trace_processor_bin: Optional[pth.LocalPath] = None):
    super().__init__()
    self._batch = batch
    self._metrics = tuple(metrics)
    self._queries = tuple(queries)
    self._trace_processor_bin: Optional[pth.LocalPath] = None
    if trace_processor_bin:
      # Re-validate so a failure names the "trace_processor" binary.
      self._trace_processor_bin = PathParser.local_binary_path(
          trace_processor_bin, "trace_processor")

  @property
  def batch(self) -> bool:
    # True: defer all queries/metrics until every run has finished.
    return self._batch

  @property
  def metrics(self) -> Tuple[str, ...]:
    return self._metrics

  @property
  def queries(self) -> Tuple[str, ...]:
    return self._queries

  @property
  def has_work(self) -> bool:
    # Whether any query or metric is configured at all.
    return len(self._queries) != 0 or len(self._metrics) != 0

  @property
  def needs_tp_run(self) -> bool:
    # Per-run TraceProcessor invocation (non-batch mode only).
    return (not self.batch) and self.has_work

  @property
  def needs_btp_run(self) -> bool:
    # Single BatchTraceProcessor invocation over all runs (batch mode only).
    return self._batch and self.has_work

  @property
  def trace_processor_bin(self) -> Optional[pth.LocalPath]:
    return self._trace_processor_bin

  @property
  def tp_config(self) -> TraceProcessorConfig:
    """Builds a TraceProcessorConfig wired up for crossbench URI resolution."""
    extra_flags = [
        "--add-sql-module",
        # NOTE(review): this is a path object, not a str — presumably
        # stringified by the perfetto flag handling; confirm.
        _MODULES_DIR,
    ]

    return TraceProcessorConfig(
        bin_path=self.trace_processor_bin,
        resolver_registry=ResolverRegistry(
            resolvers=[CrossbenchTraceUriResolver, PathUriResolver]),
        extra_flags=extra_flags)

  def get_context(self, run: Run) -> TraceProcessorProbeContext:
    return TraceProcessorProbeContext(self, run)

  def validate_env(self, env: HostEnvironment) -> None:
    super().validate_env(env)
    self._check_sql()

  def _check_sql(self) -> None:
    """
    Runs all metrics and queries on an empty trace. This will ensure that they
    are correctly defined in trace processor.
    """
    with TraceProcessor(trace="/dev/null", config=self.tp_config) as tp:
      for metric in self.metrics:
        tp.metric([metric])
      for query in self.queries:
        query_path = _QUERIES_DIR / f"{query}.sql"
        tp.query(query_path.read_text())

  def _add_cb_columns(self, df: pd.DataFrame, run: Run) -> pd.DataFrame:
    """Annotates a query-result DataFrame with run-identifying columns."""
    df["cb_browser"] = run.browser.unique_name
    df["cb_story"] = run.story.name
    df["cb_temperature"] = run.temperature
    df["cb_run"] = run.repetition
    return df

  def _aggregate_results_by_query(
      self, runs: Iterable[Run]) -> Dict[str, pd.DataFrame]:
    """Concatenates per-run CSV results into one DataFrame per query name."""
    res: Dict[str, pd.DataFrame] = {}
    for run in runs:
      for file in run.results.get(self).csv_list:
        df = pd.read_csv(file)
        df = self._add_cb_columns(df, run)
        # Keyed by CSV file stem, i.e. the (sanitized) query name.
        if file.stem in res:
          res[file.stem] = pd.concat([res[file.stem], df])
        else:
          res[file.stem] = df

    return res

  def _merge_json(self, runs: Iterable[Run]) -> Dict[str, JsonDict]:
    """Merges per-run metric JSON files into one merged dict per metric."""
    merged_metrics: Dict[str,
                         MetricsMerger] = collections.defaultdict(MetricsMerger)
    for run in runs:
      for file_path in run.results[self].json_list:
        with file_path.open() as json_file:
          merged_metrics[file_path.stem].add(json.load(json_file))

    return {
        metric_name: merged.to_json()
        for metric_name, merged in merged_metrics.items()
    }

  def merge_browsers(self, group: BrowsersRunGroup) -> ProbeResult:
    """Batch mode: run all SQL now; otherwise merge the per-run results."""
    if self.needs_btp_run:
      return self._run_btp(group)

    return self._merge_browser_files(group)

  def _merge_browser_files(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Merges already-computed per-run CSV/JSON results across all runs."""
    group_dir = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    csv_files = []
    json_files = []
    for query, df in self._aggregate_results_by_query(group.runs).items():
      csv_file = group_dir / f"{pth.safe_filename(query)}.csv"
      df.to_csv(path_or_buf=csv_file, index=False)
      csv_files.append(csv_file)
    for metric, data in self._merge_json(group.runs).items():
      json_file = group_dir / f"{pth.safe_filename(metric)}.json"
      # "x" mode: fail loudly if the merged file already exists.
      with json_file.open("x") as f:
        json.dump(data, f, indent=4)
        # TODO(375390958): figure out why files aren't fully written to
        # pyfakefs here.
        f.write("\n")
      json_files.append(json_file)
    return LocalProbeResult(csv=csv_files, json=json_files)

  def _run_btp(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Loads every run's trace at once and runs all queries and metrics."""
    group_dir = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    btp_config = BatchTraceProcessorConfig(tp_config=self.tp_config)

    with BatchTraceProcessor(
        traces=CrossbenchTraceUriResolver(group.runs),
        config=btp_config) as btp:

      def run_query(query: str):
        # One flattened CSV per query, covering every run in the group.
        query_path = _QUERIES_DIR / f"{query}.sql"
        csv_file = group_dir / f"{pth.safe_filename(query)}.csv"
        btp.query_and_flatten(query_path.read_text()).to_csv(
            path_or_buf=csv_file, index=False)
        return csv_file

      csv_files = list(map(run_query, self.queries))

      def run_metric(metric: str):
        # One JSON file per metric; per-trace protos are written back to back.
        json_file = group_dir / f"{pth.safe_filename(metric)}.json"
        protos = btp.metric([metric])
        with json_file.open("x") as f:
          for p in protos:
            f.write(MessageToJson(p))
        return json_file

      json_files = list(map(run_metric, self.metrics))

    return LocalProbeResult(csv=csv_files, json=json_files)

  def log_browsers_result(self, group: BrowsersRunGroup) -> None:
    """Logs each run's merged trace path (critical level, so always shown)."""
    logging.info("-" * 80)
    logging.critical("TraceProcessor merged traces:")
    for run in group.runs:
      logging.critical("  - %s", run.results[self].trace)
276
277
class TraceProcessorProbeContext(ProbeContext[TraceProcessorProbe]):
  """Per-run context: merges trace files and optionally runs trace_processor."""

  def __init__(self, probe: TraceProcessorProbe, run: Run) -> None:
    super().__init__(probe, run)

  def get_default_result_path(self) -> pth.AnyPath:
    # Make sure the result directory exists before any file is written to it.
    path = super().get_default_result_path()
    self.host_platform.mkdir(path)
    return path

  def setup(self) -> None:
    pass

  def start(self) -> None:
    pass

  def stop(self) -> None:
    pass

  def teardown(self) -> ProbeResult:
    merged = self._merge_trace_files()
    return merged.merge(self._maybe_run_tp())

  def _merge_trace_files(self) -> LocalProbeResult:
    """Zips every trace recorded for this run into a single archive."""
    with self.run.actions("TRACE_PROCESSOR: Merging trace files", verbose=True):
      with zipfile.ZipFile(self.merged_trace_path, "w") as archive:
        for trace_file in self.run.results.all_traces():
          archive.write(
              trace_file, arcname=trace_file.relative_to(self.run.out_dir))
    return LocalProbeResult(trace=(self.merged_trace_path,))

  def _maybe_run_tp(self):
    """Runs queries/metrics now unless batch mode defers them (or no work)."""
    if not self.probe.needs_tp_run:
      return LocalProbeResult()

    with TraceProcessor(
        trace=CrossbenchTraceUriResolver(self),
        config=self.probe.tp_config) as tp:
      query_results = self._run_queries(tp)
      return query_results.merge(self._run_metrics(tp))

  def _run_queries(self, tp: TraceProcessor) -> LocalProbeResult:
    """Executes each configured .sql query and stores its output as CSV."""

    def export_query_csv(name: str):
      sql = (_QUERIES_DIR / f"{name}.sql").read_text()
      out_file = self.local_result_path / f"{pth.safe_filename(name)}.csv"
      tp.query(sql).as_pandas_dataframe().to_csv(
          path_or_buf=out_file, index=False)
      return out_file

    with self.run.actions("TRACE_PROCESSOR: Running queries", verbose=True):
      csv_files = tuple(export_query_csv(name) for name in self.probe.queries)
      return LocalProbeResult(csv=csv_files)

  def _run_metrics(self, tp: TraceProcessor) -> LocalProbeResult:
    """Computes each configured Perfetto metric and stores it as JSON."""

    def export_metric_json(name: str):
      out_file = self.local_result_path / f"{pth.safe_filename(name)}.json"
      metric_proto = tp.metric([name])
      with out_file.open("x") as f:
        f.write(MessageToJson(metric_proto))
      return out_file

    with self.run.actions("TRACE_PROCESSOR: Running metrics", verbose=True):
      json_files = tuple(export_metric_json(name) for name in self.probe.metrics)
      return LocalProbeResult(json=json_files)

  @property
  def merged_trace_path(self):
    # Location of the zip archive produced by _merge_trace_files().
    return self.local_result_path / "merged_trace.zip"
345