• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import argparse
9import logging
10import re
11from typing import (TYPE_CHECKING, Any, Dict, Generic, List, Optional, Sequence,
12                    Tuple, Type, TypeVar, cast)
13
14from ordered_set import OrderedSet
15
16from crossbench import helper
17from crossbench.cli.parser import CrossBenchArgumentParser
18from crossbench.flags.base import Flags
19from crossbench.parse import ObjectParser
20from crossbench.stories.press_benchmark import PressBenchmarkStory
21from crossbench.stories.story import Story
22
23if TYPE_CHECKING:
24  from crossbench import path as pth
25  from crossbench.browsers.attributes import BrowserAttributes
26  from crossbench.runner.runner import Runner
27
28
# Mixin for probes that are tied to one specific Benchmark instance.
# The owning benchmark is passed in through the mandatory "benchmark" keyword
# argument; every other argument is forwarded to the next class in the MRO.
class BenchmarkProbeMixin:
  NAME: str = ""
  IS_GENERAL_PURPOSE: bool = False

  def __init__(self, *args, **kwargs):
    """Store the owning benchmark and delegate the remaining arguments.

    Raises:
      KeyError: if no "benchmark" keyword argument was supplied.
    """
    # Remove "benchmark" from kwargs first so that the next __init__ in the
    # MRO never receives it.
    owner = kwargs.pop("benchmark")
    self._benchmark = owner
    assert isinstance(owner, Benchmark)
    super().__init__(*args, **kwargs)

  @property
  def benchmark(self) -> Benchmark:
    """The benchmark instance handed to the constructor."""
    return self._benchmark
