# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import abc
import atexit
import logging
import os
import time
import traceback
from typing import TYPE_CHECKING, Any, List, Optional, Sequence, cast

import selenium.common.exceptions
import urllib3
from selenium import webdriver
from selenium.webdriver.remote.remote_connection import RemoteConnection

from crossbench.browsers.attributes import BrowserAttributes
from crossbench.browsers.browser import Browser
from crossbench.types import JsonDict

if TYPE_CHECKING:
  import datetime as dt

  from selenium.webdriver.common.timeouts import Timeouts

  from crossbench.browsers.settings import Settings
  from crossbench.env import HostEnvironment
  from crossbench.path import AnyPath, LocalPath
  from crossbench.runner.groups.session import BrowserSessionRunGroup


class DriverException(RuntimeError):
  """Wrapper for more readable error messages than the default
  WebDriver exceptions.

  When a browser is supplied, the string form of the exception is prefixed
  with "browser=...: " followed by the message."""

  def __init__(self, msg: str, browser: Optional[Browser] = None) -> None:
    self._browser = browser
    self._msg = msg
    super().__init__(msg)

  def __str__(self) -> str:
    browser_prefix = ""
    if self._browser:
      browser_prefix = f"browser={self._browser}: "
    return f"{browser_prefix}{self._msg}"


