• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import atexit
9import logging
10import os
11import time
12import traceback
13from typing import TYPE_CHECKING, Any, List, Optional, Sequence, cast
14
15import selenium.common.exceptions
16import urllib3
17from selenium import webdriver
18from selenium.webdriver.remote.remote_connection import RemoteConnection
19
20from crossbench.browsers.attributes import BrowserAttributes
21from crossbench.browsers.browser import Browser
22from crossbench.types import JsonDict
23
24if TYPE_CHECKING:
25  import datetime as dt
26
27  from selenium.webdriver.common.timeouts import Timeouts
28
29  from crossbench.browsers.settings import Settings
30  from crossbench.env import HostEnvironment
31  from crossbench.path import AnyPath, LocalPath
32  from crossbench.runner.groups.session import BrowserSessionRunGroup
33
34
class DriverException(RuntimeError):
  """Readable replacement for the default WebDriver exceptions.

  The string form is the message, prefixed with "browser=<browser>: " when a
  (truthy) browser was passed in.
  """

  def __init__(self, msg: str, browser: Optional[Browser] = None) -> None:
    super().__init__(msg)
    self._msg = msg
    self._browser = browser

  def __str__(self) -> str:
    # Without an associated browser only the bare message is shown.
    if not self._browser:
      return f"{self._msg}"
    return f"browser={self._browser}: {self._msg}"
49
50
51class WebDriverBrowser(Browser, metaclass=abc.ABCMeta):
52  _private_driver: webdriver.Remote
53  _driver_path: Optional[AnyPath]
54  _driver_pid: int
55  _pid: int
56  log_file: Optional[LocalPath]
57
58  def __init__(self,
59               label: str,
60               path: Optional[AnyPath] = None,
61               settings: Optional[Settings] = None):
62    super().__init__(label, path, settings)
63    self._driver_path = self._settings.driver_path
64
65  @property
66  def attributes(self) -> BrowserAttributes:
67    return BrowserAttributes.WEBDRIVER
68
69  @property
70  def driver_log_file(self) -> LocalPath:
71    log_file = self.log_file
72    assert log_file
73    return log_file.with_suffix(".driver.log")
74
75  def validate_binary(self) -> None:
76    super().validate_binary()
77    self._driver_path = self.platform.absolute(self._find_driver())
78    # TODO: support remote chromedriver as well
79    assert self.host_platform.exists(self._driver_path), (
80        f"Webdriver path '{self._driver_path}' does not exist")
81
82  @abc.abstractmethod
83  def _find_driver(self) -> AnyPath:
84    pass
85
86  @abc.abstractmethod
87  def _validate_driver_version(self) -> None:
88    pass
89
  def validate_env(self, env: HostEnvironment) -> None:
    """Runs the base environment validation, then the driver version check."""
    super().validate_env(env)
    # Implemented by subclasses (abstract in this class).
    self._validate_driver_version()
