# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import abc
import argparse
import logging
import re
from typing import (TYPE_CHECKING, Any, Dict, Generic, List, Optional, Sequence,
                    Tuple, Type, TypeVar, cast)

from ordered_set import OrderedSet

from crossbench import helper
from crossbench.cli.parser import CrossBenchArgumentParser
from crossbench.flags.base import Flags
from crossbench.parse import ObjectParser
from crossbench.stories.press_benchmark import PressBenchmarkStory
from crossbench.stories.story import Story

if TYPE_CHECKING:
  from crossbench import path as pth
  from crossbench.browsers.attributes import BrowserAttributes
  from crossbench.runner.runner import Runner


# Mixin for probe classes that are bound to a Benchmark instance.
# The owning benchmark is passed in via the mandatory `benchmark` keyword
# argument and exposed through the read-only `benchmark` property.
# NOTE: Documented with comments instead of a class docstring, because class
# `__doc__` values are read at runtime in this module (Benchmark.describe).
class BenchmarkProbeMixin:
  NAME: str = ""
  IS_GENERAL_PURPOSE: bool = False

  def __init__(self, *args, **kwargs) -> None:
    # "benchmark" is consumed here so it is not forwarded to the next
    # __init__ in the MRO.
    self._benchmark = kwargs.pop("benchmark")
    assert isinstance(self._benchmark, Benchmark)
    super().__init__(*args, **kwargs)

  @property
  def benchmark(self) -> Benchmark:
    """The Benchmark instance this probe was created for."""
    return self._benchmark


