• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
4
from __future__ import annotations

import datetime as dt
import json
import logging
import time
from typing import TYPE_CHECKING, Callable, Tuple

from crossbench.action_runner.action import all as i_action
from crossbench.action_runner.action.enums import ReadyState
from crossbench.action_runner.base import (ActionRunner,
                                           InputSourceNotImplementedError)
from crossbench.action_runner.element_not_found_error import \
    ElementNotFoundError
from crossbench.probes.dump_html import DumpHtmlProbe, DumpHtmlProbeContext
from crossbench.probes.screenshot import ScreenshotProbe, ScreenshotProbeContext

if TYPE_CHECKING:
  from crossbench.runner.actions import Actions
  from crossbench.runner.run import Run
24
25
class BasicActionRunner(ActionRunner):
  """ActionRunner that implements actions with small JavaScript snippets.

  The class-level string constants below are JavaScript fragments. They are
  concatenated into one script body and executed through `actions.js`, with
  the element selector passed as `arguments[0]`.
  """

  # Selectors starting with this prefix are evaluated as XPath expressions,
  # everything else is treated as a CSS selector.
  XPATH_SELECTOR_PREFIX = "xpath/"

  # Binds `element` to the first node matching the XPath in arguments[0].
  XPATH_SELECT_ELEMENT = """
      let element = document.evaluate(arguments[0], document).iterateNext();
  """

  # Binds `element` to the first node matching the CSS selector in
  # arguments[0].
  CSS_SELECT_ELEMENT = """
      let element = document.querySelector(arguments[0]);
  """

  # Makes the script return false early when no element was found.
  CHECK_ELEMENT_EXISTS = """
      if (!element) return false;
  """

  ELEMENT_SCROLL_INTO_VIEW = """
      element.scrollIntoView();
  """

  ELEMENT_CLICK = """
      element.click();
  """

  RETURN_SUCCESS = """
      return true;
  """

  # Used instead of a selector fragment when scrolling the whole window.
  SELECT_WINDOW = """
      let element = window;
  """

  # Scrolls `element` to the vertical offset given in arguments[1].
  SCROLL_ELEMENT_TO = """
      element.scrollTo({top:arguments[1], behavior:'smooth'});
  """

  # Returns [found, position] where the position is read from the property
  # named by arguments[1] (see _get_scroll_field).
  GET_CURRENT_SCROLL_POSITION = """
      if (!element) return [false, 0];
      return [true, element[arguments[1]]];
  """

  def get_selector_script(self,
                          selector: str,
                          check_element_exists: bool = False,
                          scroll_into_view: bool = False,
                          click: bool = False,
                          return_on_success: bool = False) -> Tuple[str, str]:
    """Builds the JavaScript that looks up (and optionally uses) an element.

    Args:
      selector: CSS selector, or an XPath expression prefixed with "xpath/".
      check_element_exists: Append an early `return false` for a missing
        element.
      scroll_into_view: Append a `scrollIntoView()` call on the element.
      click: Append a `click()` call on the element.
      return_on_success: Append a final `return true`.

    Returns:
      A `(selector, script)` tuple. The selector has the "xpath/" prefix
      removed and is meant to be passed to the script as `arguments[0]`.
    """
    # TODO: support more selector types
    prefix = self.XPATH_SELECTOR_PREFIX
    if selector.startswith(prefix):
      selector = selector[len(prefix):]
      script = self.XPATH_SELECT_ELEMENT
    else:
      script = self.CSS_SELECT_ELEMENT

    # The fragments are order-sensitive: the existence check has to come
    # before any fragment that dereferences `element`.
    if check_element_exists:
      script += self.CHECK_ELEMENT_EXISTS
    if scroll_into_view:
      script += self.ELEMENT_SCROLL_INTO_VIEW
    if click:
      script += self.ELEMENT_CLICK
    if return_on_success:
      script += self.RETURN_SUCCESS

    return selector, script

  def _wait_for_ready_state(self, actions: Actions, ready_state: ReadyState,
                            timeout: dt.timedelta) -> None:
    """Polls `document.readyState` until it is `ready_state` or "complete"."""
    # Make sure we also finish if readyState jumps directly
    # from "loading" to "complete"
    actions.wait_js_condition(
        f"""
          let state = document.readyState;
          return state === '{ready_state}' || state === "complete";
        """, 0.2, timeout.total_seconds())

  def get(self, run: Run, action: i_action.GetAction) -> None:
    """Navigates to `action.url` and waits for the page.

    If the action requests a specific ready state, only that ready state is
    awaited and the method returns. Otherwise the method waits until
    `action.duration` has elapsed since the start of the action, and logs
    when the navigation itself already took longer than that.
    """
    # TODO: potentially refactor the timing and logging out to the base class.
    start_time = time.time()
    expected_end_time = start_time + action.duration.total_seconds()

    with run.actions(f"Get {action.url}", measure=False) as actions:
      actions.show_url(action.url, str(action.target))

      if action.ready_state != ReadyState.ANY:
        self._wait_for_ready_state(actions, action.ready_state, action.timeout)
        return
      # Wait for the given duration from the start of the action.
      wait_time_seconds = expected_end_time - time.time()
      if wait_time_seconds > 0:
        actions.wait(wait_time_seconds)
      elif action.duration:
        run_duration = dt.timedelta(seconds=time.time() - start_time)
        logging.info("%s took longer (%s) than expected action duration (%s).",
                     action, run_duration, action.duration)

  def click_js(self, run: Run, action: i_action.ClickAction) -> None:
    """Clicks the element matching `action.selector` via `element.click()`.

    Raises:
      InputSourceNotImplementedError: If the action has a non-zero duration.
      RuntimeError: If the action has no selector.
      ElementNotFoundError: If no element matched and `action.required` is
        set.
    """
    if action.duration > dt.timedelta():
      raise InputSourceNotImplementedError(self, action, action.input_source,
                                           "Non-zero duration not implemented")
    selector = action.selector
    if not selector:
      raise RuntimeError("Missing selector")

    selector, script = self.get_selector_script(
        selector,
        check_element_exists=True,
        scroll_into_view=action.scroll_into_view,
        click=True,
        return_on_success=True)

    with run.actions("ClickAction", measure=False) as actions:
      # The script returns false when the element is missing, true otherwise.
      if not actions.js(script, arguments=[selector]) and action.required:
        raise ElementNotFoundError(selector)

  def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None:
    """Scrolls the window, or the element matching `action.selector`.

    The target is moved by `action.distance` relative to its current scroll
    position. The move is spread over `action.duration` by issuing
    intermediate `scrollTo` calls roughly every 0.2s, followed by a final
    call to the exact end position.

    Raises:
      ElementNotFoundError: If a selector was given, nothing matched and
        `action.required` is set.
    """
    with run.actions("ScrollAction", measure=False) as actions:
      # Without a selector the window itself is scrolled.
      selector = ""
      selector_script = self.SELECT_WINDOW

      if action.selector:
        selector, selector_script = self.get_selector_script(action.selector)

      current_scroll_position_script = (
          selector_script + self.GET_CURRENT_SCROLL_POSITION)

      found_element, initial_scroll_y = actions.js(
          current_scroll_position_script,
          arguments=[selector,
                     self._get_scroll_field(bool(action.selector))])

      if not found_element:
        if action.required:
          raise ElementNotFoundError(selector)
        return

      do_scroll_script = selector_script + self.SCROLL_ELEMENT_TO

      duration_s = action.duration.total_seconds()
      distance = action.distance

      start_time = time.time()
      # TODO: use the chrome.gpuBenchmarking.smoothScrollBy extension
      # if available.
      while True:
        time_delta = time.time() - start_time
        # This check also guards the division below against a zero duration.
        if time_delta >= duration_s:
          break
        scroll_y = initial_scroll_y + time_delta / duration_s * distance
        actions.js(do_scroll_script, arguments=[selector, scroll_y])
        actions.wait(0.2)
      # Always finish exactly at the requested end position.
      scroll_y = initial_scroll_y + distance
      actions.js(do_scroll_script, arguments=[selector, scroll_y])

  def _element_exists_condition(self, selector: str) -> str:
    """Returns a JavaScript statement that is true once `selector` matches.

    Accepts the same selector syntax as get_selector_script: an "xpath/"
    prefix selects XPath evaluation, anything else is a CSS selector. The
    selector is embedded with json.dumps so that quotes, backslashes and
    non-ASCII characters form a valid JavaScript string literal.
    """
    prefix = self.XPATH_SELECTOR_PREFIX
    if selector.startswith(prefix):
      xpath_literal = json.dumps(selector[len(prefix):])
      return ("return !!document.evaluate("
              f"{xpath_literal}, document).iterateNext()")
    return f"return !!document.querySelector({json.dumps(selector)})"

  def wait_for_element(self, run: Run,
                       action: i_action.WaitForElementAction) -> None:
    """Waits until an element matching `action.selector` exists.

    The page is polled with a 0.2 minimum wait between checks, bounded by
    `action.timeout`.
    """
    with run.actions("WaitForElementAction", measure=False) as actions:
      actions.wait_js_condition(
          self._element_exists_condition(action.selector), 0.2, action.timeout)

  def wait_for_ready_state(self, run: Run,
                           action: i_action.WaitForReadyStateAction) -> None:
    """Waits until the document reaches `action.ready_state`."""
    with run.actions(
        f"Wait for ready state {action.ready_state}", measure=False) as actions:
      self._wait_for_ready_state(actions, action.ready_state, action.timeout)

  def inject_new_document_script(
      self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None:
    """Registers `action.script` with the browser for new documents."""
    run.browser.run_script_on_new_document(action.script)

  def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None:
    """Asks the browser to switch tabs using the action's title/url/index."""
    with run.actions("SwitchTabAction", measure=False):
      run.browser.switch_tab(action.title, action.url, action.tab_index,
                             action.timeout)

  def _get_scroll_field(self, has_selector: bool) -> str:
    """Returns the property name holding the vertical scroll position.

    Elements are read via "scrollTop", the window via "scrollY".
    """
    if has_selector:
      return "scrollTop"
    return "scrollY"

  def _rate_limit_keystrokes(
      self, run: Run, action: i_action.TextInputAction,
      do_type_function: Callable[[Run, Actions, str], None]) -> None:
    """Types `action.text`, spreading the characters over `action.duration`.

    Args:
      run: The current run.
      action: Provides the text to type and the total duration.
      do_type_function: Callback that types the given string. It is called
        once with the whole text when no pacing is needed, otherwise once
        per character.
    """
    start_time = time.time()

    action_expected_end_time = start_time + action.duration.total_seconds()

    with run.actions("TextInput", measure=False) as actions:

      # When no duration is specified, input the entire text at once.
      # Empty text takes the same path: there is nothing to pace, and the
      # per-character delay below would divide by zero.
      if not action.text or action.duration == dt.timedelta():
        do_type_function(run, actions, action.text)
        return

      character_delay_s = (action.duration / len(action.text)).total_seconds()
      character_expected_end_time = start_time

      for character in action.text:
        character_expected_end_time += character_delay_s

        do_type_function(run, actions, character)

        # Only sleep for what is left of this character's time slot, so that
        # slow typing does not accumulate extra delay.
        expected_end_delta = character_expected_end_time - time.time()

        if expected_end_delta > 0:
          actions.wait(expected_end_delta)

      overrun_time = time.time() - action_expected_end_time

      # There will always be a slight overrun due to the overhead of the final
      # actions.wait() call, but that is acceptable. Check if the overrun was
      # significant.
      if overrun_time > 0.01:
        logging.warning(
            "text_input action is behind schedule! Consider extending this "
            "action's duration otherwise the action may timeout.")

  def screenshot_impl(self, run: Run, suffix: str) -> None:
    """Takes a screenshot via the run's ScreenshotProbe context, if any.

    The screenshot label is the "_"-joined info stack followed by
    `_{suffix}`. Only a warning is logged when no probe context is found.
    """
    ctx = run.find_probe_context(ScreenshotProbe)
    if not ctx:
      logging.warning("No screenshot probe for screenshot on %s",
                      repr(self.info_stack))
      return
    assert isinstance(ctx, ScreenshotProbeContext)
    ctx.screenshot("_".join(self.info_stack) + f"_{suffix}")

  def dump_html_impl(self, run: Run, suffix: str) -> None:
    """Dumps the page HTML via the run's DumpHtmlProbe context, if any.

    The dump label is the "_"-joined info stack followed by `_{suffix}`.
    Only a warning is logged when no probe context is found.
    """
    ctx = run.find_probe_context(DumpHtmlProbe)
    if not ctx:
      logging.warning("No dump_html probe for dump on %s",
                      repr(self.info_stack))
      return
    assert isinstance(ctx, DumpHtmlProbeContext)
    ctx.dump_html("_".join(self.info_stack) + f"_{suffix}")
266