# Copyright 2024 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Optional

from crossbench import exception
from crossbench.action_runner.action_runner_listener import \
    ActionRunnerListener
from crossbench.benchmarks.loading.input_source import InputSource

if TYPE_CHECKING:
  from crossbench.action_runner.action import all as i_action
  from crossbench.benchmarks.loading.config.pages import ActionBlock
  from crossbench.benchmarks.loading.page.base import Page
  from crossbench.benchmarks.loading.page.combined import CombinedPage
  from crossbench.benchmarks.loading.page.interactive import InteractivePage
  from crossbench.benchmarks.loading.tab_controller import TabController
  from crossbench.path import LocalPath
  from crossbench.runner.run import Run


class ActionNotImplementedError(NotImplementedError):
  """Raised when a runner does not implement a given action.

  The offending runner and action are kept on the exception as `runner` and
  `action`.
  """

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               msg_context: str = "") -> None:
    """Build the error message from the action type and runner class name.

    Args:
      runner: The runner that lacks the implementation.
      action: The action that could not be run.
      msg_context: Optional extra detail, appended as ". Context: <detail>".
    """
    self.runner = runner
    self.action = action

    if msg_context:
      msg_context = ". Context: " + msg_context

    message = (f"{str(action.TYPE).capitalize()}-action "
               f"not implemented in {type(runner).__name__}{msg_context}")
    super().__init__(message)


class InputSourceNotImplementedError(ActionNotImplementedError):
  """Raised when a runner does not implement an action for an input source."""

  def __init__(self,
               runner: ActionRunner,
               action: i_action.Action,
               input_source: InputSource,
               msg_context: str = "") -> None:
    """Forward an input-source specific message to the parent error.

    Args:
      runner: The runner that lacks the implementation.
      action: The action that could not be run.
      input_source: The input source that is not supported for this action.
      msg_context: Optional extra detail, appended as ". Context: <detail>".
    """
    if msg_context:
      msg_context = ". Context: " + msg_context

    # Adjacent f-strings are concatenated verbatim, so the separating space
    # has to be part of the first literal.
    input_source_message = (f"Source: '{input_source}' "
                            f"not implemented{msg_context}")

    # The parent class receives this text as its own msg_context.
    super().__init__(runner, action, input_source_message)