41
42
# Base class of all benchmarks: provides the CLI sub-parser wiring, a
# describe() summary and holds the validated list of stories to run.
# NOTE: cli_help()/cli_description() read the class doc string of the
# concrete subclass, so every concrete benchmark has to provide one.
class Benchmark(abc.ABC):
  NAME: str = ""
  DEFAULT_STORY_CLS: Type[Story] = Story
  PROBES: Tuple[Type[BenchmarkProbeMixin], ...] = ()
  DEFAULT_REPETITIONS: int = 1

  @classmethod
  def cli_help(cls) -> str:
    """Return the first line of the class doc string as short help text."""
    assert cls.__doc__, (f"Benchmark class {cls} must provide a doc string.")
    # The doc string is stripped first, hence its first line is non-empty.
    doc_lines = cls.__doc__.strip().splitlines()
    return doc_lines[0]

  @classmethod
  def cli_description(cls) -> str:
    """Return the whole (stripped) class doc string."""
    assert cls.__doc__
    return cls.__doc__.strip()

  @classmethod
  def cli_epilog(cls) -> str:
    """Return the text printed after the CLI help; empty by default."""
    return ""

  @classmethod
  def aliases(cls) -> Tuple[str, ...]:
    """Return alternative CLI names for this benchmark; none by default."""
    return ()

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Register a sub-parser named cls.NAME on `subparsers` and return it."""
    parser_options: Dict[str, Any] = {
        "formatter_class": argparse.RawDescriptionHelpFormatter,
        "help": cls.cli_help(),
        "description": cls.cli_description(),
        "epilog": cls.cli_epilog(),
        "aliases": aliases,
    }
    parser = subparsers.add_parser(cls.NAME, **parser_options)
    assert isinstance(parser, CrossBenchArgumentParser)
    return parser

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Return a dict with the name, description and default probes."""

    def wrapped(text: str) -> str:
      # Re-flow the text with helper.wrap_lines using a width of 70.
      return "\n".join(helper.wrap_lines(text, 70))

    description = wrapped(cls.cli_description())
    default_probes: Dict[str, str] = {}
    for probe_cls in cls.PROBES:
      # Probes without a doc string are described by an empty text.
      probe_doc = (probe_cls.__doc__ or "").strip()
      default_probes[probe_cls.NAME] = wrapped(probe_doc)
    return {
        "name": cls.NAME,
        "description": description,
        "stories": [],
        "probes-default": default_probes,
    }

  @classmethod
  def default_probe_config_path(cls) -> Optional[pth.LocalPath]:
    """Return the default probe config path; None in this base class."""
    return None

  @classmethod
  def default_network_config_path(cls) -> Optional[pth.LocalPath]:
    """Return the default network config path; None in this base class."""
    return None

  @classmethod
  def extra_flags(cls, browser_attributes: BrowserAttributes) -> Flags:
    """Return additional flags; the base class returns an empty Flags()."""
    del browser_attributes
    return Flags()

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Translate parsed CLI args into constructor kwargs (none here)."""
    del args
    return {}

  @classmethod
  def from_cli_args(cls, args: argparse.Namespace) -> Benchmark:
    """Create an instance from the kwargs produced by kwargs_from_cli()."""
    return cls(**cls.kwargs_from_cli(args))

  def __init__(self, stories: Sequence[Story]) -> None:
    # NOTE(review): NAME defaults to "" and is therefore never None, so this
    # check cannot fail as written - confirm whether `assert self.NAME` was
    # intended.
    assert self.NAME is not None, f"{self} has no .NAME property"
    # Concrete benchmarks have to replace the generic Story placeholder.
    assert self.DEFAULT_STORY_CLS != Story, (
        f"{self} has no .DEFAULT_STORY_CLS property")
    self.stories: List[Story] = self._validate_stories(stories)

  def _validate_stories(self, stories: Sequence[Story]) -> List[Story]:
    """Check that stories are non-empty and all of DEFAULT_STORY_CLS type.

    Returns a new list holding the given stories.
    """
    assert stories, "No stories provided"
    expected_cls = self.DEFAULT_STORY_CLS
    for story in stories:
      assert isinstance(story, expected_cls), (
          f"story={story} should be a subclass/the same "
          f"class as {expected_cls}")
    return [*stories]

  def setup(self, runner: Runner) -> None:
    """Hook run with the runner before use; a no-op in the base class."""
    del runner
135
136
# Type variable for the concrete Story subclass a StoryFilter works with.
StoryT = TypeVar("StoryT", bound=Story)
138
139
# Abstract helper that turns the --stories CLI patterns into Story instances.
# Subclasses implement process_all() and create_stories(); both are invoked
# from __init__ and the result is exposed as `self.stories`.
# NOTE: documented with comments on purpose, the class doc string of a filter
# is appended to the benchmark's CLI description.
class StoryFilter(Generic[StoryT], metaclass=abc.ABCMeta):
  CAN_COMBINE_STORIES: bool = True

  @classmethod
  def add_cli_parser(
      cls, parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """Add --stories (and optionally --combined/--separate) to `parser`."""
    stories_help = ("Comma-separated list of story names. "
                    "Use 'all' for selecting all available stories. "
                    "Use 'default' for the standard selection of stories.")
    parser.add_argument(
        "--stories",
        "--story",
        dest="stories",
        default="default",
        help=stories_help)
    if not cls.CAN_COMBINE_STORIES:
      return parser

    # --combined and --separate write to the same `separate` destination and
    # exclude each other.
    session_group = parser.add_mutually_exclusive_group()
    session_group.add_argument(
        "--combined",
        dest="separate",
        default=False,
        action="store_false",
        help="Run each story in the same session. (default)")
    session_group.add_argument(
        "--separate",
        action="store_true",
        help="Run each story in a fresh browser.")
    return parser

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Split the comma-separated --stories value into a pattern list."""
    patterns = args.stories.split(",")
    return {"patterns": patterns}

  @classmethod
  def from_cli_args(cls, story_cls: Type[StoryT],
                    args: argparse.Namespace) -> StoryFilter[StoryT]:
    """Create a filter for `story_cls` from parsed CLI arguments."""
    return cls(story_cls, **cls.kwargs_from_cli(args))

  def __init__(self,
               story_cls: Type[StoryT],
               patterns: Sequence[str],
               separate: bool = False) -> None:
    self.story_cls: Type[StoryT] = story_cls
    assert issubclass(
        story_cls, Story), (f"Subclass of {Story} expected, found {story_cls}")
    # A dict keeps the insertion order of the names (unlike a set).
    self._known_names: Dict[str, None] = {
        name: None for name in story_cls.all_story_names()
    }
    self.stories: Sequence[StoryT] = []
    # TODO: only use one method.
    self.process_all(patterns)
    self.stories = self.create_stories(separate)

  @abc.abstractmethod
  def process_all(self, patterns: Sequence[str]) -> None:
    """Process all given patterns; called once from __init__."""

  @abc.abstractmethod
  def create_stories(self, separate: bool) -> Sequence[StoryT]:
    """Create the stories; called from __init__ after process_all()."""