# Base class for benchmarks: holds the validated list of stories and provides
# the classmethod hooks used to build a CLI sub-parser and to construct an
# instance from parsed CLI arguments.
# NOTE: Documented with comments instead of a class docstring, because
# cli_help() / cli_description() read `cls.__doc__` at runtime.
class Benchmark(abc.ABC):
  NAME: str = ""
  DEFAULT_STORY_CLS: Type[Story] = Story
  PROBES: Tuple[Type[BenchmarkProbeMixin], ...] = ()
  DEFAULT_REPETITIONS: int = 1

  @classmethod
  def cli_help(cls) -> str:
    """Returns the first line of the stripped class doc string."""
    assert cls.__doc__, (f"Benchmark class {cls} must provide a doc string.")
    # Return the first non-empty line
    return cls.__doc__.strip().splitlines()[0]

  @classmethod
  def cli_description(cls) -> str:
    """Returns the complete, stripped class doc string."""
    assert cls.__doc__
    return cls.__doc__.strip()

  @classmethod
  def cli_epilog(cls) -> str:
    """Returns the CLI epilog text; empty by default."""
    return ""

  @classmethod
  def aliases(cls) -> Tuple[str, ...]:
    """Returns alternative names for this benchmark; none by default."""
    return ()

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Registers a sub-parser named cls.NAME on `subparsers` and returns it.

    Help, description and epilog are taken from the cli_* classmethods.
    """
    parser = subparsers.add_parser(
        cls.NAME,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help=cls.cli_help(),
        description=cls.cli_description(),
        epilog=cls.cli_epilog(),
        aliases=aliases)
    assert isinstance(parser, CrossBenchArgumentParser)
    return parser

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Returns a dict with the name, description, stories and default probes.

    Description texts are passed through helper.wrap_lines(..., 70) and
    joined with newlines. "stories" is empty in this base implementation.
    """
    return {
        "name": cls.NAME,
        "description": "\n".join(helper.wrap_lines(cls.cli_description(), 70)),
        "stories": [],
        "probes-default": {
            probe_cls.NAME:
                "\n".join(
                    helper.wrap_lines((probe_cls.__doc__ or "").strip(), 70))
            for probe_cls in cls.PROBES
        }
    }

  @classmethod
  def default_probe_config_path(cls) -> Optional[pth.LocalPath]:
    """Returns the default probe config path; None by default."""
    return None

  @classmethod
  def default_network_config_path(cls) -> Optional[pth.LocalPath]:
    """Returns the default network config path; None by default."""
    return None

  @classmethod
  def extra_flags(cls, browser_attributes: BrowserAttributes) -> Flags:
    """Returns additional flags for the given browser; empty by default."""
    del browser_attributes
    return Flags()

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Returns the constructor kwargs derived from `args`; empty by default."""
    del args
    return {}

  @classmethod
  def from_cli_args(cls, args: argparse.Namespace) -> Benchmark:
    """Creates an instance using the kwargs from kwargs_from_cli(args)."""
    kwargs = cls.kwargs_from_cli(args)
    return cls(**kwargs)

  def __init__(self, stories: Sequence[Story]) -> None:
    assert self.NAME is not None, f"{self} has no .NAME property"
    # Subclasses have to override DEFAULT_STORY_CLS with a concrete Story.
    assert self.DEFAULT_STORY_CLS != Story, (
        f"{self} has no .DEFAULT_STORY_CLS property")
    self.stories: List[Story] = self._validate_stories(stories)

  def _validate_stories(self, stories: Sequence[Story]) -> List[Story]:
    """Asserts `stories` is non-empty and type-compatible; returns a list."""
    assert stories, "No stories provided"
    for story in stories:
      assert isinstance(story, self.DEFAULT_STORY_CLS), (
          f"story={story} should be a subclass/the same "
          f"class as {self.DEFAULT_STORY_CLS}")
    return list(stories)

  def setup(self, runner: Runner) -> None:
    """Hook called with the runner; no-op by default."""
    del runner


StoryT = TypeVar("StoryT", bound=Story)


# Abstract helper that turns the --stories CLI patterns into story instances.
# Subclasses implement process_all() and create_stories(); both are invoked
# from __init__, and the result is stored in `self.stories`.
# NOTE: Documented with comments instead of a class docstring, because
# SubStoryBenchmark.cli_description() reads STORY_FILTER_CLS.__doc__.
class StoryFilter(Generic[StoryT], metaclass=abc.ABCMeta):
  CAN_COMBINE_STORIES: bool = True

  @classmethod
  def add_cli_parser(
      cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Adds --stories and, if combinable, --combined/--separate to `parser`."""
    parser.add_argument(
        "--stories",
        "--story",
        dest="stories",
        default="default",
        help="Comma-separated list of story names. "
        "Use 'all' for selecting all available stories. "
        "Use 'default' for the standard selection of stories.")
    if cls.CAN_COMBINE_STORIES:
      # Both flags write to args.separate and exclude each other.
      is_combined_group = parser.add_mutually_exclusive_group()
      is_combined_group.add_argument(
          "--combined",
          dest="separate",
          default=False,
          action="store_false",
          help="Run each story in the same session. (default)")
      is_combined_group.add_argument(
          "--separate",
          action="store_true",
          help="Run each story in a fresh browser.")

    return parser

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Returns {"patterns": ...} from the comma-separated args.stories."""
    return {"patterns": args.stories.split(",")}

  @classmethod
  def from_cli_args(cls, story_cls: Type[StoryT],
                    args: argparse.Namespace) -> StoryFilter[StoryT]:
    """Creates a filter for `story_cls` using kwargs_from_cli(args)."""
    kwargs = cls.kwargs_from_cli(args)
    return cls(story_cls, **kwargs)

  def __init__(self,
               story_cls: Type[StoryT],
               patterns: Sequence[str],
               separate: bool = False) -> None:
    self.story_cls: Type[StoryT] = story_cls
    assert issubclass(
        story_cls, Story), (f"Subclass of {Story} expected, found {story_cls}")
    # Using order-preserving dict instead of set
    self._known_names: Dict[str,
                            None] = dict.fromkeys(story_cls.all_story_names())
    self.stories: Sequence[StoryT] = []
    # TODO: only use one method.
    self.process_all(patterns)
    self.stories = self.create_stories(separate)

  @abc.abstractmethod
  def process_all(self, patterns: Sequence[str]) -> None:
    """Processes all given story patterns; called from __init__."""

  @abc.abstractmethod
  def create_stories(self, separate: bool) -> Sequence[StoryT]:
    """Creates the stories; called from __init__ after process_all()."""


# Benchmark whose stories are selected through a StoryFilter
# (STORY_FILTER_CLS) driven by the --stories CLI argument.
# NOTE: Documented with comments instead of a class docstring, because
# cli_help() / cli_description() read `cls.__doc__` at runtime.
class SubStoryBenchmark(Benchmark, metaclass=abc.ABCMeta):
  STORY_FILTER_CLS: Type[StoryFilter] = StoryFilter

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Returns the parser created by the base class, unchanged."""
    parser = super().add_cli_parser(subparsers, aliases)
    return parser

  @classmethod
  def cli_description(cls) -> str:
    """Extends the base description with story names and filter help.

    The filter help is the stripped doc string of STORY_FILTER_CLS, which
    therefore has to be non-empty.
    """
    desc = super().cli_description()
    desc += "\n\n"
    desc += ("Stories (alternatively use the 'describe benchmark "
             f"{cls.NAME}' command):\n")
    desc += ", ".join(cls.all_story_names())
    desc += "\n\n"
    desc += "Filtering (for --stories): "
    assert cls.STORY_FILTER_CLS.__doc__, (
        f"{cls.STORY_FILTER_CLS} has no doc string.")
    desc += cls.STORY_FILTER_CLS.__doc__.strip()

    return desc

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Adds the "stories" kwarg created from the CLI args."""
    kwargs = super().kwargs_from_cli(args)
    kwargs["stories"] = cls.stories_from_cli_args(args)
    return kwargs

  @classmethod
  def stories_from_cli_args(cls, args: argparse.Namespace) -> Sequence[Story]:
    """Returns the stories selected by STORY_FILTER_CLS for `args`."""
    return cls.STORY_FILTER_CLS.from_cli_args(cls.DEFAULT_STORY_CLS,
                                              args).stories

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Returns the base description with "stories" filled in."""
    data = super().describe()
    data["stories"] = cls.all_story_names()
    return data

  @classmethod
  def all_story_names(cls) -> Sequence[str]:
    """Returns the sorted story names of DEFAULT_STORY_CLS."""
    return sorted(cls.DEFAULT_STORY_CLS.all_story_names())


