• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import argparse
8import dataclasses
9import datetime as dt
10import logging
11from typing import (TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple,
12                    Type)
13
14from crossbench import exception
15from crossbench import path as pth
16from crossbench.action_runner.action.click import ClickAction
17from crossbench.action_runner.action.enums import ReadyState
18from crossbench.action_runner.action.get import GetAction
19from crossbench.action_runner.action.wait import WaitAction
20from crossbench.benchmarks.loading.config.blocks import ActionBlock
21from crossbench.benchmarks.loading.config.page import PageConfig
22from crossbench.benchmarks.loading.input_source import InputSource
23from crossbench.cli.config.secrets import SecretsConfig
24from crossbench.config import ConfigObject
25from crossbench.parse import DurationParseError, DurationParser, ObjectParser
26
27if TYPE_CHECKING:
28  from crossbench.action_runner.action.action import Action
29
30
@dataclasses.dataclass(frozen=True)
class PagesConfig(ConfigObject):
  """Immutable list of PageConfig entries plus optional secrets.

  Instances are created from a comma-separated string, a sequence of
  single-line page strings, or a dict that provides a "pages" entry.
  """
  # Parsed page configs, in the order they were provided.
  pages: Tuple[PageConfig, ...] = ()
  # Result of parsing the optional "secrets" entry of a dict config.
  secrets: Optional[SecretsConfig] = None

  def validate(self) -> None:
    """Run the base-class validation and type-check every page entry."""
    super().validate()
    for idx, entry in enumerate(self.pages):
      assert isinstance(entry, PageConfig), (
          f"pages[{idx}] is not a PageConfig but {type(entry).__name__}")

  @classmethod
  def parse_str(cls, value: str) -> PagesConfig:
    """
    Simple comma-separated config:
    value = URL, [DURATION], ...

    A part accepted by DurationParser.positive_duration is attached to the
    url part directly in front of it; every other part starts a new entry.
    The resulting "URL" / "URL,DURATION" strings go to parse_sequence.
    """
    entries: List[str] = []
    pending_url: Optional[str] = None
    for raw_part in value.strip().split(","):
      part = ObjectParser.non_empty_str(raw_part, "url or duration")
      try:
        DurationParser.positive_duration(part)
      except DurationParseError:
        # Not a duration: this part opens a new page entry.
        entries.append(part)
        pending_url = part
        continue
      # The part is a duration and needs a url directly before it.
      if not pending_url:
        raise argparse.ArgumentTypeError(
            "Duration can only follow after url. "
            f"Current value: {repr(part)}")
      entries[-1] = f"{pending_url},{part}"
      pending_url = None
    return cls.parse_sequence(entries)

  @classmethod
  def parse_unknown_path(cls, path: pth.LocalPath, **kwargs) -> PagesConfig:
    """Handle unrecognized paths by parsing them as a config file."""
    # Make sure we get errors for invalid files.
    return cls.parse_config_path(path, **kwargs)

  @classmethod
  def parse_other(cls, value: Any, **kwargs) -> PagesConfig:
    """Parse lists/tuples as page sequences, defer the rest to the base."""
    if not isinstance(value, (list, tuple)):
      return super().parse_other(value, **kwargs)
    return cls.parse_sequence(value, **kwargs)

  @classmethod
  def parse_sequence(cls, values: Sequence[str]) -> PagesConfig:
    """
    Variant a): List of comma-separated URLs
      [ "URL,[DURATION]", ... ]

    Raises argparse.ArgumentTypeError for an empty sequence.
    """
    # TODO: support parsing a list of PageConfig dicts
    if not values:
      raise argparse.ArgumentTypeError("Got empty page list.")
    parsed: List[PageConfig] = []
    for position, line in enumerate(values):
      context = f"Parsing pages[{position}]: {repr(line)}"
      with exception.annotate_argparsing(context):
        parsed.append(PageConfig.parse_str(line))
    return PagesConfig(pages=tuple(parsed))

  @classmethod
  def parse_dict(cls, config: Dict) -> PagesConfig:
    """
    Variant a):
      { "pages": { "LABEL": PAGE_CONFIG }, "secrets": { ... } }

    Raises argparse.ArgumentTypeError if the "pages" entry is missing.
    """
    with exception.annotate_argparsing("Parsing stories"):
      if "pages" not in config:
        raise argparse.ArgumentTypeError(
            "Config does not provide a 'pages' dict.")
      # Secrets are optional; a missing or falsy entry leaves them unset.
      secrets_data = config.get("secrets")
      secrets: Optional[SecretsConfig] = (
          SecretsConfig.parse(secrets_data) if secrets_data else None)
      pages_config = ObjectParser.non_empty_dict(config["pages"], "pages")
      with exception.annotate_argparsing("Parsing config 'pages'"):
        return PagesConfig(cls._parse_pages(pages_config, secrets), secrets)
    raise exception.UnreachableError()

  @classmethod
  def _parse_pages(
      cls,
      data: Dict[str, Any],
      secrets: Optional[SecretsConfig] = None) -> Tuple[PageConfig, ...]:
    """Parse each LABEL -> PAGE_CONFIG item, using the key as page label."""
    parsed: List[PageConfig] = []
    for label, raw_page in data.items():
      with exception.annotate_argparsing(f"Parsing story ...['{label}']"):
        # TODO: fix secrets on the inner page and on the outer pages config
        parsed.append(
            PageConfig.parse(raw_page, label=label, secrets=secrets))
    return tuple(parsed)