201
202
# Benchmark whose stories are selected on the command line through a
# StoryFilter (see STORY_FILTER_CLS).
class SubStoryBenchmark(Benchmark, metaclass=abc.ABCMeta):
  STORY_FILTER_CLS: Type[StoryFilter] = StoryFilter

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Create the sub-parser; currently only delegates to the base class."""
    return super().add_cli_parser(subparsers, aliases)

  @classmethod
  def cli_description(cls) -> str:
    """Return the base description plus story names and filter syntax."""
    base_description = super().cli_description()
    story_names = ", ".join(cls.all_story_names())
    filter_doc = cls.STORY_FILTER_CLS.__doc__
    assert filter_doc, (
        f"{cls.STORY_FILTER_CLS} has no doc string.")
    return (f"{base_description}"
            "\n\n"
            "Stories (alternatively use the 'describe benchmark "
            f"{cls.NAME}' command):\n"
            f"{story_names}"
            "\n\n"
            "Filtering (for --stories): "
            f"{filter_doc.strip()}")

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Extend the base kwargs with the stories selected on the CLI."""
    result = super().kwargs_from_cli(args)
    result["stories"] = cls.stories_from_cli_args(args)
    return result

  @classmethod
  def stories_from_cli_args(cls, args: argparse.Namespace) -> Sequence[Story]:
    """Build the story filter from `args` and return its stories."""
    story_filter = cls.STORY_FILTER_CLS.from_cli_args(cls.DEFAULT_STORY_CLS,
                                                      args)
    return story_filter.stories

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Return the base description dict with "stories" filled in."""
    description = super().describe()
    description["stories"] = cls.all_story_names()
    return description

  @classmethod
  def all_story_names(cls) -> Sequence[str]:
    """Return the sorted story names of DEFAULT_STORY_CLS."""
    names = cls.DEFAULT_STORY_CLS.all_story_names()
    return sorted(names)
247
248
# Type variable for the concrete PressBenchmarkStory subclass produced by a
# PressBenchmarkStoryFilter.
PressBenchmarkStoryT = TypeVar(
    "PressBenchmarkStoryT", bound=PressBenchmarkStory)
251
252
class PressBenchmarkStoryFilter(StoryFilter[PressBenchmarkStoryT],
                                Generic[PressBenchmarkStoryT]):
  """
  Filter stories by name or regexp.

  Syntax:
    "all"     Include all stories (defaults to story_names).
    "name"    Include story with the given name.
    "-name"   Exclude story with the given name.
    "foo.*"   Include stories whose name matches the regexp.
    "-foo.*"  Exclude stories whose name matches the regexp.

  These patterns can be combined:
    [".*", "-foo", "-bar"] Includes all except the "foo" and "bar" story
  """

  # NOTE: The class doc string above is user-facing, it is appended to the
  # benchmark's CLI description.

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Add the `separate` and `url` kwargs taken from the parsed CLI args."""
    kwargs = super().kwargs_from_cli(args)
    kwargs["separate"] = args.separate
    kwargs["url"] = args.custom_benchmark_url
    return kwargs

  def __init__(self,
               story_cls: Type[PressBenchmarkStoryT],
               patterns: Sequence[str],
               separate: bool = False,
               url: Optional[str] = None):
    # Both attributes have to exist before calling the base __init__, which
    # already runs process_all() and create_stories().
    self.url: Optional[str] = url
    self._selected_names: OrderedSet[str] = OrderedSet()
    super().__init__(story_cls, patterns, separate)
    assert issubclass(self.story_cls, PressBenchmarkStory)
    # Story names must not collide with the pattern syntax ("-name", "all").
    for name in self._known_names:
      assert name, "Invalid empty story name"
      assert not name.startswith("-"), (
          f"Known story names cannot start with '-', but got '{name}'.")
      assert name != "all", "Known story name cannot match 'all'."

  def process_all(self, patterns: Sequence[str]) -> None:
    """Apply all patterns in order to the selected story names.

    Raises:
      ValueError: if `patterns` is neither a list nor a tuple, or if one of
        the patterns is rejected by add() / remove().
    """
    if not isinstance(patterns, (list, tuple)):
      raise ValueError("Expected Sequence of story name or patterns "
                       f"but got '{type(patterns)}'.")
    for pattern in patterns:
      self.process_pattern(pattern)

  def process_pattern(self, pattern: str) -> None:
    """Dispatch a pattern: a leading "-" removes stories, otherwise adds."""
    if pattern.startswith("-"):
      self.remove(pattern[1:])
    else:
      self.add(pattern)

  def add(self, pattern: str) -> None:
    """Select all known stories matching `pattern` (without "-" prefix)."""
    self._check_processed_pattern(pattern)
    regexp = self._pattern_to_regexp(pattern)
    self._add_matching(regexp, pattern)

  def remove(self, pattern: str) -> None:
    """Unselect all known stories matching `pattern` (without "-" prefix)."""
    self._check_processed_pattern(pattern)
    regexp = self._pattern_to_regexp(pattern)
    self._remove_matching(regexp, pattern)

  def _pattern_to_regexp(self, pattern: str) -> re.Pattern:
    """Convert a story name, keyword or regexp string to a compiled regexp.

    "all" and "default" are keywords; a pattern equal to a known story name
    is escaped and matched literally; everything else is compiled as regexp.
    """
    if pattern == "all":
      return re.compile(".*")
    if pattern == "default":
      default_story_names = self.story_cls.default_story_names()
      if default_story_names == self.story_cls.all_story_names():
        return re.compile(".*")
      joined_names = "|".join(re.escape(name) for name in default_story_names)
      return re.compile(f"^({joined_names})$")
    if pattern in self._known_names:
      return re.compile(re.escape(pattern))
    return re.compile(pattern)

  def _check_processed_pattern(self, pattern: str) -> None:
    """Reject empty patterns and patterns still carrying a "-" prefix.

    Raises:
      ValueError: if the pattern is empty, "-" or starts with "-".
    """
    if not pattern:
      raise ValueError("Empty pattern is not allowed")
    if pattern == "-":
      raise ValueError(f"Empty remove pattern not allowed: '{pattern}'")
    if pattern.startswith("-"):
      raise ValueError(f"Unprocessed negative pattern not allowed: '{pattern}'")

  def _add_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
    """Add all known story names matching `regexp` to the selection."""
    substories = self._regexp_match(regexp, original_pattern)
    self._selected_names.update(substories)

  def _remove_matching(self, regexp: re.Pattern, original_pattern: str) -> None:
    """Remove all known story names matching `regexp` from the selection.

    Raises:
      ValueError: if a matching story is not part of the current selection.
    """
    substories = self._regexp_match(regexp, original_pattern)
    for substory in substories:
      try:
        self._selected_names.remove(substory)
      except KeyError as e:
        raise ValueError(
            "Removing Story failed: "
            f"name='{substory}' extracted by pattern='{original_pattern}' "
            "is not in the filtered story list") from e

  def _regexp_match(self, regexp: re.Pattern,
                    original_pattern: str) -> List[str]:
    """Return the known story names that fully match `regexp`.

    If nothing matches, the same pattern is retried case-insensitively.

    Raises:
      ValueError: if no story matches, or if the pattern matches every known
        story while other stories have already been selected.
    """
    substories = [
        substory for substory in self._known_names if regexp.fullmatch(substory)
    ]
    if not substories:
      logging.warning(
          "No matching stories, using case-insensitive fallback regexp.")
      iregexp: re.Pattern = re.compile(regexp.pattern, flags=re.IGNORECASE)
      substories = [
          substory for substory in self._known_names
          if iregexp.fullmatch(substory)
      ]
    if not substories:
      raise ValueError(f"'{original_pattern}' didn't match any stories.")
    # A catch-all pattern after earlier patterns would make them pointless.
    if len(substories) == len(self._known_names) and self._selected_names:
      raise ValueError(f"'{original_pattern}' matched all and overrode all "
                       "previously filtered story names.")
    return substories

  def create_stories(self, separate: bool) -> Sequence[PressBenchmarkStoryT]:
    """Log the selected story names and create the stories for them."""
    logging.info("SELECTED STORIES: %s",
                 str(list(map(str, self._selected_names))))
    names = list(self._selected_names)
    return self.create_stories_from_names(names, separate)

  def create_stories_from_names(
      self, names: List[str], separate: bool) -> Sequence[PressBenchmarkStoryT]:
    """Create stories via story_cls.from_names(), passing on the custom url."""
    return self.story_cls.from_names(names, separate=separate, url=self.url)
