1# Copyright 2023 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import abc 8import atexit 9import datetime as dt 10import json 11import logging 12import os 13import re 14import shutil 15import stat 16import tempfile 17import urllib.error 18import zipfile 19from typing import (TYPE_CHECKING, Any, Dict, Final, Iterable, List, Optional, 20 Sequence, Tuple, Type, cast) 21 22import hjson 23from immutabledict import immutabledict 24from selenium.webdriver.chromium.options import ChromiumOptions 25from selenium.webdriver.chromium.service import ChromiumService 26from selenium.webdriver.chromium.webdriver import ChromiumDriver 27from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver 28 29from crossbench import exception, helper 30from crossbench import path as pth 31from crossbench import plt 32from crossbench.browsers.attributes import BrowserAttributes 33from crossbench.browsers.chromium.chromium import Chromium 34from crossbench.browsers.chromium.version import (ChromeDriverVersion, 35 ChromiumVersion) 36from crossbench.browsers.webdriver import WebDriverBrowser 37from crossbench.cli.config.secret_type import SecretType 38from crossbench.flags.chrome import ChromeFlags 39from crossbench.plt.android_adb import AndroidAdbPlatform 40from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform 41from crossbench.plt.linux_ssh import LinuxSshPlatform 42 43if TYPE_CHECKING: 44 from selenium import webdriver 45 46 from crossbench.browsers.settings import Settings 47 from crossbench.cli.config.secrets import Secret 48 from crossbench.flags.base import FlagsT 49 from crossbench.plt.base import Platform 50 from crossbench.runner.groups.session import BrowserSessionRunGroup 51 52 53class ChromiumWebDriver(WebDriverBrowser, Chromium, metaclass=abc.ABCMeta): 54 55 WEB_DRIVER_OPTIONS: Type[ChromiumOptions] = ChromiumOptions 56 WEB_DRIVER_SERVICE: 
Type[ChromiumService] = ChromiumService 57 58 @property 59 def attributes(self) -> BrowserAttributes: 60 return (BrowserAttributes.CHROMIUM | BrowserAttributes.CHROMIUM_BASED 61 | BrowserAttributes.WEBDRIVER) 62 63 def use_local_chromedriver(self) -> bool: 64 return self.major_version == 0 or self.is_locally_compiled() 65 66 def is_locally_compiled(self) -> bool: 67 return pth.LocalPath(self.app_path.parent / "args.gn").exists() 68 69 def _execute_cdp_cmd(self, driver: webdriver.Remote, cmd: str, 70 cmd_args: dict): 71 return driver.execute("executeCdpCommand", { 72 "cmd": cmd, 73 "params": cmd_args 74 })["value"] 75 76 def _find_driver(self) -> pth.AnyPath: 77 if self._driver_path: 78 return self._driver_path 79 finder = ChromeDriverFinder(self) 80 assert self.app_path 81 if self.use_local_chromedriver(): 82 return finder.find_local_build() 83 try: 84 return finder.download() 85 except DriverNotFoundError as original_download_error: 86 logging.debug( 87 "Could not download chromedriver, " 88 "falling back to finding local build: %s", original_download_error) 89 try: 90 return finder.find_local_build() 91 except DriverNotFoundError as e: 92 logging.debug("Could not find fallback chromedriver: %s", e) 93 raise original_download_error from e 94 # to make an old pytype version happy 95 return pth.LocalPath() 96 97 def _start_driver(self, session: BrowserSessionRunGroup, 98 driver_path: pth.AnyPath) -> webdriver.Remote: 99 return self._start_chromedriver(session, driver_path) 100 101 def _start_chromedriver(self, session: BrowserSessionRunGroup, 102 driver_path: pth.AnyPath) -> ChromiumDriver: 103 assert not self._is_running 104 assert self.log_file 105 args = self._get_browser_flags_for_session(session) 106 options = self._create_options(session, args) 107 108 self._log_browser_start(args, driver_path) 109 service_args: List[str] = [] 110 log_path: Optional[str] = None 111 if self._settings.driver_logging: 112 service_args += ["--verbose"] 113 log_path = 
os.fspath(self.driver_log_file) 114 # pytype: disable=wrong-keyword-args 115 service = self.WEB_DRIVER_SERVICE( 116 executable_path=os.fspath(driver_path), 117 log_path=log_path, 118 service_args=service_args) 119 # TODO: support remote platforms 120 service.log_file = pth.LocalPath(self.stdout_log_file).open( # pylint: disable=consider-using-with 121 "w", encoding="utf-8") 122 driver = self._create_driver(options, service) 123 # pytype: enable=wrong-keyword-args 124 # Prevent debugging overhead. 125 self._execute_cdp_cmd(driver, "Runtime.setMaxCallStackSizeToCapture", 126 {"size": 0}) 127 return driver 128 129 def _create_options(self, session: BrowserSessionRunGroup, 130 args: Sequence[str]) -> ChromiumOptions: 131 assert not self._is_running 132 options: ChromiumOptions = self.WEB_DRIVER_OPTIONS() 133 options.set_capability("browserVersion", str(self.major_version)) 134 # Don't wait for document-ready. 135 options.set_capability("pageLoadStrategy", "eager") 136 for arg in args: 137 options.add_argument(arg) 138 options.binary_location = os.fspath(self.path) 139 session.setup_selenium_options(options) 140 return options 141 142 @abc.abstractmethod 143 def _create_driver(self, options: ChromiumOptions, 144 service: ChromiumService) -> ChromiumDriver: 145 pass 146 147 def _validate_driver_version(self) -> None: 148 assert self._driver_path, "No driver available" 149 error_message = None 150 if self.is_local and is_build_dir( 151 self.platform.local_path(self.app_path.parent)): 152 error_message = self._validate_locally_built_driver( 153 self.platform.local_path(self._driver_path)) 154 else: 155 error_message = self._validate_any_driver_version(self._driver_path) 156 if error_message: 157 raise RuntimeError("\n".join(error_message)) 158 159 def _validate_locally_built_driver( 160 self, driver_path: pth.LocalPath) -> Optional[Iterable[str]]: 161 # TODO: migrate to version object on the browser 162 browser_version = ChromiumVersion.parse(self.version) 163 
driver_version = ChromeDriverVersion.parse( 164 self.platform.app_version(driver_path)) 165 if browser_version.parts == driver_version.parts: 166 return None 167 return (f"Chromedriver version mismatch: driver={driver_version.parts_str} " 168 f"browser={browser_version.parts_str} ({self}).", 169 build_chromedriver_instructions(driver_path.parent)) 170 171 def _validate_any_driver_version( 172 self, driver_path: pth.AnyPath) -> Optional[Iterable[str]]: 173 raw_version_str = self.host_platform.sh_stdout(driver_path, "--version") 174 driver_version = ChromeDriverVersion.parse(raw_version_str) 175 if driver_version.major == self.major_version: 176 return None 177 return (f"Chromedriver version mismatch: driver={driver_version} " 178 f"browser={self.version} ({self})",) 179 180 def run_script_on_new_document(self, script: str) -> None: 181 self._execute_cdp_cmd(self._private_driver, 182 "Page.addScriptToEvaluateOnNewDocument", 183 {"source": script}) 184 185 def current_window_id(self) -> str: 186 return str(self._private_driver.current_window_handle) 187 188 def switch_window(self, window_id: str) -> None: 189 self._private_driver.switch_to.window(window_id) 190 191 def switch_tab( 192 self, 193 title: Optional[re.Pattern] = None, 194 url: Optional[re.Pattern] = None, 195 tab_index: Optional[int] = None, 196 timeout: dt.timedelta = dt.timedelta(seconds=0) 197 ) -> None: 198 driver = self._private_driver 199 original_handle = driver.current_window_handle 200 for _ in helper.wait_with_backoff(timeout, self.platform): 201 # Search through other handles starting from current_window_handle + 1 202 try: 203 i = driver.window_handles.index(original_handle) 204 except ValueError as e: 205 raise RuntimeError("Original starting tab no longer exists") from e 206 207 if tab_index is not None: 208 handles = [driver.window_handles[tab_index]] 209 else: 210 handles = driver.window_handles[i + 1:] + driver.window_handles[:i] 211 212 for handle in handles: 213 
driver.switch_to.window(handle) 214 if title is not None: 215 if title.match(driver.title) is None: 216 continue 217 if url is not None: 218 if url.match(driver.current_url) is None: 219 continue 220 return 221 error = "No new tab found" 222 if title is not None: 223 error += f" with title matching {repr(title.pattern)}" 224 if url is not None: 225 error += f" with url matching {repr(url.pattern)}" 226 if tab_index is not None: 227 error += f" with tab_index matching {tab_index}" 228 raise RuntimeError(error) 229 230 def start_profiling(self) -> None: 231 assert isinstance(self._private_driver, ChromiumDriver) 232 # TODO: reuse the TraceProbe categories, 233 self._execute_cdp_cmd( 234 self._private_driver, "Tracing.start", { 235 "transferMode": 236 "ReturnAsStream", 237 "includedCategories": [ 238 "devtools.timeline", 239 "v8.execute", 240 "disabled-by-default-devtools.timeline", 241 "disabled-by-default-devtools.timeline.frame", 242 "toplevel", 243 "blink.console", 244 "blink.user_timing", 245 "latencyInfo", 246 "disabled-by-default-devtools.timeline.stack", 247 "disabled-by-default-v8.cpu_profiler", 248 ], 249 }) 250 251 def stop_profiling(self) -> Any: 252 assert isinstance(self._private_driver, ChromiumDriver) 253 data = self._execute_cdp_cmd(self._private_driver, 254 "Tracing.tracingComplete", {}) 255 # TODO: use webdriver bidi to get the async Tracing.end event. 256 # self._execute_cdp_cmd(self._driver, "Tracing.end", {}) 257 return data 258 259 260# Android is high-tech and reads chrome flags from an app-specific file. 261# TODO: extend support to more than just chrome. 
_FLAG_ROOT: pth.AnyPosixPath = pth.AnyPosixPath("/data/local/tmp/")
FLAGS_WEBLAYER: pth.AnyPosixPath = _FLAG_ROOT / "weblayer-command-line"
FLAGS_WEBVIEW: pth.AnyPosixPath = _FLAG_ROOT / "webview-command-line"
FLAGS_CONTENT_SHELL: pth.AnyPosixPath = (
    _FLAG_ROOT / "content-shell-command-line")