class WebDriverBrowser(Browser, metaclass=abc.ABCMeta):
  """Abstract Browser that is controlled through a selenium
  `webdriver.Remote` instance.

  Subclasses have to implement `_find_driver`, `_validate_driver_version`
  and `_start_driver`."""

  # The selenium driver; assigned in start() (or by a subclass).
  _private_driver: webdriver.Remote
  # Initialised from the settings and re-resolved in validate_binary().
  _driver_path: Optional[AnyPath]
  # Pid of the driver service process, see _find_driver_pid().
  _driver_pid: int
  # Pid of the browser process, only set if it could be uniquely identified.
  _pid: int
  log_file: Optional[LocalPath]

  def __init__(self,
               label: str,
               path: Optional[AnyPath] = None,
               settings: Optional[Settings] = None):
    super().__init__(label, path, settings)
    self._driver_path = self._settings.driver_path

  @property
  def attributes(self) -> BrowserAttributes:
    return BrowserAttributes.WEBDRIVER

  @property
  def driver_log_file(self) -> LocalPath:
    """Log path for the driver, derived from `log_file` via
    `with_suffix(".driver.log")`. Asserts that `log_file` is set."""
    log_file = self.log_file
    assert log_file
    return log_file.with_suffix(".driver.log")

  def validate_binary(self) -> None:
    """Run the base-class validation, then resolve the driver path via
    `_find_driver()` and assert that it exists on the host platform."""
    super().validate_binary()
    self._driver_path = self.platform.absolute(self._find_driver())
    # TODO: support remote chromedriver as well
    assert self.host_platform.exists(self._driver_path), (
        f"Webdriver path '{self._driver_path}' does not exist")

  @abc.abstractmethod
  def _find_driver(self) -> AnyPath:
    """Return the path of the driver binary to use."""

  @abc.abstractmethod
  def _validate_driver_version(self) -> None:
    """Validate the driver version; called from `validate_env()`."""

  def validate_env(self, env: HostEnvironment) -> None:
    super().validate_env(env)
    self._validate_driver_version()

  def start(self, session: BrowserSessionRunGroup) -> None:
    """Start the driver for the given session and prepare the main window.

    Raises:
      DriverException: if selenium could not create the session.
    """
    assert self._driver_path
    if timeout := self.http_request_timeout:
      logging.debug("Setting http request timeout to %s", timeout)
      RemoteConnection.set_timeout(timeout.total_seconds())
    try:
      self._private_driver = self._start_driver(session, self._driver_path)
    except selenium.common.exceptions.SessionNotCreatedException as e:
      msg = e.msg or "Could not create Webdriver session."
      raise DriverException(msg, self) from e
    self._is_running = True
    # Make sure the browser gets shut down even if quit() is never reached.
    atexit.register(self.force_quit)
    self._find_driver_pid()
    self._set_driver_timeouts(session)
    self._setup_window()

  def _find_driver_pid(self) -> None:
    """Record the driver pid and, if unambiguous, the browser pid.

    Nothing is recorded if the driver has no `service` attribute. The
    browser pid is only set if exactly one child process of the driver has
    an "exe" equal to `self.path`."""
    service = getattr(self._private_driver, "service", None)
    if not service:
      return
    self._driver_pid = service.process.pid
    candidates: List[int] = []
    for child in self.platform.process_children(self._driver_pid):
      if str(child["exe"]) == str(self.path):
        candidates.append(child["pid"])
    if len(candidates) == 1:
      self._pid = candidates[0]
    else:
      logging.debug(
          "Could not find unique browser process for webdriver: %s, got %s",
          self, candidates)

  def _set_driver_timeouts(self, session: BrowserSessionRunGroup) -> None:
    """Adjust the global webdriver timeouts if the runner has custom timeout
    unit values.
    If timing.has_no_timeout each value is set to SAFE_MAX_TIMEOUT_TIMEDELTA."""
    timing = session.timing
    if not timing.timeout_unit:
      return
    if timing.has_no_timeout:
      logging.info("Disabling webdriver timeouts")
    else:
      factor = timing.timeout_unit.total_seconds()
      if factor != 1.0:
        logging.info("Increasing webdriver timeouts by %fx", factor)
    timeouts: Timeouts = self._private_driver.timeouts
    # Only timeouts with a truthy current value are adjusted; missing, None
    # or 0 values are left untouched.
    if implicit_wait := getattr(timeouts, "implicit_wait", None):
      timeouts.implicit_wait = timing.timeout_timedelta(
          implicit_wait).total_seconds()
    if script := getattr(timeouts, "script", None):
      timeouts.script = timing.timeout_timedelta(script).total_seconds()
    if page_load := getattr(timeouts, "page_load", None):
      timeouts.page_load = timing.timeout_timedelta(page_load).total_seconds()
    self._private_driver.timeouts = timeouts

  def _setup_window(self) -> None:
    """Bring the main window to the foreground and, unless the viewport is
    headless, apply the configured viewport."""
    # Force main window to foreground.
    self._private_driver.switch_to.window(
        self._private_driver.current_window_handle)
    if self.viewport.is_headless:
      return
    self._apply_viewport()

  def _apply_viewport(self) -> None:
    """Apply the fullscreen / maximized / explicit position-and-size viewport
    settings to the current driver window."""
    if self.viewport.is_fullscreen:
      self._private_driver.fullscreen_window()
    elif self.viewport.is_maximized:
      self._private_driver.maximize_window()
    else:
      self._private_driver.set_window_position(self.viewport.x, self.viewport.y)
      self._private_driver.set_window_size(self.viewport.width,
                                           self.viewport.height)

  @abc.abstractmethod
  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: AnyPath) -> webdriver.Remote:
    """Create and return the selenium driver for the given session."""

  def details_json(self) -> JsonDict:
    """Extend the base details with the driver log path under
    `details["log"]["driver"]` if a `log_file` is set."""
    details: JsonDict = super().details_json()
    log = cast(JsonDict, details["log"])
    if self.log_file:
      log["driver"] = os.fspath(self.driver_log_file)
    return details

  def show_url(self, url: str, target: Optional[str] = None) -> None:
    """Navigate to `url`.

    Args:
      url: The URL to load.
      target: "_self" or None switches to the last window handle,
        "_new_tab" opens a new tab and "_new_window" a new window.

    Raises:
      RuntimeError: for any other `target` value.
      DriverException: for the known network errors handled in
        `_wrap_webdriver_exception()`.
      selenium.common.exceptions.WebDriverException: re-raised unchanged for
        all other driver errors.
    """
    logging.debug("WebDriverBrowser.show_url(%s, %s)", url, target)
    try:
      if target in ("_self", None):
        handles = self._private_driver.window_handles
        assert handles, "Browser has no more opened windows."
        self._private_driver.switch_to.window(handles[-1])
      elif target == "_new_tab":
        self._private_driver.switch_to.new_window("tab")
      elif target == "_new_window":
        self._private_driver.switch_to.new_window("window")
      else:
        raise RuntimeError(f"unexpected target {target}")
      self._private_driver.get(url)
    except selenium.common.exceptions.WebDriverException as e:
      if msg := e.msg:
        # Raises a DriverException for known errors, otherwise falls through
        # to the bare re-raise below.
        self._wrap_webdriver_exception(e, msg, url)
      raise

  def switch_to_new_tab(self) -> None:
    self._private_driver.switch_to.new_window("tab")

  def screenshot(self, path: LocalPath) -> None:
    """Save a screenshot to `path`.

    Raises:
      DriverException: if the driver reports that the screenshot failed.
    """
    if not self._private_driver.get_screenshot_as_file(path.as_posix()):
      raise DriverException(
          f"Browser failed to get_screenshot_as_file to file '{path}'", self)

  def _wrap_webdriver_exception(
      self, e: selenium.common.exceptions.WebDriverException, msg: str,
      url: str) -> None:
    """Raise a more readable DriverException for known network errors.

    Returns normally if `msg` does not match a known error, so that the
    caller can re-raise the original exception."""
    if "net::ERR_CONNECTION_REFUSED" in msg:
      raise DriverException(
          f"Browser failed to load URL={url}. The URL is likely unreachable.",
          self) from e
    if "net::ERR_INTERNET_DISCONNECTED" in msg:
      raise DriverException(
          f"Browser failed to load URL={url}. "
          f"The device is not connected to the internet.", self) from e

  def js(
      self,
      script: str,
      timeout: Optional[dt.timedelta] = None,
      arguments: Sequence[object] = ()
  ) -> Any:
    """Execute `script` in the browser and return its result.

    Args:
      script: The JavaScript source to execute.
      timeout: Optional positive script timeout; if given it is set on the
        driver before the script is executed.
      arguments: Values passed on as positional script arguments.

    Raises:
      ValueError: if the driver failed to execute the script; the original
        WebDriverException is attached as the cause.
    """
    logging.debug("WebDriverBrowser.js() timeout=%s, script: %s", timeout,
                  script)
    assert self._is_running
    try:
      if timeout is not None:
        assert timeout.total_seconds() > 0, (
            f"timeout must be a positive number, got: {timeout}")
        self._private_driver.set_script_timeout(timeout.total_seconds())
      return self._private_driver.execute_script(script, *arguments)
    except selenium.common.exceptions.WebDriverException as e:
      # Chain explicitly so the underlying driver error stays attached as the
      # direct cause of the ValueError.
      raise ValueError(f"Could not execute JS: {e.msg}") from e

  def close_all_tabs(self) -> None:
    """Best-effort close of every window handle; invalid-session and
    connection-retry errors are only logged."""
    try:
      all_handles = self._private_driver.window_handles
      for handle in all_handles:
        self._private_driver.switch_to.window(handle)
        self._private_driver.close()
    except (selenium.common.exceptions.InvalidSessionIdException,
            urllib3.exceptions.MaxRetryError) as e:
      logging.debug("%s: Got errors while closing all tabs: {%s}", self, e)

  def quit(self) -> None:
    assert self._is_running
    self.close_all_tabs()
    self.force_quit()

  def force_quit(self) -> None:
    """Shut down the driver without raising.

    Does nothing if no driver was ever assigned or the browser is not
    running. All errors are logged at debug level and `_is_running` is always
    reset to False once a shutdown was attempted."""
    if getattr(self, "_private_driver", None) is None or not self._is_running:
      return
    atexit.unregister(self.force_quit)
    logging.debug("WebDriverBrowser.force_quit()")
    try:
      try:
        # Close the current window.
        self._private_driver.close()
        time.sleep(0.1)
      except selenium.common.exceptions.NoSuchWindowException:
        # No window is good.
        pass
      except selenium.common.exceptions.InvalidSessionIdException:
        # Closing the last tab will close the session as well.
        return
      try:
        self._private_driver.quit()
      except selenium.common.exceptions.InvalidSessionIdException:
        return
      # Sometimes a second quit is needed, ignore any warnings there
      try:
        self._private_driver.quit()
      except Exception as e:  # pylint: disable=broad-except
        logging.debug("Driver raised exception on quit: %s\n%s", e,
                      traceback.format_exc())
      return
    except Exception as e:  # pylint: disable=broad-except
      logging.debug("Could not quit browser: %s\n%s", e, traceback.format_exc())
    finally:
      self._is_running = False