123
124
class DevToolsRecorderPagesConfig(PagesConfig):
  """PagesConfig created from a DevTools recording dict (title + steps)."""

  @classmethod
  def parse_str(cls: Type[PagesConfig], value: str) -> PagesConfig:
    """String input is not supported for DevTools recordings."""
    raise NotImplementedError()

  @classmethod
  def parse_dict(cls, config: Dict[str, Any]) -> DevToolsRecorderPagesConfig:
    """Convert a recording dict into a config with a single page.

    The "title" entry becomes the page label and the "steps" entry is
    converted into the actions of one default ActionBlock.
    """
    config = ObjectParser.non_empty_dict(config)
    with exception.annotate_argparsing("Loading DevTools recording file"):
      title = ObjectParser.non_empty_str(config["title"], "title")
      actions = cls._parse_steps(config["steps"])
      # Use default block
      blocks = (ActionBlock(actions=actions),)
      pages = (PageConfig(label=title, blocks=blocks),)
      return DevToolsRecorderPagesConfig(pages)
    raise exception.UnreachableError()

  @classmethod
  def _parse_steps(cls, steps: List[Dict[str, Any]]) -> Tuple[Action, ...]:
    """Convert all steps to actions.

    Every step that yields at least one action is followed by a fixed
    one-second WaitAction; steps without actions are skipped entirely.
    """
    actions: List[Action] = []
    for step in steps:
      if maybe_actions := cls.parse_step(step):
        actions.extend(maybe_actions)
        # TODO(cbruni): make this configurable
        actions.append(WaitAction(duration=dt.timedelta(seconds=1)))
    return tuple(actions)

  @classmethod
  def parse_step(cls, step: Dict[str, Any]) -> List[Action]:
    """Convert a single step dict into a (possibly empty) action list.

    Supported "type" values are "navigate", "click" and "setViewport"
    (the latter produces no action).

    Raises:
      ValueError: for any other step type.
    """
    step_type: str = step["type"]
    default_timeout = dt.timedelta(seconds=10)
    if step_type == "navigate":
      return [cls._parse_navigate_step(step, default_timeout)]
    if step_type == "click":
      return [cls._parse_click_step(step, default_timeout)]
    if step_type == "setViewport":
      # Resizing is ignored for now.
      return []
    raise ValueError(f"Unsupported step: {step_type}")

  @classmethod
  def _parse_navigate_step(cls, step: Dict[str, Any],
                           default_timeout: dt.timedelta) -> Action:
    """Create a GetAction for the step's "url" with a COMPLETE ready state."""
    # The timeout is accepted for signature parity with the other step
    # parsers but is not used for navigation.
    del default_timeout
    return GetAction(  # type: ignore
        step["url"], ready_state=ReadyState.COMPLETE)

  @classmethod
  def _parse_click_step(cls, step: Dict[str, Any],
                        default_timeout: dt.timedelta) -> Action:
    """Create a JS-driven ClickAction from the step's "selectors" entry."""
    selector = cls._parse_selectors(step["selectors"])
    return ClickAction(
        InputSource.JS,
        selector=selector,
        scroll_into_view=True,
        timeout=default_timeout)

  @classmethod
  def _parse_selectors(cls, selectors: List[List[str]]) -> str:
    """Pick a single selector string for a click step.

    Only selector lists with exactly one element are considered. Each
    candidate is classified by its prefix ("aria/", "xpath/", "css/",
    "text/", "pierce/"); a candidate without any of these prefixes is
    treated as a plain CSS selector. The first aria, xpath and text
    candidates are kept, the last CSS candidate is kept, and "pierce/"
    candidates are ignored.

    Returns:
      By priority: the xpath candidate (including its "xpath/" prefix), the
      CSS selector without prefix, an "[aria-label=...]" selector, or an
      "xpath/"-prefixed text() match.

    Raises:
      ValueError: if no usable single-element selector was found.
    """
    xpath: Optional[str] = None
    aria: Optional[str] = None
    text: Optional[str] = None
    css: Optional[str] = None
    # Detect all single-element selectors first.
    for selector_list in selectors:
      if len(selector_list) != 1:
        continue
      selector_candidate = selector_list[0]
      # Classify by prefix first and only then decide whether to keep the
      # candidate. Combining both checks in one condition lets a repeated
      # aria/xpath/text candidate (or a css/pierce candidate seen after a
      # text candidate) fall through to the plain-CSS fallback and clobber
      # `css` with a non-CSS string.
      if selector_candidate.startswith("aria/"):
        if aria is None:
          aria = selector_candidate
      elif selector_candidate.startswith("xpath/"):
        if xpath is None:
          xpath = selector_candidate
      elif selector_candidate.startswith("css/"):
        css = selector_candidate
      elif selector_candidate.startswith("text/"):
        if text is None:
          text = selector_candidate
      elif selector_candidate.startswith("pierce/"):
        # not supported yet.
        pass
      else:
        # No known prefix: treat as plain CSS and normalize to "css/...".
        css = f"css/{selector_candidate}"

    if xpath:
      return xpath
    if css:
      _, css = css.split("css/", maxsplit=1)
      return css
    if aria:
      _, aria = aria.split("aria/", maxsplit=1)
      return f"[aria-label={repr(aria)}]"
    if text:
      _, text = text.split("text/", maxsplit=1)
      return f"xpath///*[text()={repr(text)}]"

    raise ValueError("Need at least one single element xpath or aria "
                     "selector for click action")