FLAGS_CHROME: pth.AnyPosixPath = _FLAG_ROOT / "chrome-command-line"


class ChromiumWebDriverAndroid(ChromiumWebDriver):
  """Chromium browser on an adb-connected android device.

  The contents of the on-device chrome command line file are backed up before
  the driver is started and restored on `force_quit` (or at interpreter exit).
  """

  def __init__(self,
               label: str,
               path: Optional[pth.AnyPath] = None,
               settings: Optional[Settings] = None):
    assert settings, "Android browser needs custom settings and platform"
    self._chrome_command_line_path: pth.AnyPath = FLAGS_CHROME
    # Contents of the flags file before the run; None if there was no file
    # (or if no backup has been taken yet).
    self._previous_command_line_contents: Optional[str] = None
    super().__init__(label, path, settings)
    self._android_package: str = self._lookup_android_package(self.path)
    if not self._android_package:
      raise RuntimeError("Could not find matching adb package for "
                         f"{self.path} on {self.platform}")

  def _lookup_android_package(self, path: pth.AnyPath) -> str:
    return self.platform.app_path_to_package(path)

  @property
  def android_package(self) -> str:
    return self._android_package

  @property
  def platform(self) -> AndroidAdbPlatform:
    assert isinstance(
        self._platform,
        AndroidAdbPlatform), (f"Invalid platform: {self._platform}")
    return cast(AndroidAdbPlatform, self._platform)

  def _resolve_binary(self, path: pth.AnyPath) -> pth.AnyPath:
    return path

  # TODO: implement setting a clean profile on android
  _UNSUPPORTED_FLAGS: Tuple[str, ...] = (
      "--user-data-dir",
      "--disable-sync",
      "--window-size",
      "--window-position",
  )

  def _filter_flags_for_run(self, flags: FlagsT) -> FlagsT:
    """Removes all `_UNSUPPORTED_FLAGS` from the given flags (in place)."""
    assert isinstance(flags, ChromeFlags)
    chrome_flags = cast(ChromeFlags, flags)
    for flag in self._UNSUPPORTED_FLAGS:
      if flag not in chrome_flags:
        continue
      flag_value = chrome_flags.pop(flag, None)
      logging.debug("Chrome Android: Removed unsupported flag: %s=%s", flag,
                    flag_value)
    return chrome_flags

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: pth.AnyPath) -> webdriver.Remote:
    """Stops the app, optionally wipes its data, backs up the device flags
    file and then starts chromedriver."""
    self.adb_force_stop()
    if session.browser.wipe_system_user_data:
      self.adb_force_clear()
      self.platform.adb.grant_notification_permissions(self.android_package)
    self._backup_chrome_flags()
    # Make sure the flags are restored even if force_quit is never reached.
    atexit.register(self._restore_chrome_flags)
    return self._start_chromedriver(session, driver_path)

  def _backup_chrome_flags(self) -> None:
    assert self._previous_command_line_contents is None
    self._previous_command_line_contents = self._read_device_flags()

  def _read_device_flags(self) -> Optional[str]:
    """Returns the contents of the device flags file, None if it is missing."""
    if not self.platform.exists(self._chrome_command_line_path):
      return None
    return self.platform.cat(self._chrome_command_line_path)

  def adb_force_stop(self) -> None:
    self.platform.adb.force_stop(self.android_package)

  def adb_force_clear(self) -> None:
    self.platform.adb.force_clear(self.android_package)

  def force_quit(self) -> None:
    # Nested try/finally: always stop the app and always restore the flags,
    # even if an earlier cleanup step raises.
    try:
      try:
        super().force_quit()
      finally:
        self.adb_force_stop()
    finally:
      self._restore_chrome_flags()

  def _restore_chrome_flags(self) -> None:
    """Restores the device flags file to its backed-up state.

    If the backup is None, the flags file is deleted.
    """
    # NOTE(review): a None backup cannot be told apart from "no backup taken
    # yet", so calling this without a preceding _backup_chrome_flags() deletes
    # the flags file on the device -- confirm that this is intended.
    atexit.unregister(self._restore_chrome_flags)
    current_flags = self._read_device_flags()
    if current_flags != self._previous_command_line_contents:
      logging.warning("%s: flags file changed during run", self)
      logging.debug("before: %s", self._previous_command_line_contents)
      logging.debug("current: %s", current_flags)
    if self._previous_command_line_contents is None:
      logging.debug("%s: deleting chrome flags file: %s", self,
                    self._chrome_command_line_path)
      self.platform.rm(self._chrome_command_line_path, missing_ok=True)
    else:
      logging.debug("%s: restoring previous flags file contents in %s", self,
                    self._chrome_command_line_path)
      self.platform.set_file_contents(self._chrome_command_line_path,
                                      self._previous_command_line_contents)
    # Reset so that _backup_chrome_flags() can be used for the next start.
    self._previous_command_line_contents = None

  def _create_options(self, session: BrowserSessionRunGroup,
                      args: Sequence[str]) -> ChromiumOptions:
    options: ChromiumOptions = super()._create_options(session, args)
    # The browser is addressed via package name and device serial instead of
    # a binary path.
    options.binary_location = ""
    options.add_experimental_option("androidPackage", self.android_package)
    options.add_experimental_option("androidDeviceSerial",
                                    self.platform.adb.serial_id)
    return options

  def setup_binary(self) -> None:
    super().setup_binary()
    self.platform.adb.grant_notification_permissions(self.android_package)