93
94  def start(self, session: BrowserSessionRunGroup) -> None:
95    assert self._driver_path
96    if timeout := self.http_request_timeout:
97      logging.debug("Setting http request timeout to %s", timeout)
98      RemoteConnection.set_timeout(timeout.total_seconds())
99    try:
100      self._private_driver = self._start_driver(session, self._driver_path)
101    except selenium.common.exceptions.SessionNotCreatedException as e:
102      msg = e.msg or "Could not create Webdriver session."
103      raise DriverException(msg, self) from e
104    self._is_running = True
105    atexit.register(self.force_quit)
106    self._find_driver_pid()
107    self._set_driver_timeouts(session)
108    self._setup_window()
109
110  def _find_driver_pid(self) -> None:
111    service = getattr(self._private_driver, "service", None)
112    if not service:
113      return
114    self._driver_pid = service.process.pid
115    candidates: List[int] = []
116    for child in self.platform.process_children(self._driver_pid):
117      if str(child["exe"]) == str(self.path):
118        candidates.append(child["pid"])
119    if len(candidates) == 1:
120      self._pid = candidates[0]
121    else:
122      logging.debug(
123          "Could not find unique browser process for webdriver: %s, got %s",
124          self, candidates)
125
126  def _set_driver_timeouts(self, session: BrowserSessionRunGroup) -> None:
127    """Adjust the global webdriver timeouts if the runner has custom timeout
128    unit values.
129    If timing.has_no_timeout each value is set to SAFE_MAX_TIMEOUT_TIMEDELTA."""
130    timing = session.timing
131    if not timing.timeout_unit:
132      return
133    if timing.has_no_timeout:
134      logging.info("Disabling webdriver timeouts")
135    else:
136      factor = timing.timeout_unit.total_seconds()
137      if factor != 1.0:
138        logging.info("Increasing webdriver timeouts by %fx", factor)
139    timeouts: Timeouts = self._private_driver.timeouts
140    if implicit_wait := getattr(timeouts, "implicit_wait", None):
141      timeouts.implicit_wait = timing.timeout_timedelta(
142          implicit_wait).total_seconds()
143    if script := getattr(timeouts, "script", None):
144      timeouts.script = timing.timeout_timedelta(script).total_seconds()
145    if page_load := getattr(timeouts, "page_load", None):
146      timeouts.page_load = timing.timeout_timedelta(page_load).total_seconds()
147    self._private_driver.timeouts = timeouts
148
149  def _setup_window(self) -> None:
150    # Force main window to foreground.
151    self._private_driver.switch_to.window(
152        self._private_driver.current_window_handle)
153    if self.viewport.is_headless:
154      return
155    if self.viewport.is_fullscreen:
156      self._private_driver.fullscreen_window()
157    elif self.viewport.is_maximized:
158      self._private_driver.maximize_window()
159    else:
160      self._private_driver.set_window_position(self.viewport.x, self.viewport.y)
161      self._private_driver.set_window_size(self.viewport.width,
162                                           self.viewport.height)
163
164  @abc.abstractmethod
165  def _start_driver(self, session: BrowserSessionRunGroup,
166                    driver_path: AnyPath) -> webdriver.Remote:
167    pass
168
169  def details_json(self) -> JsonDict:
170    details: JsonDict = super().details_json()
171    log = cast(JsonDict, details["log"])
172    if self.log_file:
173      log["driver"] = os.fspath(self.driver_log_file)
174    return details
175
176  def show_url(self, url: str, target: Optional[str] = None) -> None:
177    logging.debug("WebDriverBrowser.show_url(%s, %s)", url, target)
178    try:
179      if target in ("_self", None):
180        handles = self._private_driver.window_handles
181        assert handles, "Browser has no more opened windows."
182        self._private_driver.switch_to.window(handles[-1])
183      elif target == "_new_tab":
184        self._private_driver.switch_to.new_window("tab")
185      elif target == "_new_window":
186        self._private_driver.switch_to.new_window("window")
187      else:
188        raise RuntimeError(f"unexpected target {target}")
189      self._private_driver.get(url)
190    except selenium.common.exceptions.WebDriverException as e:
191      if msg := e.msg:
192        self._wrap_webdriver_exception(e, msg, url)
193      raise
194
  def switch_to_new_tab(self) -> None:
    """Asks the driver to open a new "tab" window and switch to it."""
    self._private_driver.switch_to.new_window("tab")
197
198  def screenshot(self, path: LocalPath) -> None:
199    if not self._private_driver.get_screenshot_as_file(path.as_posix()):
200      raise DriverException(
201          f"Browser failed to get_screenshot_as_file to file '{path}'", self)
202
203  def _wrap_webdriver_exception(
204      self, e: selenium.common.exceptions.WebDriverException, msg: str,
205      url: str) -> None:
206    if "net::ERR_CONNECTION_REFUSED" in msg:
207      raise DriverException(
208          f"Browser failed to load URL={url}. The URL is likely unreachable.",
209          self) from e
210    if "net::ERR_INTERNET_DISCONNECTED" in msg:
211      raise DriverException(
212          f"Browser failed to load URL={url}. "
213          f"The device is not connected to the internet.", self) from e
214
215  def js(
216      self,
217      script: str,
218      timeout: Optional[dt.timedelta] = None,
219      arguments: Sequence[object] = ()
220  ) -> Any:
221    logging.debug("WebDriverBrowser.js() timeout=%s, script: %s", timeout,
222                  script)
223    assert self._is_running
224    try:
225      if timeout is not None:
226        assert timeout.total_seconds() > 0, (
227            f"timeout must be a positive number, got: {timeout}")
228        self._private_driver.set_script_timeout(timeout.total_seconds())
229      return self._private_driver.execute_script(script, *arguments)
230    except selenium.common.exceptions.WebDriverException as e:
231      # pylint: disable=raise-missing-from
232      raise ValueError(f"Could not execute JS: {e.msg}")
233
234  def close_all_tabs(self) -> None:
235    try:
236      all_handles = self._private_driver.window_handles
237      for handle in all_handles:
238        self._private_driver.switch_to.window(handle)
239        self._private_driver.close()
240    except (selenium.common.exceptions.InvalidSessionIdException,
241            urllib3.exceptions.MaxRetryError) as e:
242      logging.debug("%s: Got errors while closing all tabs: {%s}", self, e)
243
  def quit(self) -> None:
    """Closes all tabs and then shuts down the driver.

    Asserts that the browser is currently marked as running.
    """
    assert self._is_running
    # Best effort; close_all_tabs() only logs the errors it handles.
    self.close_all_tabs()
    self.force_quit()
