1# Copyright 2024 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import datetime as dt 8import logging 9import time 10from typing import TYPE_CHECKING, Callable, Tuple 11 12from crossbench.action_runner.action import all as i_action 13from crossbench.action_runner.action.enums import ReadyState 14from crossbench.action_runner.base import (ActionRunner, 15 InputSourceNotImplementedError) 16from crossbench.action_runner.element_not_found_error import \ 17 ElementNotFoundError 18from crossbench.probes.dump_html import DumpHtmlProbe, DumpHtmlProbeContext 19from crossbench.probes.screenshot import ScreenshotProbe, ScreenshotProbeContext 20 21if TYPE_CHECKING: 22 from crossbench.runner.actions import Actions 23 from crossbench.runner.run import Run 24 25 26class BasicActionRunner(ActionRunner): 27 XPATH_SELECT_ELEMENT = """ 28 let element = document.evaluate(arguments[0], document).iterateNext(); 29 """ 30 31 CSS_SELECT_ELEMENT = """ 32 let element = document.querySelector(arguments[0]); 33 """ 34 35 CHECK_ELEMENT_EXISTS = """ 36 if (!element) return false; 37 """ 38 39 ELEMENT_SCROLL_INTO_VIEW = """ 40 element.scrollIntoView(); 41 """ 42 43 ELEMENT_CLICK = """ 44 element.click(); 45 """ 46 47 RETURN_SUCCESS = """ 48 return true; 49 """ 50 51 SELECT_WINDOW = """ 52 let element = window; 53 """ 54 55 SCROLL_ELEMENT_TO = """ 56 element.scrollTo({top:arguments[1], behavior:'smooth'}); 57 """ 58 59 GET_CURRENT_SCROLL_POSITION = """ 60 if (!element) return [false, 0]; 61 return [true, element[arguments[1]]]; 62 """ 63 64 def get_selector_script(self, 65 selector: str, 66 check_element_exists=False, 67 scroll_into_view=False, 68 click=False, 69 return_on_success=False) -> Tuple[str, str]: 70 # TODO: support more selector types 71 72 script: str = "" 73 74 prefix = "xpath/" 75 if selector.startswith(prefix): 76 selector = selector[len(prefix):] 77 script = self.XPATH_SELECT_ELEMENT 78 else: 79 script = self.CSS_SELECT_ELEMENT 80 81 if check_element_exists: 82 script += self.CHECK_ELEMENT_EXISTS 83 84 if scroll_into_view: 85 script += self.ELEMENT_SCROLL_INTO_VIEW 86 87 if click: 88 script += self.ELEMENT_CLICK 89 90 if return_on_success: 91 script += self.RETURN_SUCCESS 92 93 return selector, script 94 95 def _wait_for_ready_state(self, actions: Actions, ready_state: ReadyState, 96 timeout: dt.timedelta) -> None: 97 # Make sure we also finish if readyState jumps directly 98 # from "loading" to "complete" 99 actions.wait_js_condition( 100 f""" 101 let state = document.readyState; 102 return state === '{ready_state}' || state === "complete"; 103 """, 0.2, timeout.total_seconds()) 104 105 def get(self, run: Run, action: i_action.GetAction) -> None: 106 # TODO: potentially refactor the timing and logging out to the base class. 107 start_time = time.time() 108 expected_end_time = start_time + action.duration.total_seconds() 109 110 with run.actions(f"Get {action.url}", measure=False) as actions: 111 actions.show_url(action.url, str(action.target)) 112 113 if action.ready_state != ReadyState.ANY: 114 self._wait_for_ready_state(actions, action.ready_state, action.timeout) 115 return 116 # Wait for the given duration from the start of the action. 117 wait_time_seconds = expected_end_time - time.time() 118 if wait_time_seconds > 0: 119 actions.wait(wait_time_seconds) 120 elif action.duration: 121 run_duration = dt.timedelta(seconds=time.time() - start_time) 122 logging.info("%s took longer (%s) than expected action duration (%s).", 123 action, run_duration, action.duration) 124 125 def click_js(self, run: Run, action: i_action.ClickAction) -> None: 126 127 if action.duration > dt.timedelta(): 128 raise InputSourceNotImplementedError(self, action, action.input_source, 129 "Non-zero duration not implemented") 130 selector = action.selector 131 if not selector: 132 raise RuntimeError("Missing selector") 133 134 selector, script = self.get_selector_script( 135 selector, 136 check_element_exists=True, 137 scroll_into_view=action.scroll_into_view, 138 click=True, 139 return_on_success=True) 140 141 with run.actions("ClickAction", measure=False) as actions: 142 if not actions.js(script, arguments=[selector]) and action.required: 143 raise ElementNotFoundError(selector) 144 145 def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None: 146 with run.actions("ScrollAction", measure=False) as actions: 147 selector = "" 148 selector_script = self.SELECT_WINDOW 149 150 if action.selector: 151 selector, selector_script = self.get_selector_script(action.selector) 152 153 current_scroll_position_script = ( 154 selector_script + self.GET_CURRENT_SCROLL_POSITION) 155 156 found_element, initial_scroll_y = actions.js( 157 current_scroll_position_script, 158 arguments=[selector, 159 self._get_scroll_field(bool(action.selector))]) 160 161 if not found_element: 162 if action.required: 163 raise ElementNotFoundError(selector) 164 return 165 166 do_scroll_script = selector_script + self.SCROLL_ELEMENT_TO 167 168 duration_s = action.duration.total_seconds() 169 distance = action.distance 170 171 start_time = time.time() 172 # TODO: use the chrome.gpuBenchmarking.smoothScrollBy extension 173 # if available. 174 while True: 175 time_delta = time.time() - start_time 176 if time_delta >= duration_s: 177 break 178 scroll_y = initial_scroll_y + time_delta / duration_s * distance 179 actions.js(do_scroll_script, arguments=[selector, scroll_y]) 180 actions.wait(0.2) 181 scroll_y = initial_scroll_y + distance 182 actions.js(do_scroll_script, arguments=[selector, scroll_y]) 183 184 def wait_for_element(self, run: Run, 185 action: i_action.WaitForElementAction) -> None: 186 with run.actions("WaitForElementAction", measure=False) as actions: 187 actions.wait_js_condition( 188 f"return !!document.querySelector({repr(action.selector)})", 0.2, 189 action.timeout) 190 191 def wait_for_ready_state(self, run: Run, 192 action: i_action.WaitForReadyStateAction) -> None: 193 with run.actions( 194 f"Wait for ready state {action.ready_state}", measure=False) as actions: 195 self._wait_for_ready_state(actions, action.ready_state, action.timeout) 196 197 def inject_new_document_script( 198 self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None: 199 run.browser.run_script_on_new_document(action.script) 200 201 def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None: 202 with run.actions("SwitchTabAction", measure=False): 203 run.browser.switch_tab(action.title, action.url, action.tab_index, 204 action.timeout) 205 206 def _get_scroll_field(self, has_selector: bool) -> str: 207 if has_selector: 208 return "scrollTop" 209 return "scrollY" 210 211 def _rate_limit_keystrokes( 212 self, run: Run, action: i_action.TextInputAction, 213 do_type_function: Callable[[Run, Actions, str], None]) -> None: 214 character_delay_s = (action.duration / len(action.text)).total_seconds() 215 216 start_time = time.time() 217 218 action_expected_end_time = start_time + action.duration.total_seconds() 219 220 with run.actions("TextInput", measure=False) as actions: 221 222 # When no duration is specified, input the entire text at once. 223 if action.duration == dt.timedelta(): 224 do_type_function(run, actions, action.text) 225 return 226 227 character_expected_end_time = start_time 228 229 for character in action.text: 230 character_expected_end_time += character_delay_s 231 232 do_type_function(run, actions, character) 233 234 expected_end_delta = character_expected_end_time - time.time() 235 236 if expected_end_delta > 0: 237 actions.wait(expected_end_delta) 238 239 overrun_time = time.time() - action_expected_end_time 240 241 # There will always be a slight overrun due to the overhead of the final 242 # actions.wait() call, but that is acceptable. Check if the overrun was 243 # significant. 244 if overrun_time > 0.01: 245 logging.warning( 246 "text_input action is behind schedule! Consider extending this " 247 "action's duration otherwise the action may timeout.") 248 249 def screenshot_impl(self, run: Run, suffix: str) -> None: 250 ctx = run.find_probe_context(ScreenshotProbe) 251 if not ctx: 252 logging.warning("No screenshot probe for screenshot on %s", 253 repr(self.info_stack)) 254 return 255 assert isinstance(ctx, ScreenshotProbeContext) 256 ctx.screenshot("_".join(self.info_stack) + f"_{suffix}") 257 258 def dump_html_impl(self, run: Run, suffix: str) -> None: 259 ctx = run.find_probe_context(DumpHtmlProbe) 260 if not ctx: 261 logging.warning("No dump_html probe for dump on %s", 262 repr(self.info_stack)) 263 return 264 assert isinstance(ctx, DumpHtmlProbeContext) 265 ctx.dump_html("_".join(self.info_stack) + f"_{suffix}") 266