1# Copyright 2024 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import argparse 8import dataclasses 9import datetime as dt 10import logging 11from typing import (TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, 12 Type) 13 14from crossbench import exception 15from crossbench import path as pth 16from crossbench.action_runner.action.click import ClickAction 17from crossbench.action_runner.action.enums import ReadyState 18from crossbench.action_runner.action.get import GetAction 19from crossbench.action_runner.action.wait import WaitAction 20from crossbench.benchmarks.loading.config.blocks import ActionBlock 21from crossbench.benchmarks.loading.config.page import PageConfig 22from crossbench.benchmarks.loading.input_source import InputSource 23from crossbench.cli.config.secrets import SecretsConfig 24from crossbench.config import ConfigObject 25from crossbench.parse import DurationParseError, DurationParser, ObjectParser 26 27if TYPE_CHECKING: 28 from crossbench.action_runner.action.action import Action 29 30 31@dataclasses.dataclass(frozen=True) 32class PagesConfig(ConfigObject): 33 pages: Tuple[PageConfig, ...] = () 34 secrets: Optional[SecretsConfig] = None 35 36 def validate(self) -> None: 37 super().validate() 38 for index, page in enumerate(self.pages): 39 assert isinstance(page, PageConfig), ( 40 f"pages[{index}] is not a PageConfig but {type(page).__name__}") 41 42 @classmethod 43 def parse_str(cls, value: str) -> PagesConfig: 44 """ 45 Simple comma-separate config: 46 value = URL, [DURATION], ... 47 """ 48 values: List[str] = [] 49 previous_part: Optional[str] = None 50 for part in value.strip().split(","): 51 part = ObjectParser.non_empty_str(part, "url or duration") 52 try: 53 DurationParser.positive_duration(part) 54 if not previous_part: 55 raise argparse.ArgumentTypeError( 56 "Duration can only follow after url. " 57 f"Current value: {repr(part)}") 58 values[-1] = f"{previous_part},{part}" 59 previous_part = None 60 except DurationParseError: 61 previous_part = part 62 values.append(part) 63 return cls.parse_sequence(values) 64 65 @classmethod 66 def parse_unknown_path(cls, path: pth.LocalPath, **kwargs) -> PagesConfig: 67 # Make sure we get errors for invalid files. 68 return cls.parse_config_path(path, **kwargs) 69 70 @classmethod 71 def parse_other(cls, value: Any, **kwargs) -> PagesConfig: 72 if isinstance(value, (list, tuple)): 73 return cls.parse_sequence(value, **kwargs) 74 return super().parse_other(value, **kwargs) 75 76 @classmethod 77 def parse_sequence(cls, values: Sequence[str]) -> PagesConfig: 78 """ 79 Variant a): List of comma-separate URLs 80 [ "URL,[DURATION]", ... ] 81 """ 82 # TODO: support parsing a list of PageConfig dicts 83 if not values: 84 raise argparse.ArgumentTypeError("Got empty page list.") 85 pages: List[PageConfig] = [] 86 for index, single_line_config in enumerate(values): 87 with exception.annotate_argparsing( 88 f"Parsing pages[{index}]: {repr(single_line_config)}"): 89 pages.append(PageConfig.parse_str(single_line_config)) 90 return PagesConfig(pages=tuple(pages)) 91 92 @classmethod 93 def parse_dict(cls, config: Dict) -> PagesConfig: 94 """ 95 Variant a): 96 { "pages": { "LABEL": PAGE_CONFIG }, "secrets": { ... } } 97 """ 98 with exception.annotate_argparsing("Parsing stories"): 99 if "pages" not in config: 100 raise argparse.ArgumentTypeError( 101 "Config does not provide a 'pages' dict.") 102 secrets: Optional[SecretsConfig] = None 103 if secrets_data := config.get("secrets"): 104 secrets = SecretsConfig.parse(secrets_data) 105 pages_config = ObjectParser.non_empty_dict(config["pages"], "pages") 106 with exception.annotate_argparsing("Parsing config 'pages'"): 107 pages = cls._parse_pages(pages_config, secrets) 108 return PagesConfig(pages, secrets) 109 raise exception.UnreachableError() 110 111 @classmethod 112 def _parse_pages( 113 cls, 114 data: Dict[str, Any], 115 secrets: Optional[SecretsConfig] = None) -> Tuple[PageConfig, ...]: 116 pages = [] 117 for name, page_config in data.items(): 118 with exception.annotate_argparsing(f"Parsing story ...['{name}']"): 119 # TODO: fix secrets on the inner page and on the outer pages config 120 page = PageConfig.parse(page_config, label=name, secrets=secrets) 121 pages.append(page) 122 return tuple(pages) 123 124 125class DevToolsRecorderPagesConfig(PagesConfig): 126 127 @classmethod 128 def parse_str(cls: Type[PagesConfig], value: str) -> PagesConfig: 129 raise NotImplementedError() 130 131 @classmethod 132 def parse_dict(cls, config: Dict[str, Any]) -> DevToolsRecorderPagesConfig: 133 config = ObjectParser.non_empty_dict(config) 134 with exception.annotate_argparsing("Loading DevTools recording file"): 135 title = ObjectParser.non_empty_str(config["title"], "title") 136 actions = cls._parse_steps(config["steps"]) 137 # Use default block 138 blocks = (ActionBlock(actions=actions),) 139 pages = (PageConfig(label=title, blocks=blocks),) 140 return DevToolsRecorderPagesConfig(pages) 141 raise exception.UnreachableError() 142 143 @classmethod 144 def _parse_steps(cls, steps: List[Dict[str, Any]]) -> Tuple[Action, ...]: 145 actions: List[Action] = [] 146 for step in steps: 147 if maybe_actions := cls.parse_step(step): 148 actions.extend(maybe_actions) 149 # TODO(cbruni): make this configurable 150 actions.append(WaitAction(duration=dt.timedelta(seconds=1))) 151 return tuple(actions) 152 153 @classmethod 154 def parse_step(cls, step: Dict[str, Any]) -> List[Action]: 155 step_type: str = step["type"] 156 default_timeout = dt.timedelta(seconds=10) 157 if step_type == "navigate": 158 return [cls._parse_navigate_step(step, default_timeout)] 159 if step_type == "click": 160 return [cls._parse_click_step(step, default_timeout)] 161 if step_type == "setViewport": 162 # Resizing is ignored for now. 163 return [] 164 raise ValueError(f"Unsupported step: {step_type}") 165 166 @classmethod 167 def _parse_navigate_step(cls, step: Dict[str, Any], 168 default_timeout: dt.timedelta) -> Action: 169 del default_timeout 170 return GetAction( # type: ignore 171 step["url"], ready_state=ReadyState.COMPLETE) 172 173 @classmethod 174 def _parse_click_step(cls, step: Dict[str, Any], 175 default_timeout: dt.timedelta) -> Action: 176 selector = cls._parse_selectors(step["selectors"]) 177 return ClickAction( 178 InputSource.JS, 179 selector=selector, 180 scroll_into_view=True, 181 timeout=default_timeout) 182 183 @classmethod 184 def _parse_selectors(cls, selectors: List[List[str]]) -> str: 185 xpath: Optional[str] = None 186 aria: Optional[str] = None 187 text: Optional[str] = None 188 css: Optional[str] = None 189 # Detect all single-element selectors first. 190 for selector_list in selectors: 191 if len(selector_list) != 1: 192 continue 193 selector_candidate = selector_list[0] 194 if not aria and selector_candidate.startswith("aria/"): 195 aria = selector_candidate 196 elif not xpath and selector_candidate.startswith("xpath//"): 197 xpath = selector_candidate 198 elif not text and selector_candidate.startswith("css/"): 199 css = selector_candidate 200 elif not text and selector_candidate.startswith("text/"): 201 text = selector_candidate 202 elif not text and selector_candidate.startswith("pierce/"): 203 # not supported yet. 204 pass 205 else: 206 css = f"css/{selector_candidate}" 207 208 if xpath: 209 assert xpath.startswith("xpath/") 210 return xpath 211 if css: 212 _, css = css.split("css/", maxsplit=1) 213 return css 214 if aria: 215 _, aria = aria.split("aria/", maxsplit=1) 216 return f"[aria-label={repr(aria)}]" 217 if text: 218 _, text = text.split("text/", maxsplit=1) 219 return f"xpath///*[text()={repr(text)}]" 220 221 raise ValueError("Need at least one single element xpath or aria " 222 "selector for click action") 223 224 225class ListPagesConfig(PagesConfig): 226 227 VALID_EXTENSIONS: Tuple[str, ...] = (".txt", ".list") 228 229 @classmethod 230 def parse_str(cls, value: str) -> PagesConfig: 231 raise argparse.ArgumentTypeError( 232 f"URL list file {repr(value)} does not exist.") 233 234 @classmethod 235 def parse_path(cls, path: pth.LocalPath, **kwargs) -> PagesConfig: 236 assert not kwargs, f"{cls.__name__} does not support extra kwargs" 237 pages: List[PageConfig] = [] 238 with exception.annotate_argparsing(f"Loading Pages list file: {path.name}"): 239 line: int = 0 240 with path.open() as f: 241 for single_line_config in f.readlines(): 242 with exception.annotate_argparsing(f"Parsing line {line}"): 243 line += 1 244 single_line_config = single_line_config.strip() 245 if not single_line_config: 246 logging.warning("Skipping empty line %s", line) 247 continue 248 pages.append(PageConfig.parse(single_line_config)) 249 return PagesConfig(pages=tuple(pages)) 250 251 @classmethod 252 def parse_dict(cls, config: Dict) -> PagesConfig: 253 config = ObjectParser.non_empty_dict(config, "pages") 254 with exception.annotate_argparsing("Parsing scenarios / pages"): 255 if "pages" not in config: 256 raise argparse.ArgumentTypeError( 257 "Config does not provide a 'pages' dict.") 258 pages = config["pages"] 259 if isinstance(pages, str): 260 pages = [pages] 261 if not isinstance(pages, (list, tuple)): 262 raise argparse.ArgumentTypeError( 263 f"Expected list/tuple for pages, but got {type(pages)}") 264 return cls.parse_sequence(pages) 265 raise exception.UnreachableError() 266