class LocalChromiumWebDriverAndroid(ChromiumWebDriverAndroid):
  """
  Custom version that uses a locally built bundle wrapper.
  https://chromium.googlesource.com/chromium/src/+/HEAD/docs/android_build_instructions.md
  """

  @classmethod
  def is_apk_helper(cls, path: Optional[pth.AnyPath]) -> bool:
    """Returns True for multi-part paths whose file name ends with "_apk"."""
    if not path or len(path.parts) == 1:
      return False
    return path.name.endswith("_apk")

  def __init__(self,
               label: str,
               path: Optional[pth.AnyPath] = None,
               settings: Optional[Settings] = None):
    # This class invokes `<path> package-info` / `<path> install` and thus
    # needs an apk helper; the check used to be inverted and rejected
    # exactly the valid paths.
    if not self.is_apk_helper(path):
      raise ValueError(
          "Locally built chrome version needs package, got empty path")
    assert settings, "Android browser needs custom settings and platform"
    # Has to be set before super().__init__(), which looks up the android
    # package via _lookup_android_package().
    self._package_info: immutabledict[str, Any] = self._parse_package_info(
        settings.platform, path)
    super().__init__(label, path, settings)

  def _lookup_android_package(self, path: pth.AnyPath) -> str:
    return self._package_info["Package name"]

  def _extract_version(self) -> str:
    return self._package_info["versionName"]

  def _parse_package_info(self, platform: plt.Platform,
                          path: pth.AnyPath) -> immutabledict[str, Any]:
    """Runs `<path> package-info` on the host and parses its "key: value"
    output lines (values are parsed with hjson).

    Raises:
      ValueError: if a line does not contain a ": " separator.
    """
    output = platform.host_platform.sh_stdout(
        path, "package-info").rstrip().splitlines()
    package_info = {}
    for line in output:
      # Only split on the first separator, values may contain ": " as well.
      key, value = line.split(": ", 1)
      package_info[key] = hjson.loads(value)
    return immutabledict(package_info)

  def setup_binary(self) -> None:
    super().setup_binary()
    self.host_platform.sh_stdout(self.path, "install",
                                 f"--device={self.platform.serial_id}")


