• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
5from __future__ import annotations
6
7from typing import TYPE_CHECKING, Iterable, Optional
8
9from crossbench import exception
10from crossbench.action_runner.action_runner_listener import \
11    ActionRunnerListener
12from crossbench.benchmarks.loading.input_source import InputSource
13
14if TYPE_CHECKING:
15  from crossbench.action_runner.action import all as i_action
16  from crossbench.benchmarks.loading.config.pages import ActionBlock
17  from crossbench.benchmarks.loading.page.base import Page
18  from crossbench.benchmarks.loading.page.combined import CombinedPage
19  from crossbench.benchmarks.loading.page.interactive import InteractivePage
20  from crossbench.benchmarks.loading.tab_controller import TabController
21  from crossbench.path import LocalPath
22  from crossbench.runner.run import Run
23
24
class ActionNotImplementedError(NotImplementedError):
  """Raised when a runner has no implementation for a given action.

  The offending runner and action are kept on the exception as `runner` and
  `action` so that handlers can inspect them.
  """

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               msg_context: str = "") -> None:
    """Build the error message from the action type and runner class name.

    Args:
      runner: The runner that was asked to perform the action.
      action: The action that is not implemented; its `TYPE` attribute is
        stringified and capitalized for the message.
      msg_context: Optional extra text, appended after ". Context: " when
        non-empty.
    """
    self.runner = runner
    self.action = action

    action_name = str(action.TYPE).capitalize()
    runner_name = type(runner).__name__
    # Only emit the context suffix when the caller actually supplied one.
    context_suffix = f". Context: {msg_context}" if msg_context else ""

    super().__init__(
        f"{action_name}-action not implemented in {runner_name}"
        f"{context_suffix}")
40
41
class InputSourceNotImplementedError(ActionNotImplementedError):
  """Raised when a runner lacks an action variant for one input source.

  The source-specific text is passed to ActionNotImplementedError as its
  `msg_context`, so the final message has the form:
  "<Type>-action not implemented in <Runner>. Context: Source: '<source>'
  not implemented" followed by ". Context: <msg_context>" when extra context
  is supplied.
  """

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               input_source: InputSource,
               msg_context: str = "") -> None:
    """Compose the input-source message and delegate to the base class.

    Args:
      runner: The runner that was asked to perform the action.
      action: The action whose input-source variant is missing.
      input_source: The unsupported input source; also stored on the
        exception as `input_source`.
      msg_context: Optional extra text, appended after ". Context: " when
        non-empty.
    """
    self.input_source = input_source

    if msg_context:
      msg_context = ". Context: " + msg_context

    # Adjacent string literals are joined verbatim, so the separating space
    # has to be part of the first literal; without it the message rendered
    # as "Source: 'X'not implemented".
    input_source_message = (f"Source: '{input_source}' "
                            f"not implemented{msg_context}")

    super().__init__(runner, action, input_source_message)
