• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import dataclasses
8import datetime as dt
9from typing import (TYPE_CHECKING, Any, Dict, Iterator, Optional, Sequence,
10                    Tuple, Type, cast)
11from urllib import parse as urlparse
12
13from crossbench import path as pth
14from crossbench.action_runner.action.action_type import ActionType
15from crossbench.action_runner.action.get import GetAction
16from crossbench.benchmarks.loading.config.blocks import (ActionBlock,
17                                                         ActionBlockListConfig)
18from crossbench.benchmarks.loading.config.login.custom import LoginBlock
19from crossbench.benchmarks.loading.page.live import PAGES
20from crossbench.benchmarks.loading.playback_controller import \
21    PlaybackController
22from crossbench.cli.config.secrets import SecretsConfig
23from crossbench.config import ConfigObject, ConfigParser
24from crossbench.parse import DurationParser, ObjectParser
25
26if TYPE_CHECKING:
27  from crossbench.action_runner.action.action import Action
28
29
30@dataclasses.dataclass(frozen=True)
31class PageConfig(ConfigObject):
32  label: Optional[str] = None
33  playback: Optional[PlaybackController] = None
34  secrets: SecretsConfig = SecretsConfig()
35  login: Optional[LoginBlock] = None
36  setup: Optional[ActionBlock] = None
37  blocks: Tuple[ActionBlock, ...] = tuple()
38
39  @classmethod
40  def parse_other(cls: Type[PageConfig], value: Any, **kwargs) -> PageConfig:
41    if isinstance(value, (list, tuple)):
42      return cls.parse_sequence(value, **kwargs)
43    return super().parse_other(value)
44
45  @classmethod
46  def parse_str(  # pylint: disable=arguments-differ
47      cls: Type[PageConfig],
48      value: str,
49      label: Optional[str] = None) -> PageConfig:
50    """
51    Simple comma-separated string with optional duration:
52      value = URL,[DURATION]
53    """
54    parts = value.rsplit(",", maxsplit=1)
55    duration = dt.timedelta()
56    raw_url: str = parts[0]
57    if raw_url in PAGES:
58      url = PAGES[raw_url].url
59      label = label or raw_url
60    else:
61      url = ObjectParser.parse_fuzzy_url_str(raw_url)
62    if len(parts) == 2:
63      duration = DurationParser.positive_duration(parts[1])
64    return cls.from_url(label, url, duration)
65
66  @classmethod
67  def parse_sequence(cls: Type[PageConfig],
68                     value: Sequence[Any],
69                     label: Optional[str] = None,
70                     secrets: Optional[SecretsConfig] = None) -> PageConfig:
71    value = ObjectParser.non_empty_sequence(value, "story actions or blocks")
72    blocks = ActionBlockListConfig.parse_sequence(value)
73    if label is not None:
74      label = ObjectParser.non_empty_str(label, "label")
75    secrets = secrets or SecretsConfig()
76    return cls(label, secrets=secrets, blocks=blocks.blocks)
77
78  @classmethod
79  def parse_dict(  # pylint: disable=arguments-differ
80      cls: Type[PageConfig],
81      config: Dict[str, Any],
82      label: Optional[str] = None,
83      secrets: Optional[SecretsConfig] = None) -> PageConfig:
84    config = ObjectParser.non_empty_dict(config, "story actions or blocks")
85    page_config = cls.config_parser().parse(
86        config, label=label, secrets=secrets)
87    return page_config
88
89  @classmethod
90  def config_parser(cls: Type[PageConfig]) -> ConfigParser[PageConfig]:
91    parser = ConfigParser(f"{cls.__name__} parser", cls)
92    parser.add_argument("label", type=ObjectParser.non_empty_str)
93    parser.add_argument("playback", type=PlaybackController.parse)
94    parser.add_argument("secrets", type=SecretsConfig, default=SecretsConfig())
95    parser.add_argument("login", type=LoginBlock)
96    parser.add_argument("setup", type=ActionBlock)
97    parser.add_argument(
98        "blocks",
99        aliases=("actions", "url", "urls"),
100        type=ActionBlockListConfig)
101    return parser
102
103  @classmethod
104  def from_url(cls,
105               label: Optional[str],
106               url: str,
107               duration: dt.timedelta = dt.timedelta()) -> PageConfig:
108    actions = (GetAction(url, duration=duration),)
109    blocks = (ActionBlock(actions=actions),)
110    return PageConfig(label=label, blocks=blocks)
111
112  def actions(self) -> Iterator[Action]:
113    for block in self.blocks:
114      yield from block
115
116  @property
117  def duration(self) -> dt.timedelta:
118    return sum((action.duration for action in self.actions()), dt.timedelta())
119
120  @property
121  def any_label(self) -> str:
122    return self.label or self.url_label
123
124  @property
125  def url_label(self) -> str:
126    url = urlparse.urlparse(self.first_url)
127    if url.scheme == "about":
128      return url.path
129    if url.scheme == "file":
130      return pth.LocalPath(url.path).name
131    if hostname := url.hostname:
132      if hostname.startswith("www."):
133        return hostname[len("www."):]
134      return hostname
135    return str(url)
136
137  @property
138  def first_url(self) -> str:
139    for action in self.actions():
140      if action.TYPE == ActionType.GET:
141        return cast(GetAction, action).url
142    raise RuntimeError("No GET action with an URL found.")
143