1# Copyright 2024 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import dataclasses 8import datetime as dt 9from typing import (TYPE_CHECKING, Any, Dict, Iterator, Optional, Sequence, 10 Tuple, Type, cast) 11from urllib import parse as urlparse 12 13from crossbench import path as pth 14from crossbench.action_runner.action.action_type import ActionType 15from crossbench.action_runner.action.get import GetAction 16from crossbench.benchmarks.loading.config.blocks import (ActionBlock, 17 ActionBlockListConfig) 18from crossbench.benchmarks.loading.config.login.custom import LoginBlock 19from crossbench.benchmarks.loading.page.live import PAGES 20from crossbench.benchmarks.loading.playback_controller import \ 21 PlaybackController 22from crossbench.cli.config.secrets import SecretsConfig 23from crossbench.config import ConfigObject, ConfigParser 24from crossbench.parse import DurationParser, ObjectParser 25 26if TYPE_CHECKING: 27 from crossbench.action_runner.action.action import Action 28 29 30@dataclasses.dataclass(frozen=True) 31class PageConfig(ConfigObject): 32 label: Optional[str] = None 33 playback: Optional[PlaybackController] = None 34 secrets: SecretsConfig = SecretsConfig() 35 login: Optional[LoginBlock] = None 36 setup: Optional[ActionBlock] = None 37 blocks: Tuple[ActionBlock, ...] = tuple() 38 39 @classmethod 40 def parse_other(cls: Type[PageConfig], value: Any, **kwargs) -> PageConfig: 41 if isinstance(value, (list, tuple)): 42 return cls.parse_sequence(value, **kwargs) 43 return super().parse_other(value) 44 45 @classmethod 46 def parse_str( # pylint: disable=arguments-differ 47 cls: Type[PageConfig], 48 value: str, 49 label: Optional[str] = None) -> PageConfig: 50 """ 51 Simple comma-separated string with optional duration: 52 value = URL,[DURATION] 53 """ 54 parts = value.rsplit(",", maxsplit=1) 55 duration = dt.timedelta() 56 raw_url: str = parts[0] 57 if raw_url in PAGES: 58 url = PAGES[raw_url].url 59 label = label or raw_url 60 else: 61 url = ObjectParser.parse_fuzzy_url_str(raw_url) 62 if len(parts) == 2: 63 duration = DurationParser.positive_duration(parts[1]) 64 return cls.from_url(label, url, duration) 65 66 @classmethod 67 def parse_sequence(cls: Type[PageConfig], 68 value: Sequence[Any], 69 label: Optional[str] = None, 70 secrets: Optional[SecretsConfig] = None) -> PageConfig: 71 value = ObjectParser.non_empty_sequence(value, "story actions or blocks") 72 blocks = ActionBlockListConfig.parse_sequence(value) 73 if label is not None: 74 label = ObjectParser.non_empty_str(label, "label") 75 secrets = secrets or SecretsConfig() 76 return cls(label, secrets=secrets, blocks=blocks.blocks) 77 78 @classmethod 79 def parse_dict( # pylint: disable=arguments-differ 80 cls: Type[PageConfig], 81 config: Dict[str, Any], 82 label: Optional[str] = None, 83 secrets: Optional[SecretsConfig] = None) -> PageConfig: 84 config = ObjectParser.non_empty_dict(config, "story actions or blocks") 85 page_config = cls.config_parser().parse( 86 config, label=label, secrets=secrets) 87 return page_config 88 89 @classmethod 90 def config_parser(cls: Type[PageConfig]) -> ConfigParser[PageConfig]: 91 parser = ConfigParser(f"{cls.__name__} parser", cls) 92 parser.add_argument("label", type=ObjectParser.non_empty_str) 93 parser.add_argument("playback", type=PlaybackController.parse) 94 parser.add_argument("secrets", type=SecretsConfig, default=SecretsConfig()) 95 parser.add_argument("login", type=LoginBlock) 96 parser.add_argument("setup", type=ActionBlock) 97 parser.add_argument( 98 "blocks", 99 aliases=("actions", "url", "urls"), 100 type=ActionBlockListConfig) 101 return parser 102 103 @classmethod 104 def from_url(cls, 105 label: Optional[str], 106 url: str, 107 duration: dt.timedelta = dt.timedelta()) -> PageConfig: 108 actions = (GetAction(url, duration=duration),) 109 blocks = (ActionBlock(actions=actions),) 110 return PageConfig(label=label, blocks=blocks) 111 112 def actions(self) -> Iterator[Action]: 113 for block in self.blocks: 114 yield from block 115 116 @property 117 def duration(self) -> dt.timedelta: 118 return sum((action.duration for action in self.actions()), dt.timedelta()) 119 120 @property 121 def any_label(self) -> str: 122 return self.label or self.url_label 123 124 @property 125 def url_label(self) -> str: 126 url = urlparse.urlparse(self.first_url) 127 if url.scheme == "about": 128 return url.path 129 if url.scheme == "file": 130 return pth.LocalPath(url.path).name 131 if hostname := url.hostname: 132 if hostname.startswith("www."): 133 return hostname[len("www."):] 134 return hostname 135 return str(url) 136 137 @property 138 def first_url(self) -> str: 139 for action in self.actions(): 140 if action.TYPE == ActionType.GET: 141 return cast(GetAction, action).url 142 raise RuntimeError("No GET action with an URL found.") 143