248
  def force_quit(self) -> None:
    """Shuts down the driver, swallowing and logging any errors.

    Does nothing if no driver has been set or the browser is not marked as
    running. This method is also registered as an atexit handler in start()
    and unregisters itself here. The browser is always marked as not running
    afterwards, on every exit path of the try block.
    """
    if getattr(self, "_private_driver", None) is None or not self._is_running:
      return
    atexit.unregister(self.force_quit)
    logging.debug("WebDriverBrowser.force_quit()")
    try:
      try:
        # Close the current window.
        self._private_driver.close()
        time.sleep(0.1)
      except selenium.common.exceptions.NoSuchWindowException:
        # No window is good.
        pass
      except selenium.common.exceptions.InvalidSessionIdException:
        # Closing the last tab will close the session as well.
        return
      try:
        self._private_driver.quit()
      except selenium.common.exceptions.InvalidSessionIdException:
        # The session is already gone, nothing left to quit.
        return
      # Sometimes a second quit is needed, ignore any warnings there
      try:
        self._private_driver.quit()
      except Exception as e:  # pylint: disable=broad-except
        logging.debug("Driver raised exception on quit: %s\n%s", e,
                      traceback.format_exc())
      return
    except Exception as e:  # pylint: disable=broad-except
      # Any other failure while closing or quitting is only logged.
      logging.debug("Could not quit browser: %s\n%s", e, traceback.format_exc())
    finally:
      # Runs for the early returns above as well.
      self._is_running = False
280
281
class RemoteWebDriver(WebDriverBrowser, Browser):
  """Represent a remote WebDriver that has already been started"""

  def __init__(self, label: str, driver: webdriver.Remote) -> None:
    """Wraps an already running `driver`.

    The version is read from the driver's "browserVersion" capability and the
    major version is the integer before the first ".".
    """
    super().__init__(label=label, path=None)
    self._private_driver = driver
    self.version: str = driver.capabilities["browserVersion"]
    self.major_version: int = int(self.version.split(".")[0])

  @property
  def type_name(self) -> str:
    return "remote"

  @property
  def attributes(self) -> BrowserAttributes:
    return BrowserAttributes.WEBDRIVER | BrowserAttributes.REMOTE

  def _validate_driver_version(self) -> None:
    # Nothing to validate here, the driver was started externally.
    pass

  def _extract_version(self) -> str:
    raise NotImplementedError()

  def _find_driver(self) -> LocalPath:
    raise NotImplementedError()

  def _start_driver(self, session: BrowserSessionRunGroup,
                    driver_path: AnyPath) -> webdriver.Remote:
    raise NotImplementedError()

  def setup_binary(self) -> None:
    pass

  def start(self, session: BrowserSessionRunGroup) -> None:
    """Marks the externally started driver as running and sets up the window.

    The window handling is shared with WebDriverBrowser._setup_window() so
    that both start paths treat the viewport the same way, including leaving
    headless viewports untouched.
    """
    # Driver has already been started. We just need to mark it as running.
    self._is_running = True
    self._setup_window()

  def quit(self) -> None:
    # External code that started the driver is responsible for shutting it down.
    self._is_running = False
330