class ActionRunner:
  """Base runner that dispatches actions to overridable implementations.

  Most action methods raise ActionNotImplementedError or
  InputSourceNotImplementedError here; `wait`, `js`, `screenshot` and
  `dump_html` have default implementations built on `run.actions(...)`.
  """

  # TODO: Don't share state across runs
  # Defaults to None so that reading `info_stack` before any block has run
  # raises the intended RuntimeError rather than an AttributeError.
  _info_stack: Optional[exception.TInfoStack] = None

  def __init__(self) -> None:
    self._listener = ActionRunnerListener()

  def set_listener(self, listener: ActionRunnerListener) -> None:
    """Replace the listener that is notified by run_page_multiple_tabs."""
    self._listener = listener

  # info_stack is a unique identifier for the currently running or most recently
  # run action.
  @property
  def info_stack(self) -> exception.TInfoStack:
    """Return the (block, action) identifier set by run_block.

    Raises:
      RuntimeError: If no action has been started yet.
    """
    if not self._info_stack:
      raise RuntimeError("info_stack can not be called before run_blocks")
    return self._info_stack

  def run_blocks(self, run: Run, page: InteractivePage,
                 blocks: Iterable[ActionBlock]) -> None:
    """Run every block in order via `block.run_with(self, run, page)`."""
    for block in blocks:
      block.run_with(self, run, page)

  def run_block(self, run: Run, block: ActionBlock) -> None:
    """Run all actions of a single block, updating info_stack per action."""
    block_index = block.index
    # TODO: Instead maybe just pass context down.
    # Or pass unique path to every action __init__
    with exception.annotate(f"Running block {block_index}: {block.label}"):
      # Action indices are 1-based in the info stack.
      for action_index, action in enumerate(block, start=1):
        self._info_stack = (f"block_{block_index}", f"action_{action_index}")
        action.run_with(run, self)

  def wait(self, run: Run, action: i_action.WaitAction) -> None:
    """Wait for `action.duration` inside an unmeasured actions scope."""
    with run.actions("WaitAction", measure=False) as actions:
      actions.wait(action.duration)

  def js(self, run: Run, action: i_action.JsAction) -> None:
    """Run `action.script` with `action.timeout` in an unmeasured scope."""
    with run.actions("JS", measure=False) as actions:
      actions.js(action.script, action.timeout)

  def click(self, run: Run, action: i_action.ClickAction) -> None:
    """Dispatch a click to the JS, touch or mouse implementation.

    Raises:
      RuntimeError: If `action.input_source` is none of JS, TOUCH or MOUSE.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.click_js(run, action)
    elif input_source is InputSource.TOUCH:
      self.click_touch(run, action)
    elif input_source is InputSource.MOUSE:
      self.click_mouse(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def scroll(self, run: Run, action: i_action.ScrollAction) -> None:
    """Dispatch a scroll to the JS, touch or mouse implementation.

    Raises:
      RuntimeError: If `action.input_source` is none of JS, TOUCH or MOUSE.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.scroll_js(run, action)
    elif input_source is InputSource.TOUCH:
      self.scroll_touch(run, action)
    elif input_source is InputSource.MOUSE:
      self.scroll_mouse(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  def get(self, run: Run, action: i_action.GetAction) -> None:
    """Not implemented in the base runner."""
    raise ActionNotImplementedError(self, action)

  def text_input(self, run: Run, action: i_action.TextInputAction) -> None:
    """Dispatch text input to the JS or keyboard implementation.

    Raises:
      RuntimeError: If `action.input_source` is neither JS nor KEYBOARD.
    """
    input_source = action.input_source
    if input_source is InputSource.JS:
      self.text_input_js(run, action)
    elif input_source is InputSource.KEYBOARD:
      self.text_input_keyboard(run, action)
    else:
      raise RuntimeError(f"Unsupported input source: '{input_source}'")

  # The per-input-source hooks below are not implemented in the base runner
  # and always raise InputSourceNotImplementedError.

  def click_js(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_touch(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def click_mouse(self, run: Run, action: i_action.ClickAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_js(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_touch(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def scroll_mouse(self, run: Run, action: i_action.ScrollAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_js(self, run: Run, action: i_action.TextInputAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  def text_input_keyboard(self, run: Run,
                          action: i_action.TextInputAction) -> None:
    raise InputSourceNotImplementedError(self, action, action.input_source)

  # The action hooks below are not implemented in the base runner and always
  # raise ActionNotImplementedError.

  def swipe(self, run: Run, action: i_action.SwipeAction) -> None:
    raise ActionNotImplementedError(self, action)

  def wait_for_element(self, run: Run,
                       action: i_action.WaitForElementAction) -> None:
    raise ActionNotImplementedError(self, action)

  def wait_for_ready_state(self, run: Run,
                           action: i_action.WaitForReadyStateAction) -> None:
    raise ActionNotImplementedError(self, action)

  def inject_new_document_script(
      self, run: Run, action: i_action.InjectNewDocumentScriptAction) -> None:
    raise ActionNotImplementedError(self, action)

  def screenshot_impl(self, run: Run, suffix: str) -> None:
    """Screenshot hook; always raises NotImplementedError here."""
    del run, suffix
    raise NotImplementedError("screenshot_impl not implemented")

  def screenshot(self, run: Run, action: i_action.ScreenshotAction) -> None:
    """Call screenshot_impl with the "screenshot" suffix, unmeasured."""
    del action
    with run.actions("Screenshot", measure=False):
      self.screenshot_impl(run, "screenshot")

  def dump_html_impl(self, run: Run, suffix: str) -> None:
    """HTML dump hook; always raises NotImplementedError here."""
    del run, suffix
    raise NotImplementedError("dump_html_impl not implemented")

  def dump_html(self, run: Run, action: i_action.DumpHtmlAction) -> None:
    """Call dump_html_impl with the "dump" suffix, unmeasured."""
    del action
    with run.actions("Dump HTML", measure=False):
      self.dump_html_impl(run, "dump")

  def _maybe_navigate_to_about_blank(self, run: Run, page: Page) -> None:
    """Show about:blank and wait if the page has an about_blank_duration."""
    if duration := page.about_blank_duration:
      run.browser.show_url("about:blank")
      run.runner.wait(duration)

  def run_page_multiple_tabs(self, run: Run, tabs: TabController,
                             pages: Iterable[Page]) -> None:
    """Run all pages once per iteration of `tabs`, opening new tabs between.

    The listener is notified about every new tab and every page run. Any
    exception is reported to the listener and then re-raised.

    Note: `pages` is iterated once per iteration of `tabs`, so it has to be
    re-iterable (e.g. a list or tuple rather than a generator).
    """
    # TODO: refactor possible logics to TabController.
    browser = run.browser
    for _ in tabs:
      try:
        for i, page in enumerate(pages):
          # Create a new tab for the multiple_tab case.
          if i > 0:
            browser.switch_to_new_tab()
            self._listener.handle_new_tab(run)
          page.run_with(run, self, False)
          self._listener.handle_page_run(run)
        # Open a new tab after the last page of this iteration.
        browser.switch_to_new_tab()
        self._listener.handle_new_tab(run)
      except Exception as e:
        self._listener.handle_error(run, e)
        raise

  def run_combined_page(self, run: Run, page: CombinedPage,
                        multiple_tabs: bool) -> None:
    """Run the sub pages of a combined page, optionally across tabs."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, page.pages)
    else:
      for sub_page in page.pages:
        sub_page.run_with(run, self, False)

  def run_interactive_page_once(self, run: Run, page: InteractivePage) -> None:
    """Run all blocks of the page, then optionally show about:blank.

    On any exception, `page.create_failure_artifacts(run)` is called before
    the exception is re-raised.
    """
    try:
      self.run_blocks(run, page, page.blocks)
      self._maybe_navigate_to_about_blank(run, page)
    except Exception:
      page.create_failure_artifacts(run)
      raise

  def run_interactive_page(self, run: Run, page: InteractivePage,
                           multiple_tabs: bool) -> None:
    """Run an interactive page either across tabs or exactly once."""
    if multiple_tabs:
      self.run_page_multiple_tabs(run, page.tabs, [page])
    else:
      self.run_interactive_page_once(run, page)

  def run_setup(self, run: Run, page: InteractivePage,
                setup: ActionBlock) -> None:
    """Run the setup block; create "setup-failure" artifacts on error."""
    try:
      with exception.annotate("setup"):
        setup.run_with(self, run, page)
    except Exception:
      page.create_failure_artifacts(run, "setup-failure")
      raise

  def run_login(self, run: Run, page: InteractivePage,
                login: ActionBlock) -> None:
    """Run the login block; create "login-failure" artifacts on error."""
    try:
      with exception.annotate("login"):
        login.run_with(self, run, page)
    except Exception:
      page.create_failure_artifacts(run, "login-failure")
      raise

  def switch_tab(self, run: Run, action: i_action.SwitchTabAction) -> None:
    """Not implemented in the base runner."""
    raise ActionNotImplementedError(self, action)