223
224
class ListPagesConfig(PagesConfig):
  """PagesConfig loaded from a list of single-line page configs."""

  VALID_EXTENSIONS: Tuple[str, ...] = (".txt", ".list")

  @classmethod
  def parse_str(cls, value: str) -> PagesConfig:
    """Always fails: a plain string is reported as a missing list file."""
    raise argparse.ArgumentTypeError(
        f"URL list file {repr(value)} does not exist.")

  @classmethod
  def parse_path(cls, path: pth.LocalPath, **kwargs) -> PagesConfig:
    """Parse a file with one page config per line.

    Lines are stripped before parsing; empty lines are skipped with a
    warning. Line numbers in warnings and error annotations are 1-based.
    """
    assert not kwargs, f"{cls.__name__} does not support extra kwargs"
    pages: List[PageConfig] = []
    with exception.annotate_argparsing(f"Loading Pages list file: {path.name}"):
      with path.open() as f:
        # Number lines from 1 so the error annotation and the skip warning
        # refer to the same line.
        for line, single_line_config in enumerate(f, start=1):
          with exception.annotate_argparsing(f"Parsing line {line}"):
            single_line_config = single_line_config.strip()
            if not single_line_config:
              logging.warning("Skipping empty line %s", line)
              continue
            pages.append(PageConfig.parse(single_line_config))
    return PagesConfig(pages=tuple(pages))

  @classmethod
  def parse_dict(cls, config: Dict) -> PagesConfig:
    """Parse a dict whose "pages" entry is a string or a list/tuple.

    A single string is treated as a one-element list before the entries
    are handed to parse_sequence.

    Raises:
      argparse.ArgumentTypeError: if "pages" is missing or has another type.
    """
    config = ObjectParser.non_empty_dict(config, "pages")
    with exception.annotate_argparsing("Parsing scenarios / pages"):
      if "pages" not in config:
        raise argparse.ArgumentTypeError(
            "Config does not provide a 'pages' dict.")
      pages = config["pages"]
      if isinstance(pages, str):
        pages = [pages]
      if not isinstance(pages, (list, tuple)):
        raise argparse.ArgumentTypeError(
            f"Expected list/tuple for pages, but got {type(pages)}")
      return cls.parse_sequence(pages)
    raise exception.UnreachableError()
266