PressBenchmarkStoryT = TypeVar(
    "PressBenchmarkStoryT", bound=PressBenchmarkStory)


class PressBenchmarkStoryFilter(StoryFilter[PressBenchmarkStoryT],
                                Generic[PressBenchmarkStoryT]):
  """
  Filter stories by name or regexp.

  Syntax:
    "all"     Include all stories (defaults to story_names).
    "name"    Include story with the given name.
    "-name"   Exclude story with the given name.
    "foo.*"   Include stories whose name matches the regexp.
    "-foo.*"  Exclude stories whose name matches the regexp.

  These patterns can be combined:
    [".*", "-foo", "-bar"] Includes all except the "foo" and "bar" story
  """

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Adds "separate" and "url" (args.custom_benchmark_url) to the kwargs."""
    kwargs = super().kwargs_from_cli(args)
    kwargs["separate"] = args.separate
    kwargs["url"] = args.custom_benchmark_url
    return kwargs

  def __init__(self,
               story_cls: Type[PressBenchmarkStoryT],
               patterns: Sequence[str],
               separate: bool = False,
               url: Optional[str] = None) -> None:
    # Both attributes have to exist before super().__init__(), which calls
    # process_all() and create_stories().
    self.url: Optional[str] = url
    self._selected_names: OrderedSet[str] = OrderedSet()
    super().__init__(story_cls, patterns, separate)
    assert issubclass(self.story_cls, PressBenchmarkStory)
    # Known names must not collide with the filter syntax ("-" prefix, "all").
    for name in self._known_names:
      assert name, "Invalid empty story name"
      assert not name.startswith("-"), (
          f"Known story names cannot start with '-', but got '{name}'.")
      assert name != "all", "Known story name cannot match 'all'."

  def process_all(self, patterns: Sequence[str]) -> None:
    """Applies each pattern in order.

    Raises:
      ValueError: if `patterns` is not a list or tuple.
    """
    if not isinstance(patterns, (list, tuple)):
      raise ValueError("Expected Sequence of story name or patterns "
                       f"but got '{type(patterns)}'.")
    for pattern in patterns:
      self.process_pattern(pattern)

  def process_pattern(self, pattern: str) -> None:
    """Removes matches for "-"-prefixed patterns, adds matches otherwise."""
    if pattern.startswith("-"):
      self.remove(pattern[1:])
    else:
      self.add(pattern)

  def add(self, pattern: str) -> None:
    """Adds all known story names matching `pattern` to the selection."""
    self._check_processed_pattern(pattern)
    regexp = self._pattern_to_regexp(pattern)
    self._add_matching(regexp, pattern)

  def remove(self, pattern: str) -> None:
    """Removes all known story names matching `pattern` from the selection."""
    self._check_processed_pattern(pattern)
    regexp = self._pattern_to_regexp(pattern)
    self._remove_matching(regexp, pattern)

  def _pattern_to_regexp(self, pattern: str) -> re.Pattern:
    """Converts a pattern (without "-" prefix) into a compiled regexp.

    "all" matches everything, "default" matches the default story names,
    a known story name is matched literally, and anything else is compiled
    as a regular expression.
    """
    if pattern == "all":
      return re.compile(".*")
    if pattern == "default":
      default_story_names = self.story_cls.default_story_names()
      if default_story_names == self.story_cls.all_story_names():
        return re.compile(".*")
      joined_names = "|".join(re.escape(name) for name in default_story_names)
      return re.compile(f"^({joined_names})$")
    if pattern in self._known_names:
      # Escape, so that special characters in story names match literally.
      return re.compile(re.escape(pattern))
    return re.compile(pattern)

  def _check_processed_pattern(self, pattern: str) -> None:
    """Raises ValueError for empty or still "-"-prefixed patterns."""
    if not pattern:
      raise ValueError("Empty pattern is not allowed")
    if pattern == "-":
      raise ValueError(f"Empty remove pattern not allowed: '{pattern}'")
    if pattern[0] == "-":
      raise ValueError(f"Unprocessed negative pattern not allowed: '{pattern}'")

  def _add_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
    """Adds all story names matched by `regexp` to the selection."""
    substories = self._regexp_match(regexp, original_pattern)
    self._selected_names.update(substories)

  def _remove_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
    """Removes all story names matched by `regexp` from the selection.

    Raises:
      ValueError: if a matched name is not part of the current selection.
    """
    substories = self._regexp_match(regexp, original_pattern)
    for substory in substories:
      try:
        self._selected_names.remove(substory)
      except KeyError as e:
        raise ValueError(
            "Removing Story failed: "
            f"name='{substory}' extracted by pattern='{original_pattern}' "
            "is not in the filtered story list") from e

  def _regexp_match(self, regexp: re.Pattern,
                    original_pattern: str) -> List[str]:
    """Returns the known story names fully matched by `regexp`.

    Falls back to a case-insensitive match if nothing matched.

    Raises:
      ValueError: if no story matches, or if every known story matches while
        some names were already selected by earlier patterns.
    """
    substories = [
        substory for substory in self._known_names if regexp.fullmatch(substory)
    ]
    if not substories:
      logging.warning(
          "No matching stories, using case-insensitive fallback regexp.")
      iregexp: re.Pattern = re.compile(regexp.pattern, flags=re.IGNORECASE)
      substories = [
          substory for substory in self._known_names
          if iregexp.fullmatch(substory)
      ]
    if not substories:
      raise ValueError(f"'{original_pattern}' didn't match any stories.")
    if len(substories) == len(self._known_names) and self._selected_names:
      raise ValueError(f"'{original_pattern}' matched all and overrode all "
                       "previously filtered story names.")
    return substories

  def create_stories(self, separate: bool) -> Sequence[PressBenchmarkStoryT]:
    """Logs the selected names and creates the matching stories."""
    logging.info("SELECTED STORIES: %s",
                 str(list(map(str, self._selected_names))))
    names = list(self._selected_names)
    return self.create_stories_from_names(names, separate)

  def create_stories_from_names(
      self, names: List[str], separate: bool) -> Sequence[PressBenchmarkStoryT]:
    """Creates stories via story_cls.from_names() with the configured url."""
    return self.story_cls.from_names(names, separate=separate, url=self.url)


# Benchmark based on PressBenchmarkStory, with versioned aliases and
# selectable live / official / local benchmark URLs.
# NOTE: Documented with comments instead of a class docstring, because
# cli_help() / cli_description() read `cls.__doc__` at runtime.
class PressBenchmark(SubStoryBenchmark):
  STORY_FILTER_CLS = PressBenchmarkStoryFilter
  DEFAULT_STORY_CLS: Type[PressBenchmarkStory] = PressBenchmarkStory

  @classmethod
  @abc.abstractmethod
  def short_base_name(cls) -> str:
    """Short benchmark name, used as prefix for versioned aliases."""
    raise NotImplementedError()

  @classmethod
  @abc.abstractmethod
  def base_name(cls) -> str:
    """Benchmark name, used as prefix for versioned aliases."""
    raise NotImplementedError()

  @classmethod
  @abc.abstractmethod
  def version(cls) -> Tuple[int, ...]:
    """Benchmark version as a tuple of ints."""
    raise NotImplementedError()

  @classmethod
  def aliases(cls) -> Tuple[str, ...]:
    """Returns "{name}{version}" and "{name}_{version}" for both base names.

    The version parts are joined with ".".
    """
    version = [str(v) for v in cls.version()]
    assert version, "Expected non-empty version tuple."
    version_names = []
    dot_version = ".".join(version)
    for name in (cls.short_base_name(), cls.base_name()):
      assert name, "Expected non-empty base name."
      version_names.append(f"{name}{dot_version}")
      version_names.append(f"{name}_{dot_version}")
    return tuple(version_names)

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Adds the mutually exclusive URL options and the story filter options.

    All URL options store their value in args.custom_benchmark_url.
    """
    parser = super().add_cli_parser(subparsers, aliases)
    # TODO: Move story-related args to dedicated PressBenchmarkStoryFilter class
    benchmark_url_group = parser.add_mutually_exclusive_group()
    live_url = cls.DEFAULT_STORY_CLS.URL
    local_url = cls.DEFAULT_STORY_CLS.URL_LOCAL
    official_url = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
    benchmark_url_group.add_argument(
        "--live",
        "--live-url",
        "--browser-ben",
        dest="custom_benchmark_url",
        const=None,
        action="store_const",
        help=(f"Use chrome live benchmark url ({live_url}) "
              "on https://browserben.ch."))
    benchmark_url_group.add_argument(
        "--official",
        "--official-url",
        dest="custom_benchmark_url",
        const=official_url,
        action="store_const",
        help=("Use officially hosted live/online benchmark url "
              f"({official_url})."))
    benchmark_url_group.add_argument(
        "--local",
        "--local-url",
        "--url",
        "--custom-benchmark-url",
        type=ObjectParser.httpx_url_str,
        nargs="?",
        dest="custom_benchmark_url",
        const=local_url,
        help=(f"Use custom or locally (default={local_url}) "
              "hosted benchmark url."))
    cls.STORY_FILTER_CLS.add_cli_parser(parser)
    return parser

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Adds "custom_url" (args.custom_benchmark_url) to the kwargs."""
    kwargs = super().kwargs_from_cli(args)
    kwargs["custom_url"] = args.custom_benchmark_url
    return kwargs

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Returns the base description extended with the story class URLs."""
    data = super().describe()
    assert issubclass(cls.DEFAULT_STORY_CLS, PressBenchmarkStory)
    data["url"] = cls.DEFAULT_STORY_CLS.URL
    data["url-official"] = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
    data["url-local"] = cls.DEFAULT_STORY_CLS.URL_LOCAL
    return data

  def __init__(self,
               stories: Sequence[Story],
               custom_url: Optional[str] = None) -> None:
    super().__init__(stories)
    self.custom_url = custom_url
    if custom_url:
      # All stories are expected to have been created with the custom url.
      for story in stories:
        press_story = cast(PressBenchmarkStory, story)
        assert press_story.url == custom_url

  def setup(self, runner: Runner) -> None:
    """Runs the base setup and then validates the benchmark URL."""
    super().setup(runner)
    self.validate_url(runner)

  def validate_url(self, runner: Runner) -> None:
    """Checks that the benchmark URL is reachable on all runner platforms.

    A custom URL is only validated if the runner has any live network.
    Otherwise the URL of the first story is validated, unless it is empty and
    not all networks are live.

    Raises:
      ValueError: if the URL is empty or could not be validated.
    """
    if self.custom_url:
      if runner.has_any_live_network():
        self._validate_custom_url(runner, self.custom_url)
      # NOTE(review): nesting restored from a whitespace-collapsed source;
      # confirm that this return belongs to the outer `if`.
      return
    first_story = cast(PressBenchmarkStory, self.stories[0])
    url = first_story.url
    if not runner.has_all_live_network() and not url:
      # For non-live networks we create a matching URL
      return
    if not url:
      raise ValueError("Invalid empty url")
    if all(runner.env.validate_url(url, p) for p in runner.platforms):
      return
    msg = [
        f"Could not reach live benchmark URL: '{url}'. "
        "Please make sure you're connected to the internet."
    ]
    local_url = first_story.URL_LOCAL
    if local_url:
      msg.append(
          f"Alternatively use --local for the default local URL: {local_url}")
    raise ValueError("\n".join(msg))

  def _validate_custom_url(self, runner: Runner, url: str) -> None:
    """Raises ValueError if `url` fails validation on any runner platform."""
    if not all(runner.env.validate_url(url, p) for p in runner.platforms):
      raise ValueError(
          f"Could not reach custom benchmark URL: '{url}'. "
          "Please make sure your local web server is running.")