# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Crossbench probe that post-processes collected traces with Perfetto's
trace processor: it can run SQL queries and Perfetto metrics either per run
(one TraceProcessor per trace) or batched over all runs at once."""

from __future__ import annotations

import collections
import json
import logging
import zipfile
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union

import pandas as pd
from google.protobuf.json_format import MessageToJson
from perfetto.batch_trace_processor.api import (BatchTraceProcessor,
                                                BatchTraceProcessorConfig)
from perfetto.trace_processor.api import TraceProcessor, TraceProcessorConfig
from perfetto.trace_uri_resolver.path import PathUriResolver
from perfetto.trace_uri_resolver.registry import ResolverRegistry
from perfetto.trace_uri_resolver.resolver import TraceUriResolver

from crossbench import path as pth
from crossbench.parse import PathParser
from crossbench.probes.metric import MetricsMerger
from crossbench.probes.probe import Probe, ProbeConfigParser, ProbeContext
from crossbench.probes.results import LocalProbeResult, ProbeResult

if TYPE_CHECKING:
  from crossbench.env import HostEnvironment
  from crossbench.runner.groups.browsers import BrowsersRunGroup
  from crossbench.runner.run import Run
  from crossbench.types import JsonDict

# Directory holding the bundled .sql query files (one file per query name).
_QUERIES_DIR = pth.LocalPath(__file__).parent / "queries"
# Extension SQL modules handed to trace_processor via --add-sql-module
# (see TraceProcessorProbe.tp_config).
_MODULES_DIR = pth.LocalPath(__file__).parent / "modules/ext"


class CrossbenchTraceUriResolver(TraceUriResolver):
  """Perfetto trace-URI resolver mapping crossbench runs to trace files.

  Resolves either a single merged trace (when constructed from a
  TraceProcessorProbeContext) or one trace per run (when constructed from
  an iterable of runs). Every resolved trace is tagged with cb_* metadata
  so query results can be attributed to browser/story/temperature/run.
  """
  PREFIX = "crossbench"

  def __init__(self, traces: Union[Iterable[Run], TraceProcessorProbeContext]):

    def metadata(run: Run) -> Dict[str, str]:
      # Metadata attached to each resolved trace; surfaces as extra
      # columns in (Batch)TraceProcessor query results.
      return {
          "cb_browser": run.browser.unique_name,
          "cb_story": run.story.name,
          "cb_temperature": run.temperature,
          "cb_run": str(run.repetition)
      }

    if isinstance(traces, TraceProcessorProbeContext):
      # Single-run case: resolve to that run's merged trace zip.
      self._resolved = [
          TraceUriResolver.Result(
              trace=str(traces.merged_trace_path.absolute()),
              metadata=metadata(traces.run))
      ]
    else:
      # Multi-run (batch) case: one result per run, using the trace file
      # previously recorded under this probe's name.
      self._resolved = [
          TraceUriResolver.Result(
              trace=str(
                  run.results.get_by_name(
                      TraceProcessorProbe.NAME).trace.absolute()),
              metadata=metadata(run)) for run in traces
      ]

  def resolve(self) -> List["TraceUriResolver.Result"]:
    # Resolution happens eagerly in __init__; return the cached results.
    return self._resolved


class TraceProcessorProbe(Probe):
  """
  Trace processor probe.

  Runs the configured Perfetto metrics and SQL queries (from
  probes/trace_processor/queries) over the collected traces. With
  batch=True, execution is deferred until all runs are done and performed
  once over all traces via BatchTraceProcessor.
  """

  NAME = "trace_processor"

  @classmethod
  def config_parser(cls) -> ProbeConfigParser:
    parser = super().config_parser()
    parser.add_argument(
        "batch",
        type=bool,
        required=False,
        default=False,
        help="Run queries in batch mode when all the test runs are done. This "
        "can considerably reduce the run time at the expense of higher "
        "memory usage (all traces will be loaded into memory at the same "
        "time)")
    parser.add_argument(
        "metrics",
        type=str,
        is_list=True,
        default=tuple(),
        help="Name of metric to be run (can be any metric from Perfetto)")
    parser.add_argument(
        "queries",
        type=str,
        is_list=True,
        default=tuple(),
        help="Name of query to be run (under probes/trace_processor/queries)")
    parser.add_argument(
        "trace_processor_bin",
        type=PathParser.local_binary_path,
        required=False,
        help="Path to the trace_processor binary")
    return parser

  def __init__(self,
               batch: bool,
               metrics: Iterable[str],
               queries: Iterable[str],
               trace_processor_bin: Optional[pth.LocalPath] = None):
    super().__init__()
    self._batch = batch
    self._metrics = tuple(metrics)
    self._queries = tuple(queries)
    self._trace_processor_bin: Optional[pth.LocalPath] = None
    if trace_processor_bin:
      # Re-validate so a clear error is raised for a non-binary path.
      self._trace_processor_bin = PathParser.local_binary_path(
          trace_processor_bin, "trace_processor")

  @property
  def batch(self) -> bool:
    return self._batch

  @property
  def metrics(self) -> Tuple[str, ...]:
    return self._metrics

  @property
  def queries(self) -> Tuple[str, ...]:
    return self._queries

  @property
  def has_work(self) -> bool:
    # True if at least one query or metric is configured.
    return len(self._queries) != 0 or len(self._metrics) != 0

  @property
  def needs_tp_run(self) -> bool:
    # Per-run TraceProcessor pass (non-batch mode only).
    return (not self.batch) and self.has_work

  @property
  def needs_btp_run(self) -> bool:
    # Deferred BatchTraceProcessor pass over all runs (batch mode only).
    return self._batch and self.has_work

  @property
  def trace_processor_bin(self) -> Optional[pth.LocalPath]:
    # Optional custom trace_processor binary; None falls back to the
    # perfetto API default.
    return self._trace_processor_bin

  @property
  def tp_config(self) -> TraceProcessorConfig:
    """Config wiring in the custom binary, the crossbench URI resolver and
    the bundled extension SQL modules."""
    extra_flags = [
        "--add-sql-module",
        _MODULES_DIR,
    ]

    return TraceProcessorConfig(
        bin_path=self.trace_processor_bin,
        resolver_registry=ResolverRegistry(
            resolvers=[CrossbenchTraceUriResolver, PathUriResolver]),
        extra_flags=extra_flags)

  def get_context(self, run: Run) -> TraceProcessorProbeContext:
    return TraceProcessorProbeContext(self, run)

  def validate_env(self, env: HostEnvironment) -> None:
    super().validate_env(env)
    self._check_sql()

  def _check_sql(self) -> None:
    """
    Runs all metrics and queries on an empty trace. This will ensure that they
    are correctly defined in trace processor.
    """
    # NOTE(review): "/dev/null" assumes a POSIX host — confirm whether
    # Windows support is required here.
    with TraceProcessor(trace="/dev/null", config=self.tp_config) as tp:
      for metric in self.metrics:
        tp.metric([metric])
      for query in self.queries:
        query_path = _QUERIES_DIR / f"{query}.sql"
        tp.query(query_path.read_text())

  def _add_cb_columns(self, df: pd.DataFrame, run: Run) -> pd.DataFrame:
    # Tag every result row with the run it came from (mutates df in place
    # and returns it for chaining).
    df["cb_browser"] = run.browser.unique_name
    df["cb_story"] = run.story.name
    df["cb_temperature"] = run.temperature
    df["cb_run"] = run.repetition
    return df

  def _aggregate_results_by_query(
      self, runs: Iterable[Run]) -> Dict[str, pd.DataFrame]:
    """Concatenates per-run query CSVs into one DataFrame per query, keyed
    by CSV file stem (the query name)."""
    res: Dict[str, pd.DataFrame] = {}
    for run in runs:
      for file in run.results.get(self).csv_list:
        df = pd.read_csv(file)
        df = self._add_cb_columns(df, run)
        if file.stem in res:
          res[file.stem] = pd.concat([res[file.stem], df])
        else:
          res[file.stem] = df

    return res

  def _merge_json(self, runs: Iterable[Run]) -> Dict[str, JsonDict]:
    """Merges per-run metric JSON files with MetricsMerger, keyed by file
    stem (the metric name)."""
    merged_metrics: Dict[str,
                         MetricsMerger] = collections.defaultdict(MetricsMerger)
    for run in runs:
      for file_path in run.results[self].json_list:
        with file_path.open() as json_file:
          merged_metrics[file_path.stem].add(json.load(json_file))

    return {
        metric_name: merged.to_json()
        for metric_name, merged in merged_metrics.items()
    }

  def merge_browsers(self, group: BrowsersRunGroup) -> ProbeResult:
    if self.needs_btp_run:
      # Batch mode: queries/metrics have not run yet; run them now over
      # all traces at once.
      return self._run_btp(group)
    # Non-batch mode: per-run results already exist; just merge the files.
    return self._merge_browser_files(group)

  def _merge_browser_files(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Merges the per-run CSV/JSON results into the group result dir."""
    group_dir = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    csv_files = []
    json_files = []
    for query, df in self._aggregate_results_by_query(group.runs).items():
      csv_file = group_dir / f"{pth.safe_filename(query)}.csv"
      df.to_csv(path_or_buf=csv_file, index=False)
      csv_files.append(csv_file)
    for metric, data in self._merge_json(group.runs).items():
      json_file = group_dir / f"{pth.safe_filename(metric)}.json"
      with json_file.open("x") as f:
        json.dump(data, f, indent=4)
        # TODO(375390958): figure out why files aren't fully written to
        # pyfakefs here.
        f.write("\n")
      json_files.append(json_file)
    return LocalProbeResult(csv=csv_files, json=json_files)

  def _run_btp(self, group: BrowsersRunGroup) -> LocalProbeResult:
    """Batch-mode execution: loads all run traces into a single
    BatchTraceProcessor and writes one CSV per query and one JSON file per
    metric into the group result dir."""
    group_dir = group.get_local_probe_result_path(self)
    group_dir.mkdir()
    btp_config = BatchTraceProcessorConfig(tp_config=self.tp_config)

    with BatchTraceProcessor(
        traces=CrossbenchTraceUriResolver(group.runs),
        config=btp_config) as btp:

      def run_query(query: str):
        # Flattened result includes per-trace metadata columns.
        query_path = _QUERIES_DIR / f"{query}.sql"
        csv_file = group_dir / f"{pth.safe_filename(query)}.csv"
        btp.query_and_flatten(query_path.read_text()).to_csv(
            path_or_buf=csv_file, index=False)
        return csv_file

      csv_files = list(map(run_query, self.queries))

      def run_metric(metric: str):
        # One proto per trace; all written sequentially to a single file.
        json_file = group_dir / f"{pth.safe_filename(metric)}.json"
        protos = btp.metric([metric])
        with json_file.open("x") as f:
          for p in protos:
            f.write(MessageToJson(p))
        return json_file

      json_files = list(map(run_metric, self.metrics))

    return LocalProbeResult(csv=csv_files, json=json_files)

  def log_browsers_result(self, group: BrowsersRunGroup) -> None:
    # NOTE(review): critical level is presumably used so the summary shows
    # at default verbosity — confirm intended.
    logging.info("-" * 80)
    logging.critical("TraceProcessor merged traces:")
    for run in group.runs:
      logging.critical(" - %s", run.results[self].trace)