57
58
class ActionRunner:
  """Base class that dispatches page actions to per-action methods.

  `wait`, `js`, `screenshot` and `dump_html` are implemented here on top of
  `run.actions(...)`. `click`, `scroll` and `text_input` dispatch on the
  action's input source to `*_js` / `*_touch` / `*_mouse` / `*_keyboard`
  methods. Every other action method raises ActionNotImplementedError (or
  InputSourceNotImplementedError) in this base class.
  """

  # TODO: Don't share state across runs
  # Defaults to None so that reading `info_stack` before any block has run
  # reaches the RuntimeError in the property instead of failing with an
  # AttributeError on the missing attribute.
  _info_stack: Optional[exception.TInfoStack] = None

  def __init__(self) -> None:
    """Install a default ActionRunnerListener instance."""
    self._listener = ActionRunnerListener()

  def set_listener(self, listener: ActionRunnerListener) -> None:
    """Replace the listener notified by run_page_multiple_tabs."""
    self._listener = listener

  @property
  def info_stack(self) -> exception.TInfoStack:
    """Unique identifier of the currently running or most recently run action.

    Raises:
      RuntimeError: If no action has been run through run_block yet.
    """
    if not self._info_stack:
      raise RuntimeError("info_stack can not be called before run_blocks")
    return self._info_stack

  def run_blocks(self, run: Run, page: InteractivePage,
                 blocks: Iterable[ActionBlock]) -> None:
    """Run every block in order via `block.run_with(self, run, page)`."""
    for block in blocks:
      block.run_with(self, run, page)

  def run_block(self, run: Run, block: ActionBlock) -> None:
    """Run all actions of a single block, updating `info_stack` per action.

    Action indices in the info stack start at 1; the block index is taken
    from `block.index`.
    """
    block_index = block.index
    # TODO: Instead maybe just pass context down.
    # Or pass unique path to every action __init__
    with exception.annotate(f"Running block {block_index}: {block.label}"):
      for action_index, action in enumerate(block, start=1):
        self._info_stack = (f"block_{block_index}", f"action_{action_index}")
        action.run_with(run, self)

  def wait(self, run: Run, action: i_action.WaitAction) -> None:
    """Wait for `action.duration` inside an unmeasured actions scope."""
    with run.actions("WaitAction", measure=False) as actions:
      actions.wait(action.duration)

  def js(self, run: Run, action: i_action.JsAction) -> None:
    """Run `action.script` with `action.timeout` in an unmeasured scope."""
    with run.actions("JS", measure=False) as actions:
      actions.js(action.script, action.timeout)

  def click(self, run: Run, action: i_action.ClickAction) -> None:
    """Dispatch a click to the JS, touch or mouse implementation.

    Raises:
      RuntimeError: If the action's input source is none of JS, TOUCH, MOUSE.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.click_js(run, action)
    elif input_source is InputSource.TOUCH:
      self.click_touch(run, action)
    elif input_source is InputSource.MOUSE:
      self.click_mouse(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def scroll(self, run: Run, action: i_action.ScrollAction) -> None:
    """Dispatch a scroll to the JS, touch or mouse implementation.

    Raises:
      RuntimeError: If the action's input source is none of JS, TOUCH, MOUSE.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.scroll_js(run, action)
    elif input_source is InputSource.TOUCH:
      self.scroll_touch(run, action)
    elif input_source is InputSource.MOUSE:
      self.scroll_mouse(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def get(self, run: Run, action: i_action.GetAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)

  def text_input(self, run: Run, action: i_action.TextInputAction) -> None:
    """Dispatch text input to the JS or keyboard implementation.

    Raises:
      RuntimeError: If the action's input source is neither JS nor KEYBOARD.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.text_input_js(run, action)
    elif input_source is InputSource.KEYBOARD:
      self.text_input_keyboard(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  # The input-source specific variants below all raise
  # InputSourceNotImplementedError in this base class.

  def click_js(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_touch(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_mouse(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_touch(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_mouse(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_js(self, run: Run, action: i_action.TextInputAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_keyboard(self, run: Run,
                          action: i_action.TextInputAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def swipe(self, run: Run, action: i_action.SwipeAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)

  def wait_for_element(self, run: Run,
                       action: i_action.WaitForElementAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)

  def wait_for_ready_state(self, run: Run,
                           action: i_action.WaitForReadyStateAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)

  def inject_new_document_script(
      self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)

  def screenshot_impl(self, run: Run, suffix: str) -> None:
    """Hook used by `screenshot`; raises NotImplementedError here."""
    del run, suffix
    raise NotImplementedError("screenshot_impl not implemented")

  def screenshot(self, run: Run, action: i_action.ScreenshotAction) -> None:
    """Call `screenshot_impl` with the suffix "screenshot", unmeasured."""
    del action
    with run.actions("Screenshot", measure=False):
      self.screenshot_impl(run, "screenshot")

  def dump_html_impl(self, run: Run, suffix: str) -> None:
    """Hook used by `dump_html`; raises NotImplementedError here."""
    del run, suffix
    raise NotImplementedError("dump_html_impl not implemented")

  def dump_html(self, run: Run, action: i_action.DumpHtmlAction) -> None:
    """Call `dump_html_impl` with the suffix "dump", unmeasured."""
    del action
    with run.actions("Dump HTML", measure=False):
      self.dump_html_impl(run, "dump")

  def _maybe_navigate_to_about_blank(self, run: Run, page: Page) -> None:
    """Show about:blank and wait if the page has an about_blank_duration."""
    if duration := page.about_blank_duration:
      run.browser.show_url("about:blank")
      run.runner.wait(duration)

  def run_page_multiple_tabs(self, run: Run, tabs: TabController,
                             pages: Iterable[Page]) -> None:
    """Run `pages` once per iteration of `tabs`, each page in its own tab.

    Within one iteration, every page after the first is run in a newly
    opened tab, and one more new tab is opened after the last page. The
    listener is notified of new tabs and page runs. Any exception is
    reported to the listener and re-raised.
    """
    # TODO: refactor possible logics to TabController.
    # NOTE(review): `pages` is iterated again on every pass over `tabs`; a
    # one-shot iterator would be exhausted after the first pass. Both calls
    # in this class pass `page.pages` or a list -- confirm other callers.
    browser = run.browser
    for _ in tabs:
      try:
        for i, page in enumerate(pages):
          # Create a new tab for the multiple_tab case.
          if i > 0:
            browser.switch_to_new_tab()
            self._listener.handle_new_tab(run)
          page.run_with(run, self, False)
          self._listener.handle_page_run(run)
        browser.switch_to_new_tab()
        self._listener.handle_new_tab(run)
      except Exception as e:
        self._listener.handle_error(run, e)
        raise

  def run_combined_page(self, run: Run, page: CombinedPage,
                        multiple_tabs: bool) -> None:
    """Run the sub-pages of a combined page, in tabs or sequentially."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, page.pages)
    else:
      for sub_page in page.pages:
        sub_page.run_with(run, self, False)

  def run_interactive_page_once(self, run: Run,
                                page: InteractivePage) -> None:
    """Run all blocks of the page, then optionally go to about:blank.

    On any exception the page's failure artifacts are created before the
    exception is re-raised.
    """
    try:
      self.run_blocks(run, page, page.blocks)
      self._maybe_navigate_to_about_blank(run, page)
    except Exception:
      page.create_failure_artifacts(run)
      raise

  def run_interactive_page(self, run: Run, page: InteractivePage,
                           multiple_tabs: bool) -> None:
    """Run an interactive page, through the tab loop if requested."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, [page])
    else:
      self.run_interactive_page_once(run, page)

  def run_setup(self, run: Run, page: InteractivePage,
                setup: ActionBlock) -> None:
    """Run the setup block; create "setup-failure" artifacts on error."""
    try:
      with exception.annotate("setup"):
        setup.run_with(self, run, page)
    except Exception:
      page.create_failure_artifacts(run, "setup-failure")
      raise

  def run_login(self, run: Run, page: InteractivePage,
                login: ActionBlock) -> None:
    """Run the login block; create "login-failure" artifacts on error."""
    try:
      with exception.annotate("login"):
        login.run_with(self, run, page)
    except Exception:
      page.create_failure_artifacts(run, "login-failure")
      raise

  def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None:
    """Not implemented in the base class."""
    raise ActionNotImplementedError(self, action)
257