• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import atexit
9import datetime as dt
10import json
11import logging
12import os
13import re
14import shutil
15import stat
16import tempfile
17import urllib.error
18import zipfile
19from typing import (TYPE_CHECKING, Any, Dict, Final, Iterable, List, Optional,
20                    Sequence, Tuple, Type, cast)
21
22import hjson
23from immutabledict import immutabledict
24from selenium.webdriver.chromium.options import ChromiumOptions
25from selenium.webdriver.chromium.service import ChromiumService
26from selenium.webdriver.chromium.webdriver import ChromiumDriver
27from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
28
29from crossbench import exception, helper
30from crossbench import path as pth
31from crossbench import plt
32from crossbench.browsers.attributes import BrowserAttributes
33from crossbench.browsers.chromium.chromium import Chromium
34from crossbench.browsers.chromium.version import (ChromeDriverVersion,
35                                                  ChromiumVersion)
36from crossbench.browsers.webdriver import WebDriverBrowser
37from crossbench.cli.config.secret_type import SecretType
38from crossbench.flags.chrome import ChromeFlags
39from crossbench.plt.android_adb import AndroidAdbPlatform
40from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform
41from crossbench.plt.linux_ssh import LinuxSshPlatform
42
43if TYPE_CHECKING:
44  from selenium import webdriver
45
46  from crossbench.browsers.settings import Settings
47  from crossbench.cli.config.secrets import Secret
48  from crossbench.flags.base import FlagsT
49  from crossbench.plt.base import Platform
50  from crossbench.runner.groups.session import BrowserSessionRunGroup
51
52
53class ChromiumWebDriver(WebDriverBrowser, Chromium, metaclass=abc.ABCMeta):
54
55  WEB_DRIVER_OPTIONS: Type[ChromiumOptions] = ChromiumOptions
56  WEB_DRIVER_SERVICE: Type[ChromiumService] = ChromiumService
57
58  @property
59  def attributes(self) -> BrowserAttributes:
60    return (BrowserAttributes.CHROMIUM | BrowserAttributes.CHROMIUM_BASED
61            | BrowserAttributes.WEBDRIVER)
62
63  def use_local_chromedriver(self) -> bool:
64    return self.major_version == 0 or self.is_locally_compiled()
65
66  def is_locally_compiled(self) -> bool:
67    return pth.LocalPath(self.app_path.parent / "args.gn").exists()
68
69  def _execute_cdp_cmd(self, driver: webdriver.Remote, cmd: str,
70                       cmd_args: dict):
71    return driver.execute("executeCdpCommand", {
72        "cmd": cmd,
73        "params": cmd_args
74    })["value"]
75
76  def _find_driver(self) -> pth.AnyPath:
77    if self._driver_path:
78      return self._driver_path
79    finder = ChromeDriverFinder(self)
80    assert self.app_path
81    if self.use_local_chromedriver():
82      return finder.find_local_build()
83    try:
84      return finder.download()
85    except DriverNotFoundError as original_download_error:
86      logging.debug(
87          "Could not download chromedriver, "
88          "falling back to finding local build: %s", original_download_error)
89      try:
90        return finder.find_local_build()
91      except DriverNotFoundError as e:
92        logging.debug("Could not find fallback chromedriver: %s", e)
93        raise original_download_error from e
94      # to make an old pytype version happy
95      return pth.LocalPath()
96
97  def _start_driver(self, session: BrowserSessionRunGroup,
98                    driver_path: pth.AnyPath) -> webdriver.Remote:
99    return self._start_chromedriver(session, driver_path)
100
101  def _start_chromedriver(self, session: BrowserSessionRunGroup,
102                          driver_path: pth.AnyPath) -> ChromiumDriver:
103    assert not self._is_running
104    assert self.log_file
105    args = self._get_browser_flags_for_session(session)
106    options = self._create_options(session, args)
107
108    self._log_browser_start(args, driver_path)
109    service_args: List[str] = []
110    log_path: Optional[str] = None
111    if self._settings.driver_logging:
112      service_args += ["--verbose"]
113      log_path = os.fspath(self.driver_log_file)
114    # pytype: disable=wrong-keyword-args
115    service = self.WEB_DRIVER_SERVICE(
116        executable_path=os.fspath(driver_path),
117        log_path=log_path,
118        service_args=service_args)
119    # TODO: support remote platforms
120    service.log_file = pth.LocalPath(self.stdout_log_file).open(  # pylint: disable=consider-using-with
121        "w", encoding="utf-8")
122    driver = self._create_driver(options, service)
123    # pytype: enable=wrong-keyword-args
124    # Prevent debugging overhead.
125    self._execute_cdp_cmd(driver, "Runtime.setMaxCallStackSizeToCapture",
126                          {"size": 0})
127    return driver
128
129  def _create_options(self, session: BrowserSessionRunGroup,
130                      args: Sequence[str]) -> ChromiumOptions:
131    assert not self._is_running
132    options: ChromiumOptions = self.WEB_DRIVER_OPTIONS()
133    options.set_capability("browserVersion", str(self.major_version))
134    # Don't wait for document-ready.
135    options.set_capability("pageLoadStrategy", "eager")
136    for arg in args:
137      options.add_argument(arg)
138    options.binary_location = os.fspath(self.path)
139    session.setup_selenium_options(options)
140    return options
141
142  @abc.abstractmethod
143  def _create_driver(self, options: ChromiumOptions,
144                     service: ChromiumService) -> ChromiumDriver:
145    pass
146
147  def _validate_driver_version(self) -> None:
148    assert self._driver_path, "No driver available"
149    error_message = None
150    if self.is_local and is_build_dir(
151        self.platform.local_path(self.app_path.parent)):
152      error_message = self._validate_locally_built_driver(
153          self.platform.local_path(self._driver_path))
154    else:
155      error_message = self._validate_any_driver_version(self._driver_path)
156    if error_message:
157      raise RuntimeError("\n".join(error_message))
158
159  def _validate_locally_built_driver(
160      self, driver_path: pth.LocalPath) -> Optional[Iterable[str]]:
161    # TODO: migrate to version object on the browser
162    browser_version = ChromiumVersion.parse(self.version)
163    driver_version = ChromeDriverVersion.parse(
164        self.platform.app_version(driver_path))
165    if browser_version.parts == driver_version.parts:
166      return None
167    return (f"Chromedriver version mismatch: driver={driver_version.parts_str} "
168            f"browser={browser_version.parts_str} ({self}).",
169            build_chromedriver_instructions(driver_path.parent))
170
171  def _validate_any_driver_version(
172      self, driver_path: pth.AnyPath) -> Optional[Iterable[str]]:
173    raw_version_str = self.host_platform.sh_stdout(driver_path, "--version")
174    driver_version = ChromeDriverVersion.parse(raw_version_str)
175    if driver_version.major == self.major_version:
176      return None
177    return (f"Chromedriver version mismatch: driver={driver_version} "
178            f"browser={self.version} ({self})",)
179
180  def run_script_on_new_document(self, script: str) -> None:
181    self._execute_cdp_cmd(self._private_driver,
182                          "Page.addScriptToEvaluateOnNewDocument",
183                          {"source": script})
184
185  def current_window_id(self) -> str:
186    return str(self._private_driver.current_window_handle)
187
188  def switch_window(self, window_id: str) -> None:
189    self._private_driver.switch_to.window(window_id)
190
191  def switch_tab(
192      self,
193      title: Optional[re.Pattern] = None,
194      url: Optional[re.Pattern] = None,
195      tab_index: Optional[int] = None,
196      timeout: dt.timedelta = dt.timedelta(seconds=0)
197  ) -> None:
198    driver = self._private_driver
199    original_handle = driver.current_window_handle
200    for _ in helper.wait_with_backoff(timeout, self.platform):
201      # Search through other handles starting from current_window_handle + 1
202      try:
203        i = driver.window_handles.index(original_handle)
204      except ValueError as e:
205        raise RuntimeError("Original starting tab no longer exists") from e
206
207      if tab_index is not None:
208        handles = [driver.window_handles[tab_index]]
209      else:
210        handles = driver.window_handles[i + 1:] + driver.window_handles[:i]
211
212      for handle in handles:
213        driver.switch_to.window(handle)
214        if title is not None:
215          if title.match(driver.title) is None:
216            continue
217        if url is not None:
218          if url.match(driver.current_url) is None:
219            continue
220        return
221    error = "No new tab found"
222    if title is not None:
223      error += f" with title matching {repr(title.pattern)}"
224    if url is not None:
225      error += f" with url matching {repr(url.pattern)}"
226    if tab_index is not None:
227      error += f" with tab_index matching {tab_index}"
228    raise RuntimeError(error)
229
230  def start_profiling(self) -> None:
231    assert isinstance(self._private_driver, ChromiumDriver)
232    # TODO: reuse the TraceProbe categories,
233    self._execute_cdp_cmd(
234        self._private_driver, "Tracing.start", {
235            "transferMode":
236                "ReturnAsStream",
237            "includedCategories": [
238                "devtools.timeline",
239                "v8.execute",
240                "disabled-by-default-devtools.timeline",
241                "disabled-by-default-devtools.timeline.frame",
242                "toplevel",
243                "blink.console",
244                "blink.user_timing",
245                "latencyInfo",
246                "disabled-by-default-devtools.timeline.stack",
247                "disabled-by-default-v8.cpu_profiler",
248            ],
249        })
250
251  def stop_profiling(self) -> Any:
252    assert isinstance(self._private_driver, ChromiumDriver)
253    data = self._execute_cdp_cmd(self._private_driver,
254                                 "Tracing.tracingComplete", {})
255    # TODO: use webdriver bidi to get the async Tracing.end event.
256    # self._execute_cdp_cmd(self._driver, "Tracing.end", {})
257    return data
258
259
260# Android is high-tech and reads chrome flags from an app-specific file.
261# TODO: extend support to more than just chrome.
262_FLAG_ROOT: pth.AnyPosixPath = pth.AnyPosixPath("/data/local/tmp/")
263FLAGS_WEBLAYER: pth.AnyPosixPath = _FLAG_ROOT / "weblayer-command-line"
264FLAGS_WEBVIEW: pth.AnyPosixPath = _FLAG_ROOT / "webview-command-line"
265FLAGS_CONTENT_SHELL: pth.AnyPosixPath = (
266    _FLAG_ROOT / "content-shell-command-line")
267FLAGS_CHROME: pth.AnyPosixPath = _FLAG_ROOT / "chrome-command-line"
268
269
270class ChromiumWebDriverAndroid(ChromiumWebDriver):
271
272  def __init__(self,
273               label: str,
274               path: Optional[pth.AnyPath] = None,
275               settings: Optional[Settings] = None):
276    assert settings, "Android browser needs custom settings and platform"
277    self._chrome_command_line_path: pth.AnyPath = FLAGS_CHROME
278    self._previous_command_line_contents: Optional[str] = None
279    super().__init__(label, path, settings)
280    self._android_package: str = self._lookup_android_package(self.path)
281    if not self._android_package:
282      raise RuntimeError("Could not find matching adb package for "
283                         f"{self.path} on {self.platform}")
284
285  def _lookup_android_package(self, path: pth.AnyPath) -> str:
286    return self.platform.app_path_to_package(path)
287
288  @property
289  def android_package(self) -> str:
290    return self._android_package
291
292  @property
293  def platform(self) -> AndroidAdbPlatform:
294    assert isinstance(
295        self._platform,
296        AndroidAdbPlatform), (f"Invalid platform: {self._platform}")
297    return cast(AndroidAdbPlatform, self._platform)
298
299  def _resolve_binary(self, path: pth.AnyPath) -> pth.AnyPath:
300    return path
301
302  # TODO: implement setting a clean profile on android
303  _UNSUPPORTED_FLAGS: Tuple[str, ...] = (
304      "--user-data-dir",
305      "--disable-sync",
306      "--window-size",
307      "--window-position",
308  )
309
310  def _filter_flags_for_run(self, flags: FlagsT) -> FlagsT:
311    assert isinstance(flags, ChromeFlags)
312    chrome_flags = cast(ChromeFlags, flags)
313    for flag in self._UNSUPPORTED_FLAGS:
314      if flag not in chrome_flags:
315        continue
316      flag_value = chrome_flags.pop(flag, None)
317      logging.debug("Chrome Android: Removed unsupported flag: %s=%s", flag,
318                    flag_value)
319    return chrome_flags
320
321  def _start_driver(self, session: BrowserSessionRunGroup,
322                    driver_path: pth.AnyPath) -> webdriver.Remote:
323    self.adb_force_stop()
324    if session.browser.wipe_system_user_data:
325      self.adb_force_clear()
326      self.platform.adb.grant_notification_permissions(self.android_package)
327    self._backup_chrome_flags()
328    atexit.register(self._restore_chrome_flags)
329    return self._start_chromedriver(session, driver_path)
330
331  def _backup_chrome_flags(self) -> None:
332    assert self._previous_command_line_contents is None
333    self._previous_command_line_contents = self._read_device_flags()
334
335  def _read_device_flags(self) -> Optional[str]:
336    if not self.platform.exists(self._chrome_command_line_path):
337      return None
338    return self.platform.cat(self._chrome_command_line_path)
339
340  def adb_force_stop(self) -> None:
341    self.platform.adb.force_stop(self.android_package)
342
343  def adb_force_clear(self) -> None:
344    self.platform.adb.force_clear(self.android_package)
345
346  def force_quit(self) -> None:
347    try:
348      try:
349        super().force_quit()
350      finally:
351        self.adb_force_stop()
352    finally:
353      self._restore_chrome_flags()
354
355  def _restore_chrome_flags(self) -> None:
356    atexit.unregister(self._restore_chrome_flags)
357    current_flags = self._read_device_flags()
358    if current_flags != self._previous_command_line_contents:
359      logging.warning("%s: flags file changed during run", self)
360      logging.debug("before: %s", self._previous_command_line_contents)
361      logging.debug("current: %s", current_flags)
362    if self._previous_command_line_contents is None:
363      logging.debug("%s: deleting chrome flags file: %s", self,
364                    self._chrome_command_line_path)
365      self.platform.rm(self._chrome_command_line_path, missing_ok=True)
366    else:
367      logging.debug("%s: restoring previous flags file contents in %s", self,
368                    self._chrome_command_line_path)
369      self.platform.set_file_contents(self._chrome_command_line_path,
370                                      self._previous_command_line_contents)
371    self._previous_command_line_contents = None
372
373  def _create_options(self, session: BrowserSessionRunGroup,
374                      args: Sequence[str]) -> ChromiumOptions:
375    options: ChromiumOptions = super()._create_options(session, args)
376    options.binary_location = ""
377    options.add_experimental_option("androidPackage", self.android_package)
378    options.add_experimental_option("androidDeviceSerial",
379                                    self.platform.adb.serial_id)
380    return options
381
382  def setup_binary(self) -> None:
383    super().setup_binary()
384    self.platform.adb.grant_notification_permissions(self.android_package)
385
386
387class LocalChromiumWebDriverAndroid(ChromiumWebDriverAndroid):
388  """
389  Custom version that uses a locally built bundle wrapper.
390  https://chromium.googlesource.com/chromium/src/+/HEAD/docs/android_build_instructions.md
391  """
392
393  @classmethod
394  def is_apk_helper(cls, path: Optional[pth.AnyPath]) -> bool:
395    if not path or len(path.parts) == 1:
396      return False
397    return path.name.endswith("_apk")
398
399  def __init__(self,
400               label: str,
401               path: Optional[pth.AnyPath] = None,
402               settings: Optional[Settings] = None):
403    if self.is_apk_helper(path):
404      raise ValueError(
405          "Locally built chrome version needs package, got empty path")
406    assert settings, "Android browser needs custom settings and platform"
407    self._package_info: immutabledict[str, Any] = self._parse_package_info(
408        settings.platform, path)
409    super().__init__(label, path, settings)
410
411  def _lookup_android_package(self, path: pth.AnyPath) -> str:
412    return self._package_info["Package name"]
413
414  def _extract_version(self) -> str:
415    return self._package_info["versionName"]
416
417  def _parse_package_info(self, platform: plt.Platform,
418                          path: pth.AnyPath) -> immutabledict[str, Any]:
419    output = platform.host_platform.sh_stdout(
420        path, "package-info").rstrip().splitlines()
421    package_info = {}
422    for line in output:
423      key, value = line.split(": ")
424      package_info[key] = hjson.loads(value)
425    return immutabledict(package_info)
426
427  def setup_binary(self) -> None:
428    super().setup_binary()
429    self.host_platform.sh_stdout(self.path, "install",
430                                 f"--device={self.platform.serial_id}")
431
432
433class ChromiumWebDriverSsh(ChromiumWebDriver):
434
435  @property
436  def platform(self) -> LinuxSshPlatform:
437    assert isinstance(self._platform,
438                      LinuxSshPlatform), (f"Invalid platform: {self._platform}")
439    return cast(LinuxSshPlatform, self._platform)
440
441  def _start_driver(self, session: BrowserSessionRunGroup,
442                    driver_path: pth.AnyPath) -> RemoteWebDriver:
443    del driver_path
444    args = self._get_browser_flags_for_session(session)
445    options = self._create_options(session, args)
446    platform = self.platform
447    host = platform.host
448    port = platform.port
449    driver = RemoteWebDriver(f"http://{host}:{port}", options=options)
450    return driver
451
452
453class ChromiumWebDriverChromeOsSsh(ChromiumWebDriver):
454
455  @property
456  def platform(self) -> ChromeOsSshPlatform:
457    assert isinstance(
458        self._platform,
459        ChromeOsSshPlatform), (f"Invalid platform: {self._platform}")
460    return cast(ChromeOsSshPlatform, self._platform)
461
462  def _start_driver(self, session: BrowserSessionRunGroup,
463                    driver_path: pth.AnyPath) -> RemoteWebDriver:
464    del driver_path
465    platform = self.platform
466    host = platform.host
467    port = platform.port
468    args = self._get_browser_flags_for_session(session)
469    # TODO(spadhi): correctly handle flags:
470    #   1. decide which flags to pass to chrome vs chromedriver
471    #   2. investigate irrelevant / unsupported flags on ChromeOS
472    #   3. filter out and pass the chrome flags to the debugging session below
473    #   4. pass the remaining flags to RemoteWebDriver options
474    google_login = session.browser.secrets.get(SecretType.GOOGLE)
475    if google_login:
476      dbg_port = platform.create_debugging_session(
477          username=google_login.username, password=google_login.password)
478    else:
479      dbg_port = platform.create_debugging_session()
480    options = self._create_options(session, args)
481    options.add_experimental_option("debuggerAddress", f"127.0.0.1:{dbg_port}")
482    driver = RemoteWebDriver(f"http://{host}:{port}", options=options)
483    return driver
484
485  # On ChromeOS, the system profile is the same as the browser profile.
486  def is_logged_in(self, secret: Secret, strict: bool = False) -> bool:
487    if secret.type != SecretType.GOOGLE:
488      return False
489    if secret.username == self.platform.username:
490      return True
491    if not strict:
492      return False
493    raise RuntimeError("Login of non-primary Google accounts not supported")
494
495
496class DriverNotFoundError(ValueError):
497  pass
498
499
500def build_chromedriver_instructions(build_dir: pth.AnyPath) -> str:
501  return ("Please build 'chromedriver' manually for local builds:\n"
502          f"    autoninja -C {build_dir} chromedriver")
503
504
505def is_build_dir(path: pth.LocalPath,
506                 platform: plt.Platform = plt.PLATFORM) -> bool:
507  return platform.is_file(path / "args.gn")
508
509
510class ChromeDriverFinder:
511  driver_path: pth.LocalPath
512
513  def __init__(self, browser: ChromiumWebDriver):
514    self.browser = browser
515    self.platform: Platform = browser.platform
516    self.host_platform: Platform = browser.platform.host_platform
517    extension: str = ""
518    if self.host_platform.is_win:
519      extension = ".exe"
520    cache_dir = self.host_platform.local_cache_dir("driver")
521    self.driver_path: pth.LocalPath = (
522        cache_dir / f"chromedriver-{self.browser.major_version}{extension}")
523    self._validate_browser()
524
525  def _validate_browser(self) -> None:
526    browser_platform = self.browser.platform
527    if browser_platform.is_local:
528      return
529    # Some remote platforms rely on a local chromedriver
530    if (browser_platform.is_android or browser_platform.is_remote_ssh):
531      return
532    raise RuntimeError("Cannot download chromedriver for remote browser yet")
533
534  def find_local_build(self) -> pth.LocalPath:
535    assert self.browser.app_path
536    # assume it's a local build
537    lookup_dir = pth.LocalPath(self.browser.app_path.parent)
538    driver_path = lookup_dir / "chromedriver"
539    if self.platform.is_win:
540      driver_path = driver_path.with_suffix(".exe")
541    if self.platform.is_file(driver_path):
542      return driver_path
543    error_message: List[str] = [f"Driver '{driver_path}' does not exist."]
544    if is_build_dir(lookup_dir, self.platform):
545      error_message += [build_chromedriver_instructions(lookup_dir)]
546    else:
547      error_message += ["Please manually provide a chromedriver binary."]
548    raise DriverNotFoundError("\n".join(error_message))
549
550  def download(self) -> pth.LocalPath:
551    if not self.platform.is_file(self.driver_path):
552      with exception.annotate(
553          f"Downloading chromedriver for {self.browser.version}"):
554        self._download()
555    return self.driver_path
556
557  def _download(self) -> None:
558    milestone = self.browser.major_version
559    logging.info("CHROMEDRIVER Downloading from %s v%s", self.browser.type_name,
560                 milestone)
561    url: Optional[str] = None
562    listing_url: Optional[str] = None
563    if milestone >= self.CFT_MIN_MILESTONE:
564      listing_url, url = self._get_cft_url(milestone)
565    if not url:
566      listing_url, url = self._get_pre_115_stable_url(milestone)
567      if not url:
568        listing_url, url = self._get_canary_url()
569
570    if not url:
571      raise DriverNotFoundError(
572          "Please manually compile/download chromedriver for "
573          f"{self.browser.type_name} {self.browser.version}")
574
575    logging.info("CHROMEDRIVER Downloading M%s: %s", milestone, listing_url or
576                 url)
577    with tempfile.TemporaryDirectory() as tmp_dir:
578      if ".zip" not in url:
579        maybe_driver = pth.LocalPath(tmp_dir) / "chromedriver"
580        self.host_platform.download_to(url, maybe_driver)
581      else:
582        zip_file = pth.LocalPath(tmp_dir) / "download.zip"
583        self.host_platform.download_to(url, zip_file)
584        with zipfile.ZipFile(zip_file, "r") as zip_ref:
585          zip_ref.extractall(zip_file.parent)
586        zip_file.unlink()
587        maybe_driver = None
588        candidates: List[pth.LocalPath] = [
589            path for path in zip_file.parent.glob("**/*")
590            if path.is_file() and "chromedriver" in path.name
591        ]
592        # Find exact match first:
593        maybe_drivers: List[pth.LocalPath] = [
594            path for path in candidates if path.stem == "chromedriver"
595        ]
596        # Backup less strict matching:
597        maybe_drivers += candidates
598        if len(maybe_drivers) > 0:
599          maybe_driver = maybe_drivers[0]
600      if not maybe_driver or not maybe_driver.is_file():
601        raise DriverNotFoundError(
602            f"Extracted driver at {maybe_driver} does not exist.")
603      self.driver_path.parent.mkdir(parents=True, exist_ok=True)
604      shutil.move(os.fspath(maybe_driver), os.fspath(self.driver_path))
605      self.driver_path.chmod(self.driver_path.stat().st_mode | stat.S_IEXEC)
606
607  # Using CFT as abbreviation for Chrome For Testing here.
608  CFT_MIN_MILESTONE = 115
609  CFT_BASE_URL: str = "https://googlechromelabs.github.io/chrome-for-testing"
610  CFT_VERSION_URL: str = f"{CFT_BASE_URL}/{{version}}.json"
611  CFT_LATEST_URL: str = f"{CFT_BASE_URL}/LATEST_RELEASE_{{major}}"
612
613  CFT_PLATFORM: Final[Dict[Tuple[str, str], str]] = {
614      ("linux", "x64"): "linux64",
615      ("macos", "x64"): "mac-x64",
616      ("macos", "arm64"): "mac-arm64",
617      ("win", "ia32"): "win32",
618      ("win", "x64"): "win64"
619  }
620
621  def _get_cft_url(self, milestone: int) -> Tuple[str, Optional[str]]:
622    logging.debug("ChromeDriverFinder: Looking up chrome-for-testing version.")
623    platform_name: Optional[str] = self.CFT_PLATFORM.get(self.host_platform.key)
624    if not platform_name:
625      raise DriverNotFoundError(
626          f"Unsupported platform {self.host_platform.key} for chromedriver.")
627    listing_url, version_data = self._get_cft_version_data(milestone)
628    download_url: Optional[str] = None
629    if version_data:
630      download_url = self._get_cft_driver_download_url(version_data,
631                                                       platform_name)
632    return (listing_url, download_url)
633
634  def _get_cft_version_data(self, milestone: int) -> Tuple[str, Optional[Dict]]:
635    logging.debug("ChromeDriverFinder: Trying direct download url")
636    listing_url, data = self._get_cft_precise_version_data(self.browser.version)
637    if data:
638      return listing_url, data
639    logging.debug(
640        "ChromeDriverFinder: Invalid precise version url %s, "
641        "using M%s", listing_url, milestone)
642    return self._get_ctf_milestone_data(milestone)
643
644  def _get_cft_precise_version_data(self,
645                                    version: str) -> Tuple[str, Optional[Dict]]:
646    version_url = self.CFT_VERSION_URL.format(version=version)
647    try:
648      with helper.urlopen(version_url) as response:
649        version_data = json.loads(response.read().decode("utf-8"))
650        return (version_url, version_data)
651    except urllib.error.HTTPError as e:
652      logging.debug("ChromeDriverFinder: "
653                    "Precise version download failed %s", e)
654      return (version_url, None)
655
656  def _get_ctf_milestone_data(self,
657                              milestone: int) -> Tuple[str, Optional[Dict]]:
658    latest_version_url = self.CFT_LATEST_URL.format(major=milestone)
659    try:
660      with helper.urlopen(latest_version_url) as response:
661        alternative_version = response.read().decode("utf-8").strip()
662        logging.debug(
663            "ChromeDriverFinder: Using alternative version %s "
664            "for M%s", alternative_version, milestone)
665        return self._get_cft_precise_version_data(alternative_version)
666    except urllib.error.HTTPError:
667      return (self.CFT_BASE_URL, None)
668
669  def _get_cft_driver_download_url(self, version_data,
670                                   platform_name) -> Optional[str]:
671    if all_downloads := version_data.get("downloads"):
672      driver_downloads: Dict = all_downloads.get("chromedriver", [])
673      for download in driver_downloads:
674        if isinstance(download, dict) and download["platform"] == platform_name:
675          return download["url"]
676    return None
677
678  PRE_115_STABLE_URL: str = "http://chromedriver.storage.googleapis.com"
679
680  def _get_pre_115_stable_url(self,
681                              milestone: int) -> Tuple[str, Optional[str]]:
682    logging.debug(
683        "ChromeDriverFinder: "
684        "Looking upe old-style stable version M%s", milestone)
685    assert milestone < self.CFT_MIN_MILESTONE
686    listing_url = f"{self.PRE_115_STABLE_URL}/index.html"
687    driver_version: Optional[str] = self._get_pre_115_driver_version(milestone)
688    if not driver_version:
689      return listing_url, None
690    if self.host_platform.is_linux:
691      arch_suffix = "linux64"
692    elif self.host_platform.is_macos:
693      arch_suffix = "mac64"
694      if self.host_platform.is_arm64:
695        # The uploaded chromedriver archives changed the naming scheme after
696        # chrome version 106.0.5249.21 for Arm64 (previously m1):
697        #   before: chromedriver_mac64_m1.zip
698        #   after:  chromedriver_mac_arm64.zip
699        last_old_naming_version = (106, 0, 5249, 21)
700        version_tuple = tuple(map(int, driver_version.split(".")))
701        if version_tuple <= last_old_naming_version:
702          arch_suffix = "mac64_m1"
703        else:
704          arch_suffix = "mac_arm64"
705    elif self.host_platform.is_win:
706      arch_suffix = "win32"
707    else:
708      raise DriverNotFoundError("Unsupported chromedriver platform")
709    url = (f"{self.PRE_115_STABLE_URL}/{driver_version}/"
710           f"chromedriver_{arch_suffix}.zip")
711    return listing_url, url
712
713  def _get_pre_115_driver_version(self, milestone) -> Optional[str]:
714    if milestone < 70:
715      return self._get_pre_70_driver_version(milestone)
716    url = f"{self.PRE_115_STABLE_URL}/LATEST_RELEASE_{milestone}"
717    try:
718      with helper.urlopen(url) as response:
719        return response.read().decode("utf-8")
720    except urllib.error.HTTPError as e:
721      if e.code != 404:
722        raise DriverNotFoundError(f"Could not query {url}") from e
723      logging.debug("ChromeDriverFinder: Could not load latest release url %s",
724                    e)
725    return None
726
727  def _get_pre_70_driver_version(self, milestone) -> Optional[str]:
728    with helper.urlopen(
729        f"{self.PRE_115_STABLE_URL}/2.46/notes.txt") as response:
730      lines = response.read().decode("utf-8").splitlines()
731    for i, line in enumerate(lines):
732      if not line.startswith("---"):
733        continue
734      [min_version, max_version] = map(int, re.findall(r"\d+", lines[i + 1]))
735      if min_version <= milestone <= max_version:
736        match = re.search(r"\d\.\d+", line)
737        if not match:
738          raise DriverNotFoundError(f"Could not parse version number: {line}")
739        return match.group(0)
740    return None
741
742  CHROMIUM_DASH_URL: str = "https://chromiumdash.appspot.com/fetch_releases"
743  CHROMIUM_LISTING_URL: str = (
744      "https://www.googleapis.com/storage/v1/b/chromium-browser-snapshots/o/")
745  CHROMIUM_DASH_PARAMS: Dict[Tuple[str, str], Dict] = {
746      ("linux", "x64"): {
747          "dash_platform": "linux",
748          "dash_channel": "dev",
749          "dash_limit": 10,
750      },
751      ("macos", "x64"): {
752          "dash_platform": "mac",
753      },
754      ("macos", "arm64"): {
755          "dash_platform": "mac",
756      },
757      ("win", "ia32"): {
758          "dash_platform": "win",
759      },
760      ("win", "x64"): {
761          "dash_platform": "win64",
762      },
763  }
764  CHROMIUM_LISTING_PREFIX: Dict[Tuple[str, str], str] = {
765      ("linux", "x64"): "Linux_x64",
766      ("macos", "x64"): "Mac",
767      ("macos", "arm64"): "Mac_Arm",
768      ("win", "ia32"): "Win",
769      ("win", "x64"): "Win_x64",
770  }
771
772  def _get_canary_url(self) -> Tuple[str, Optional[str]]:
773    logging.debug(
774        "ChromeDriverFinder: Try downloading the chromedriver canary version")
775    properties = self.CHROMIUM_DASH_PARAMS.get(self.host_platform.key)
776    if not properties:
777      raise DriverNotFoundError(
778          f"Unsupported platform={self.platform}, key={self.host_platform.key}")
779    dash_platform = properties["dash_platform"]
780    dash_channel = properties.get("dash_channel", "canary")
781    # Limit should be > len(canary_versions) so we also get potentially
782    # the latest dev version (only beta / stable have official driver binaries).
783    dash_limit = properties.get("dash_limit", 100)
784    url = helper.update_url_query(
785        self.CHROMIUM_DASH_URL, {
786            "platform": dash_platform,
787            "channel": dash_channel,
788            "milestone": str(self.browser.major_version),
789            "num": str(dash_limit),
790        })
791    chromium_base_position = 0
792    with helper.urlopen(url) as response:
793      version_infos = list(json.loads(response.read().decode("utf-8")))
794      if not version_infos:
795        raise DriverNotFoundError("Could not find latest version info for "
796                                  f"platform={self.host_platform}")
797      for version_info in version_infos:
798        if version_info["version"] == self.browser.version:
799          chromium_base_position = int(
800              version_info["chromium_main_branch_position"])
801          break
802
803    if not chromium_base_position and version_infos:
804      fallback_version_info = None
805      # Try matching latest milestone
806      for version_info in version_infos:
807        if version_info["milestone"] == self.browser.major_version:
808          fallback_version_info = version_info
809          break
810
811      if not fallback_version_info:
812        # Android has a slightly different release cycle than the desktop
813        # versions. Assume that the latest canary version is good enough
814        fallback_version_info = version_infos[0]
815      chromium_base_position = int(
816          fallback_version_info["chromium_main_branch_position"])
817      logging.warning(
818          "Falling back to latest (not precisely matching) "
819          "canary chromedriver %s (expected %s)",
820          fallback_version_info["version"], self.browser.version)
821
822    if not chromium_base_position:
823      raise DriverNotFoundError("Could not find matching canary chromedriver "
824                                f"for {self.browser.version}")
825    # Use prefixes to limit listing results and increase chances of finding
826    # a matching version
827    listing_prefix = self.CHROMIUM_LISTING_PREFIX.get(self.host_platform.key)
828    if not listing_prefix:
829      raise NotImplementedError(
830          f"Unsupported chromedriver platform {self.host_platform}")
831    base_prefix = str(chromium_base_position)[:4]
832    listing_url = helper.update_url_query(self.CHROMIUM_LISTING_URL, {
833        "prefix": f"{listing_prefix}/{base_prefix}",
834        "maxResults": "10000"
835    })
836    with helper.urlopen(listing_url) as response:
837      listing = json.loads(response.read().decode("utf-8"))
838
839    versions = []
840    logging.debug("Filtering %s candidate URLs.", len(listing["items"]))
841    for version in listing["items"]:
842      if "name" not in version:
843        continue
844      if "mediaLink" not in version:
845        continue
846      name = version["name"]
847      if "chromedriver" not in name:
848        continue
849      parts = name.split("/")
850      if "chromedriver" not in parts[-1] or len(parts) < 3:
851        continue
852      base = parts[1]
853      try:
854        int(base)
855      except ValueError:
856        # Ignore base if it is not an int
857        continue
858      versions.append((int(base), version["mediaLink"]))
859    versions.sort()
860    logging.debug("Found candidates: %s", versions)
861    logging.debug("chromium_base_position=%s", chromium_base_position)
862
863    for i in range(len(versions)):
864      base, url = versions[i]
865      if base > chromium_base_position:
866        base, url = versions[i - 1]
867        return listing_url, url
868    return listing_url, None
869