class ChromiumWebDriverSsh(ChromiumWebDriver):
  """Chromium on a remote linux host, using a chromedriver that is reachable
  at the platform's host and port."""

  @property
  def platform(self) -> LinuxSshPlatform:
    assert isinstance(self._platform,
                      LinuxSshPlatform), (f"Invalid platform: {self._platform}")
    return cast(LinuxSshPlatform, self._platform)

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: pth.AnyPath) -> RemoteWebDriver:
    # No local driver binary is started, only a remote connection is opened.
    del driver_path
    args = self._get_browser_flags_for_session(session)
    options = self._create_options(session, args)
    platform = self.platform
    host = platform.host
    port = platform.port
    driver = RemoteWebDriver(f"http://{host}:{port}", options=options)
    return driver


class ChromiumWebDriverChromeOsSsh(ChromiumWebDriver):
  """Chromium on a remote ChromeOS device, attached to a debugging session
  that is created by the platform."""

  @property
  def platform(self) -> ChromeOsSshPlatform:
    assert isinstance(
        self._platform,
        ChromeOsSshPlatform), (f"Invalid platform: {self._platform}")
    return cast(ChromeOsSshPlatform, self._platform)

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: pth.AnyPath) -> RemoteWebDriver:
    del driver_path
    platform = self.platform
    host = platform.host
    port = platform.port
    args = self._get_browser_flags_for_session(session)
    # TODO(spadhi): correctly handle flags:
    #   1. decide which flags to pass to chrome vs chromedriver
    #   2. investigate irrelevant / unsupported flags on ChromeOS
    #   3. filter out and pass the chrome flags to the debugging session below
    #   4. pass the remaining flags to RemoteWebDriver options
    google_login = session.browser.secrets.get(SecretType.GOOGLE)
    if google_login:
      dbg_port = platform.create_debugging_session(
          username=google_login.username, password=google_login.password)
    else:
      dbg_port = platform.create_debugging_session()
    options = self._create_options(session, args)
    options.add_experimental_option("debuggerAddress", f"127.0.0.1:{dbg_port}")
    driver = RemoteWebDriver(f"http://{host}:{port}", options=options)
    return driver

  # On ChromeOS, the system profile is the same as the browser profile.
  def is_logged_in(self, secret: Secret, strict: bool = False) -> bool:
    """Returns True if the google secret matches the platform's user.

    Raises:
      RuntimeError: if `strict` is set and the google account differs from
        the platform's user.
    """
    if secret.type != SecretType.GOOGLE:
      return False
    if secret.username == self.platform.username:
      return True
    if not strict:
      return False
    raise RuntimeError("Login of non-primary Google accounts not supported")