379
380
# Base class for press benchmarks: benchmarks that are loaded from a URL
# which is either the live one, the officially hosted one or a custom/local
# one (see the --live / --official / --local CLI options).
# NOTE: documented with comments on purpose, the class doc string of a
# benchmark is used as its CLI help text.
class PressBenchmark(SubStoryBenchmark):
  STORY_FILTER_CLS = PressBenchmarkStoryFilter
  DEFAULT_STORY_CLS: Type[PressBenchmarkStory] = PressBenchmarkStory

  @classmethod
  @abc.abstractmethod
  def short_base_name(cls) -> str:
    """Short benchmark name, used as prefix for the versioned aliases."""
    raise NotImplementedError()

  @classmethod
  @abc.abstractmethod
  def base_name(cls) -> str:
    """Benchmark name, used as prefix for the versioned aliases."""
    raise NotImplementedError()

  @classmethod
  @abc.abstractmethod
  def version(cls) -> Tuple[int, ...]:
    """Version of the benchmark as a tuple of ints."""
    raise NotImplementedError()

  @classmethod
  def aliases(cls) -> Tuple[str, ...]:
    """Return "<name><version>" and "<name>_<version>" for both base names.

    The version parts are joined with ".".
    """
    version = [str(v) for v in cls.version()]
    assert version, "Expected non-empty version tuple."
    version_names = []
    dot_version = ".".join(version)
    for name in (cls.short_base_name(), cls.base_name()):
      assert name, "Expected non-empty base name."
      version_names.append(f"{name}{dot_version}")
      version_names.append(f"{name}_{dot_version}")
    return tuple(version_names)

  @classmethod
  def add_cli_parser(
      cls, subparsers, aliases: Sequence[str] = ()) -> CrossBenchArgumentParser:
    """Add the mutually exclusive URL options and the story filter options.

    All URL options write to the `custom_benchmark_url` destination.
    """
    parser = super().add_cli_parser(subparsers, aliases)
    # TODO: Move story-related args to dedicated PressBenchmarkStoryFilter class
    benchmark_url_group = parser.add_mutually_exclusive_group()
    live_url = cls.DEFAULT_STORY_CLS.URL
    local_url = cls.DEFAULT_STORY_CLS.URL_LOCAL
    official_url = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
    # --live stores None, i.e. no custom url is used.
    benchmark_url_group.add_argument(
        "--live",
        "--live-url",
        "--browser-ben",
        dest="custom_benchmark_url",
        const=None,
        action="store_const",
        help=(f"Use chrome live benchmark url ({live_url}) "
              "on https://browserben.ch."))
    benchmark_url_group.add_argument(
        "--official",
        "--official-url",
        dest="custom_benchmark_url",
        const=official_url,
        action="store_const",
        help=("Use officially hosted live/online benchmark url "
              f"({official_url})."))
    # nargs="?" with const: a bare --local falls back to the local url.
    benchmark_url_group.add_argument(
        "--local",
        "--local-url",
        "--url",
        "--custom-benchmark-url",
        type=ObjectParser.httpx_url_str,
        nargs="?",
        dest="custom_benchmark_url",
        const=local_url,
        help=(f"Use custom or locally (default={local_url}) "
              "hosted benchmark url."))
    cls.STORY_FILTER_CLS.add_cli_parser(parser)
    return parser

  @classmethod
  def kwargs_from_cli(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Extend the base kwargs with `custom_url` from the parsed CLI args."""
    kwargs = super().kwargs_from_cli(args)
    kwargs["custom_url"] = args.custom_benchmark_url
    return kwargs

  @classmethod
  def describe(cls) -> Dict[str, Any]:
    """Return the base description dict extended by the benchmark urls."""
    data = super().describe()
    assert issubclass(cls.DEFAULT_STORY_CLS, PressBenchmarkStory)
    data["url"] = cls.DEFAULT_STORY_CLS.URL
    data["url-official"] = cls.DEFAULT_STORY_CLS.URL_OFFICIAL
    data["url-local"] = cls.DEFAULT_STORY_CLS.URL_LOCAL
    return data

  def __init__(self,
               stories: Sequence[Story],
               custom_url: Optional[str] = None):
    super().__init__(stories)
    self.custom_url = custom_url
    if custom_url:
      # With a custom url every story is expected to use exactly that url.
      for story in stories:
        press_story = cast(PressBenchmarkStory, story)
        assert press_story.url == custom_url

  def setup(self, runner: Runner) -> None:
    """Run the base setup and then validate the benchmark url."""
    super().setup(runner)
    self.validate_url(runner)

  def validate_url(self, runner: Runner) -> None:
    """Check that the benchmark url is usable for the given runner.

    A custom url is only checked if the runner has any live network.
    Otherwise the url of the first story is checked on all runner platforms.

    Raises:
      ValueError: if the url is empty or cannot be reached.
    """
    if self.custom_url:
      if runner.has_any_live_network():
        self._validate_custom_url(runner, self.custom_url)
      return
    first_story = cast(PressBenchmarkStory, self.stories[0])
    url = first_story.url
    if not runner.has_all_live_network() and not url:
      # For non-live networks we create a matching URL
      return
    if not url:
      raise ValueError("Invalid empty url")
    if all(runner.env.validate_url(url, p) for p in runner.platforms):
      return
    msg = [
        f"Could not reach live benchmark URL: '{url}'. "
        "Please make sure you're connected to the internet."
    ]
    local_url = first_story.URL_LOCAL
    if local_url:
      msg.append(
          f"Alternatively use --local for the default local URL: {local_url}")
    raise ValueError("\n".join(msg))

  def _validate_custom_url(self, runner: Runner, url: str) -> None:
    """Raise a ValueError if `url` is not reachable on all runner platforms."""
    if not all(runner.env.validate_url(url, p) for p in runner.platforms):
      raise ValueError(
          f"Could not reach custom benchmark URL: '{self.custom_url}'. "
          "Please make sure your local web server is running.")
510