class RemoteWebDriver(WebDriverBrowser, Browser):
  """Represent a remote WebDriver that has already been started"""

  def __init__(self, label: str, driver: webdriver.Remote) -> None:
    super().__init__(label=label, path=None)
    self._private_driver = driver
    # The version is read from the capabilities reported by the driver.
    self.version: str = driver.capabilities["browserVersion"]
    self.major_version: int = int(self.version.split(".")[0])

  @property
  def type_name(self) -> str:
    return "remote"

  @property
  def attributes(self) -> BrowserAttributes:
    return BrowserAttributes.WEBDRIVER | BrowserAttributes.REMOTE

  def _validate_driver_version(self) -> None:
    pass

  def _extract_version(self) -> str:
    raise NotImplementedError()

  def _find_driver(self) -> LocalPath:
    raise NotImplementedError()

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: AnyPath) -> webdriver.Remote:
    raise NotImplementedError()

  def setup_binary(self) -> None:
    pass

  def start(self, session: BrowserSessionRunGroup) -> None:
    # Driver has already been started. We just need to mark it as running.
    self._is_running = True
    # Unlike WebDriverBrowser._setup_window() this neither re-focuses the
    # window nor checks for a headless viewport.
    self._apply_viewport()

  def quit(self) -> None:
    # External code that started the driver is responsible for shutting it down.
    self._is_running = False