class DriverNotFoundError(ValueError):
  """Raised if no matching chromedriver could be found or downloaded."""


def build_chromedriver_instructions(build_dir: pth.AnyPath) -> str:
  """Returns a user-facing hint on how to build chromedriver in build_dir."""
  return ("Please build 'chromedriver' manually for local builds:\n"
          f"    autoninja -C {build_dir} chromedriver")


def is_build_dir(path: pth.LocalPath,
                 platform: plt.Platform = plt.PLATFORM) -> bool:
  """Returns True if `path` contains an "args.gn" file."""
  return platform.is_file(path / "args.gn")


class ChromeDriverFinder:
  """Finds a locally built chromedriver or downloads a matching one into the
  host platform's driver cache dir."""

  driver_path: pth.LocalPath

  def __init__(self, browser: ChromiumWebDriver):
    self.browser = browser
    self.platform: Platform = browser.platform
    self.host_platform: Platform = browser.platform.host_platform
    extension: str = ""
    if self.host_platform.is_win:
      extension = ".exe"
    cache_dir = self.host_platform.local_cache_dir("driver")
    # Downloaded drivers are cached per major browser version.
    self.driver_path: pth.LocalPath = (
        cache_dir / f"chromedriver-{self.browser.major_version}{extension}")
    self._validate_browser()

  def _validate_browser(self) -> None:
    browser_platform = self.browser.platform
    if browser_platform.is_local:
      return
    # Some remote platforms rely on a local chromedriver
    if (browser_platform.is_android or browser_platform.is_remote_ssh):
      return
    raise RuntimeError("Cannot download chromedriver for remote browser yet")

  def find_local_build(self) -> pth.LocalPath:
    """Returns the chromedriver located next to the browser binary.

    Raises:
      DriverNotFoundError: if there is no such file.
    """
    assert self.browser.app_path
    # assume it's a local build
    lookup_dir = pth.LocalPath(self.browser.app_path.parent)
    driver_path = lookup_dir / "chromedriver"
    if self.platform.is_win:
      driver_path = driver_path.with_suffix(".exe")
    if self.platform.is_file(driver_path):
      return driver_path
    error_message: List[str] = [f"Driver '{driver_path}' does not exist."]
    if is_build_dir(lookup_dir, self.platform):
      error_message += [build_chromedriver_instructions(lookup_dir)]
    else:
      error_message += ["Please manually provide a chromedriver binary."]
    raise DriverNotFoundError("\n".join(error_message))

  def download(self) -> pth.LocalPath:
    """Returns the cached driver path, downloading the driver first if it is
    not cached yet."""
    if not self.platform.is_file(self.driver_path):
      with exception.annotate(
          f"Downloading chromedriver for {self.browser.version}"):
        self._download()
    return self.driver_path

  def _download(self) -> None:
    """Downloads a chromedriver for the browser's milestone to
    `self.driver_path` and marks it as executable.

    Lookup order: chrome-for-testing (milestone >= CFT_MIN_MILESTONE) or the
    old-style stable listing (older milestones), then canary snapshots.

    Raises:
      DriverNotFoundError: if no download URL or no driver binary was found.
    """
    milestone = self.browser.major_version
    logging.info("CHROMEDRIVER Downloading from %s v%s", self.browser.type_name,
                 milestone)
    url: Optional[str] = None
    listing_url: Optional[str] = None
    if milestone >= self.CFT_MIN_MILESTONE:
      listing_url, url = self._get_cft_url(milestone)
    else:
      # The old-style listing only covers milestones before CfT. Calling it
      # for newer milestones would trip its assertion and bypass the
      # DriverNotFoundError handling of callers.
      listing_url, url = self._get_pre_115_stable_url(milestone)
    if not url:
      listing_url, url = self._get_canary_url()

    if not url:
      raise DriverNotFoundError(
          "Please manually compile/download chromedriver for "
          f"{self.browser.type_name} {self.browser.version}")

    logging.info("CHROMEDRIVER Downloading M%s: %s", milestone, listing_url or
                 url)
    with tempfile.TemporaryDirectory() as tmp_dir:
      if ".zip" not in url:
        maybe_driver = pth.LocalPath(tmp_dir) / "chromedriver"
        self.host_platform.download_to(url, maybe_driver)
      else:
        zip_file = pth.LocalPath(tmp_dir) / "download.zip"
        self.host_platform.download_to(url, zip_file)
        with zipfile.ZipFile(zip_file, "r") as zip_ref:
          zip_ref.extractall(zip_file.parent)
        zip_file.unlink()
        maybe_driver = None
        candidates: List[pth.LocalPath] = [
            path for path in zip_file.parent.glob("**/*")
            if path.is_file() and "chromedriver" in path.name
        ]
        # Find exact match first:
        maybe_drivers: List[pth.LocalPath] = [
            path for path in candidates if path.stem == "chromedriver"
        ]
        # Backup less strict matching:
        maybe_drivers += candidates
        if maybe_drivers:
          maybe_driver = maybe_drivers[0]
      if not maybe_driver or not maybe_driver.is_file():
        raise DriverNotFoundError(
            f"Extracted driver at {maybe_driver} does not exist.")
      self.driver_path.parent.mkdir(parents=True, exist_ok=True)
      # Move the binary out before the temporary directory gets deleted.
      shutil.move(os.fspath(maybe_driver), os.fspath(self.driver_path))
      self.driver_path.chmod(self.driver_path.stat().st_mode | stat.S_IEXEC)

  # Using CFT as abbreviation for Chrome For Testing here.
  CFT_MIN_MILESTONE = 115
  CFT_BASE_URL: str = "https://googlechromelabs.github.io/chrome-for-testing"
  CFT_VERSION_URL: str = f"{CFT_BASE_URL}/{{version}}.json"
  CFT_LATEST_URL: str = f"{CFT_BASE_URL}/LATEST_RELEASE_{{major}}"

  # Maps host platform keys to CfT platform names.
  CFT_PLATFORM: Final[Dict[Tuple[str, str], str]] = {
      ("linux", "x64"): "linux64",
      ("macos", "x64"): "mac-x64",
      ("macos", "arm64"): "mac-arm64",
      ("win", "ia32"): "win32",
      ("win", "x64"): "win64"
  }

  def _get_cft_url(self, milestone: int) -> Tuple[str, Optional[str]]:
    """Returns (listing_url, download_url) from chrome-for-testing, with
    download_url being None if nothing matching was found.

    Raises:
      DriverNotFoundError: if the host platform is not in CFT_PLATFORM.
    """
    logging.debug("ChromeDriverFinder: Looking up chrome-for-testing version.")
    platform_name: Optional[str] = self.CFT_PLATFORM.get(self.host_platform.key)
    if not platform_name:
      raise DriverNotFoundError(
          f"Unsupported platform {self.host_platform.key} for chromedriver.")
    listing_url, version_data = self._get_cft_version_data(milestone)
    download_url: Optional[str] = None
    if version_data:
      download_url = self._get_cft_driver_download_url(version_data,
                                                       platform_name)
    return (listing_url, download_url)

  def _get_cft_version_data(self, milestone: int) -> Tuple[str, Optional[Dict]]:
    """Tries the precise browser version first and falls back to the latest
    release of the milestone."""
    logging.debug("ChromeDriverFinder: Trying direct download url")
    listing_url, data = self._get_cft_precise_version_data(self.browser.version)
    if data:
      return listing_url, data
    logging.debug(
        "ChromeDriverFinder: Invalid precise version url %s, "
        "using M%s", listing_url, milestone)
    return self._get_ctf_milestone_data(milestone)

  def _get_cft_precise_version_data(self,
                                    version: str) -> Tuple[str, Optional[Dict]]:
    """Returns (url, parsed json) for the given version; the json part is
    None if the request fails with an HTTPError."""
    version_url = self.CFT_VERSION_URL.format(version=version)
    try:
      with helper.urlopen(version_url) as response:
        version_data = json.loads(response.read().decode("utf-8"))
        return (version_url, version_data)
    except urllib.error.HTTPError as e:
      logging.debug("ChromeDriverFinder: "
                    "Precise version download failed %s", e)
    return (version_url, None)

  def _get_ctf_milestone_data(self,
                              milestone: int) -> Tuple[str, Optional[Dict]]:
    """Looks up the latest release version of the milestone and returns its
    version data; (CFT_BASE_URL, None) if the lookup fails."""
    latest_version_url = self.CFT_LATEST_URL.format(major=milestone)
    try:
      with helper.urlopen(latest_version_url) as response:
        alternative_version = response.read().decode("utf-8").strip()
      logging.debug(
          "ChromeDriverFinder: Using alternative version %s "
          "for M%s", alternative_version, milestone)
      return self._get_cft_precise_version_data(alternative_version)
    except urllib.error.HTTPError:
      return (self.CFT_BASE_URL, None)

  def _get_cft_driver_download_url(self, version_data,
                                   platform_name) -> Optional[str]:
    """Returns the chromedriver url for platform_name from the "downloads"
    section of the version data, None if there is no such entry."""
    if all_downloads := version_data.get("downloads"):
      driver_downloads: List = all_downloads.get("chromedriver", [])
      for download in driver_downloads:
        # Skip malformed entries instead of failing with a KeyError.
        if not isinstance(download, dict):
          continue
        if download.get("platform") == platform_name:
          return download.get("url")
    return None

  PRE_115_STABLE_URL: str = "http://chromedriver.storage.googleapis.com"

  def _get_pre_115_stable_url(self,
                              milestone: int) -> Tuple[str, Optional[str]]:
    """Returns (listing_url, download_url) from the old-style stable listing;
    download_url is None if no driver version was found for the milestone.

    Raises:
      DriverNotFoundError: if the host platform is not supported.
    """
    logging.debug(
        "ChromeDriverFinder: "
        "Looking upe old-style stable version M%s", milestone)
    assert milestone < self.CFT_MIN_MILESTONE
    listing_url = f"{self.PRE_115_STABLE_URL}/index.html"
    driver_version: Optional[str] = self._get_pre_115_driver_version(milestone)
    if not driver_version:
      return listing_url, None
    if self.host_platform.is_linux:
      arch_suffix = "linux64"
    elif self.host_platform.is_macos:
      arch_suffix = "mac64"
      if self.host_platform.is_arm64:
        # The uploaded chromedriver archives changed the naming scheme after
        # chrome version 106.0.5249.21 for Arm64 (previously m1):
        #   before: chromedriver_mac64_m1.zip
        #   after:  chromedriver_mac_arm64.zip
        last_old_naming_version = (106, 0, 5249, 21)
        version_tuple = tuple(map(int, driver_version.split(".")))
        if version_tuple <= last_old_naming_version:
          arch_suffix = "mac64_m1"
        else:
          arch_suffix = "mac_arm64"
    elif self.host_platform.is_win:
      arch_suffix = "win32"
    else:
      raise DriverNotFoundError("Unsupported chromedriver platform")
    url = (f"{self.PRE_115_STABLE_URL}/{driver_version}/"
           f"chromedriver_{arch_suffix}.zip")
    return listing_url, url

  def _get_pre_115_driver_version(self, milestone) -> Optional[str]:
    """Returns the driver version string for the milestone, None if the
    LATEST_RELEASE lookup results in a 404.

    Raises:
      DriverNotFoundError: on any other HTTP error.
    """
    if milestone < 70:
      return self._get_pre_70_driver_version(milestone)
    url = f"{self.PRE_115_STABLE_URL}/LATEST_RELEASE_{milestone}"
    try:
      with helper.urlopen(url) as response:
        # The version ends up in a download URL, so drop any surrounding
        # whitespace / trailing newline of the response body.
        return response.read().decode("utf-8").strip()
    except urllib.error.HTTPError as e:
      if e.code != 404:
        raise DriverNotFoundError(f"Could not query {url}") from e
      logging.debug("ChromeDriverFinder: Could not load latest release url %s",
                    e)
    return None

  def _get_pre_70_driver_version(self, milestone) -> Optional[str]:
    """Looks up the driver version in the 2.46 release notes.

    Expects each line starting with "---" to contain the driver version and
    the following line to contain exactly two integers, which are used as
    min and max supported milestone.
    """
    with helper.urlopen(
        f"{self.PRE_115_STABLE_URL}/2.46/notes.txt") as response:
      lines = response.read().decode("utf-8").splitlines()
    for i, line in enumerate(lines):
      if not line.startswith("---"):
        continue
      [min_version, max_version] = map(int, re.findall(r"\d+", lines[i + 1]))
      if min_version <= milestone <= max_version:
        match = re.search(r"\d\.\d+", line)
        if not match:
          raise DriverNotFoundError(f"Could not parse version number: {line}")
        return match.group(0)
    return None

  CHROMIUM_DASH_URL: str = "https://chromiumdash.appspot.com/fetch_releases"
  CHROMIUM_LISTING_URL: str = (
      "https://www.googleapis.com/storage/v1/b/chromium-browser-snapshots/o/")
  # Per host platform key query parameters for CHROMIUM_DASH_URL.
  CHROMIUM_DASH_PARAMS: Dict[Tuple[str, str], Dict] = {
      ("linux", "x64"): {
          "dash_platform": "linux",
          "dash_channel": "dev",
          "dash_limit": 10,
      },
      ("macos", "x64"): {
          "dash_platform": "mac",
      },
      ("macos", "arm64"): {
          "dash_platform": "mac",
      },
      ("win", "ia32"): {
          "dash_platform": "win",
      },
      ("win", "x64"): {
          "dash_platform": "win64",
      },
  }
  # Per host platform key name prefix in the snapshots listing.
  CHROMIUM_LISTING_PREFIX: Dict[Tuple[str, str], str] = {
      ("linux", "x64"): "Linux_x64",
      ("macos", "x64"): "Mac",
      ("macos", "arm64"): "Mac_Arm",
      ("win", "ia32"): "Win",
      ("win", "x64"): "Win_x64",
  }

  def _get_canary_url(self) -> Tuple[str, Optional[str]]:
    """Returns (listing_url, download_url) of a snapshot chromedriver build
    close to the browser's chromium base position.

    download_url is None if the listing contains no build that is newer than
    the base position.

    Raises:
      DriverNotFoundError: if the platform is unsupported or no release info
        could be found.
      NotImplementedError: if there is no listing prefix for the platform.
    """
    logging.debug(
        "ChromeDriverFinder: Try downloading the chromedriver canary version")
    properties = self.CHROMIUM_DASH_PARAMS.get(self.host_platform.key)
    if not properties:
      raise DriverNotFoundError(
          f"Unsupported platform={self.platform}, key={self.host_platform.key}")
    dash_platform = properties["dash_platform"]
    dash_channel = properties.get("dash_channel", "canary")
    # Limit should be > len(canary_versions) so we also get potentially
    # the latest dev version (only beta / stable have official driver binaries).
    dash_limit = properties.get("dash_limit", 100)
    url = helper.update_url_query(
        self.CHROMIUM_DASH_URL, {
            "platform": dash_platform,
            "channel": dash_channel,
            "milestone": str(self.browser.major_version),
            "num": str(dash_limit),
        })
    chromium_base_position = 0
    with helper.urlopen(url) as response:
      version_infos = list(json.loads(response.read().decode("utf-8")))
      if not version_infos:
        raise DriverNotFoundError("Could not find latest version info for "
                                  f"platform={self.host_platform}")
      for version_info in version_infos:
        if version_info["version"] == self.browser.version:
          chromium_base_position = int(
              version_info["chromium_main_branch_position"])
          break

    if not chromium_base_position and version_infos:
      fallback_version_info = None
      # Try matching latest milestone
      for version_info in version_infos:
        if version_info["milestone"] == self.browser.major_version:
          fallback_version_info = version_info
          break

      if not fallback_version_info:
        # Android has a slightly different release cycle than the desktop
        # versions. Assume that the latest canary version is good enough
        fallback_version_info = version_infos[0]
      chromium_base_position = int(
          fallback_version_info["chromium_main_branch_position"])
      logging.warning(
          "Falling back to latest (not precisely matching) "
          "canary chromedriver %s (expected %s)",
          fallback_version_info["version"], self.browser.version)

    if not chromium_base_position:
      raise DriverNotFoundError("Could not find matching canary chromedriver "
                                f"for {self.browser.version}")
    # Use prefixes to limit listing results and increase chances of finding
    # a matching version
    listing_prefix = self.CHROMIUM_LISTING_PREFIX.get(self.host_platform.key)
    if not listing_prefix:
      raise NotImplementedError(
          f"Unsupported chromedriver platform {self.host_platform}")
    base_prefix = str(chromium_base_position)[:4]
    listing_url = helper.update_url_query(self.CHROMIUM_LISTING_URL, {
        "prefix": f"{listing_prefix}/{base_prefix}",
        "maxResults": "10000"
    })
    with helper.urlopen(listing_url) as response:
      listing = json.loads(response.read().decode("utf-8"))

    # An empty listing result might not contain an "items" entry at all.
    items = listing.get("items", [])
    versions = []
    logging.debug("Filtering %s candidate URLs.", len(items))
    for version in items:
      if "name" not in version:
        continue
      if "mediaLink" not in version:
        continue
      name = version["name"]
      if "chromedriver" not in name:
        continue
      parts = name.split("/")
      if "chromedriver" not in parts[-1] or len(parts) < 3:
        continue
      try:
        base = int(parts[1])
      except ValueError:
        # Ignore base if it is not an int
        continue
      versions.append((base, version["mediaLink"]))
    versions.sort()
    logging.debug("Found candidates: %s", versions)
    logging.debug("chromium_base_position=%s", chromium_base_position)

    for index, (base, _) in enumerate(versions):
      if base > chromium_base_position:
        # Use the closest build that is not newer than the browser. If all
        # builds are newer, use the oldest one; an index of -1 would wrap
        # around and select the newest build instead.
        _, download_url = versions[max(index - 1, 0)]
        return listing_url, download_url
    return listing_url, None