class TraceProcessorProbeContext(ProbeContext[TraceProcessorProbe]):
  """Per-run context: zips all collected traces into merged_trace.zip on
  teardown and, in non-batch mode, runs queries/metrics immediately."""

  def __init__(self, probe: TraceProcessorProbe, run: Run) -> None:
    super().__init__(probe, run)

  def get_default_result_path(self) -> pth.AnyPath:
    # Unlike the base implementation, eagerly create the result directory
    # so query/metric output files can be written into it.
    result_dir = super().get_default_result_path()
    self.host_platform.mkdir(result_dir)
    return result_dir

  def setup(self) -> None:
    # Nothing to prepare: this probe only post-processes trace files
    # produced by other probes.
    pass

  def start(self) -> None:
    pass

  def stop(self) -> None:
    pass

  def teardown(self) -> ProbeResult:
    return self._merge_trace_files().merge(self._maybe_run_tp())

  def _merge_trace_files(self) -> LocalProbeResult:
    """Zips every trace collected for this run into merged_trace.zip,
    preserving paths relative to the run's output directory."""
    with self.run.actions("TRACE_PROCESSOR: Merging trace files", verbose=True):
      with zipfile.ZipFile(self.merged_trace_path, "w") as zip_file:
        for f in self.run.results.all_traces():
          zip_file.write(f, arcname=f.relative_to(self.run.out_dir))
    return LocalProbeResult(trace=(self.merged_trace_path,))

  def _maybe_run_tp(self) -> ProbeResult:
    """Runs queries/metrics for this run unless batch mode defers them."""
    if not self.probe.needs_tp_run:
      return LocalProbeResult()

    with TraceProcessor(
        trace=CrossbenchTraceUriResolver(self),
        config=self.probe.tp_config) as tp:
      return self._run_queries(tp).merge(self._run_metrics(tp))

  def _run_queries(self, tp: TraceProcessor) -> LocalProbeResult:

    def run_query(query: str):
      # Writes the query result as <query>.csv in the run's result dir.
      query_path = _QUERIES_DIR / f"{query}.sql"
      csv_file = self.local_result_path / f"{pth.safe_filename(query)}.csv"
      tp.query(query_path.read_text()).as_pandas_dataframe().to_csv(
          path_or_buf=csv_file, index=False)
      return csv_file

    with self.run.actions("TRACE_PROCESSOR: Running queries", verbose=True):
      files = tuple(map(run_query, self.probe.queries))
    return LocalProbeResult(csv=files)

  def _run_metrics(self, tp: TraceProcessor) -> LocalProbeResult:

    def run_metric(metric: str):
      # Writes the metric proto as JSON; "x" mode fails if the file
      # already exists.
      json_file = self.local_result_path / f"{pth.safe_filename(metric)}.json"
      proto = tp.metric([metric])
      with json_file.open("x") as f:
        f.write(MessageToJson(proto))
      return json_file

    with self.run.actions("TRACE_PROCESSOR: Running metrics", verbose=True):
      files = tuple(map(run_metric, self.probe.metrics))
    return LocalProbeResult(json=files)

  @property
  def merged_trace_path(self):
    """Location of the zip bundling all of this run's trace files."""
    return self.local_result_path / "merged_trace.zip"