• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import argparse
8import dataclasses
9import enum
10import logging
11import re
12from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast
13
14from immutabledict import immutabledict
15
16from crossbench import compat
17from crossbench import path as pth
18from crossbench import plt
19from crossbench.config import ConfigObject, ConfigParser
20from crossbench.parse import NumberParser, ObjectParser, PathParser
21from crossbench.plt.android_adb import Adb, AndroidAdbPlatform, adb_devices
22from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform
23from crossbench.plt.ios import ios_devices
24
25if TYPE_CHECKING:
26  from crossbench.path import AnyPath, LocalPath
27
28
# Enumerates the supported ways of driving a browser. Each member value is a
# (display name, help text) pair.
@enum.unique
class BrowserDriverType(compat.StrEnumWithHelp):
  WEB_DRIVER = ("WebDriver", "Use Selenium with webdriver, for local runs.")
  APPLE_SCRIPT = ("AppleScript", "Use AppleScript, for local macOS runs only")
  ANDROID = ("Android",
             "Use Webdriver for android. Allows to specify additional settings")
  IOS = ("iOS", "Placeholder, unsupported at the moment")
  LINUX_SSH = ("Remote Linux",
               "Use remote webdriver and execute commands via SSH")
  CHROMEOS_SSH = ("Remote ChromeOS",
                  "Use remote ChromeDriver and execute commands via SSH")

  @classmethod
  def default(cls) -> BrowserDriverType:
    """Return the driver type used when none is specified (WEB_DRIVER)."""
    return cls.WEB_DRIVER

  @classmethod
  def parse(cls, value: Any) -> BrowserDriverType:
    """Convert a user-supplied value into a BrowserDriverType.

    Existing members are returned unchanged and the empty string maps to the
    default type. Any other value is validated with
    ObjectParser.non_empty_str and then looked up case-insensitively in the
    alias table below.

    Raises:
      argparse.ArgumentTypeError: if the string is not a known alias.
    """
    if isinstance(value, cls):
      return value
    if value == "":
      return BrowserDriverType.default()
    name = ObjectParser.non_empty_str(value, "driver_type")
    # Lower-case alias -> driver type.
    aliases = {
        "selenium": BrowserDriverType.WEB_DRIVER,
        "webdriver": BrowserDriverType.WEB_DRIVER,
        "applescript": BrowserDriverType.APPLE_SCRIPT,
        "osa": BrowserDriverType.APPLE_SCRIPT,
        "android": BrowserDriverType.ANDROID,
        "adb": BrowserDriverType.ANDROID,
        "iphone": BrowserDriverType.IOS,
        "ios": BrowserDriverType.IOS,
        "ssh": BrowserDriverType.LINUX_SSH,
        "chromeos-ssh": BrowserDriverType.CHROMEOS_SSH,
    }
    driver_type = aliases.get(name.lower())
    if driver_type is None:
      raise argparse.ArgumentTypeError(f"Unknown driver type: {repr(name)}")
    return driver_type

  @property
  def is_remote(self) -> bool:
    """True for the ANDROID, CHROMEOS_SSH and LINUX_SSH driver types."""
    return self.name in ("ANDROID", "CHROMEOS_SSH", "LINUX_SSH")

  @property
  def is_local(self) -> bool:
    """True for every driver type that is not remote."""
    return not self.is_remote
76
77
class AmbiguousDriverIdentifier(argparse.ArgumentTypeError):
  """Raised when a driver setting does not single out exactly one device."""
80
81
# One or more characters from [0-9A-Z], a hyphen, then one or more characters
# from [0-9A-Z-]. Presumably the shape of an iOS device UUID, judging by the
# name; the constant is not referenced by the code visible in this file.
IOS_UUID_RE: re.Pattern = re.compile(r"[0-9A-Z]+-[0-9A-Z-]+")
83
84
@dataclasses.dataclass(frozen=True)
class DriverConfig(ConfigObject):
  """Immutable configuration of the driver used to control a browser.

  Instances are validated on construction (see __post_init__ / validate),
  which for Android, iOS and ChromeOS-SSH driver types queries the attached
  or remote devices.
  """
  # Which kind of driver to use.
  type: BrowserDriverType = BrowserDriverType.default()
  # Optional path to the driver executable.
  path: Optional[AnyPath] = None
  # Device ID / serial ID / unique device name.
  device_id: Optional[str] = None
  # Optional path to the adb binary; rejected for non-Android driver types.
  adb_bin: Optional[AnyPath] = None
  # Additional driver-dependent settings; must be hashable.
  settings: Optional[immutabledict] = None

  @classmethod
  def default(cls) -> DriverConfig:
    """Return a config that uses the default driver type and no overrides."""
    return cls(BrowserDriverType.default())

  @classmethod
  def parse_str(cls, value: str) -> DriverConfig:
    """Parse a driver config from a single string.

    The following forms are tried in order:
      1. A path to an existing, non-empty file (the driver binary).
      2. An inline hjson config, recognised by a leading "{".
      3. A driver type name or alias, see BrowserDriverType.parse.
      4. A short device identifier, see parse_short_settings.

    Raises:
      argparse.ArgumentTypeError: for an empty string, an empty or missing
        driver file, or a value that matches none of the forms above.
      AmbiguousDriverIdentifier: if a short identifier matches several
        devices.
    """
    if not value:
      raise argparse.ArgumentTypeError("Cannot parse empty string")
    # Variant 1: $PATH
    path: Optional[LocalPath] = pth.try_resolve_existing_path(value)
    if path:
      if path.stat().st_size == 0:
        raise argparse.ArgumentTypeError(f"Driver path is empty file: {path}")
      return DriverConfig(BrowserDriverType.default(), path)
    if cls.value_has_path_prefix(value):
      # The value looks like a path but did not resolve to an existing one.
      raise argparse.ArgumentTypeError(
          f"Driver path does not exist: {repr(value)}")
    if value[0] == "{":
      # Variant 2: full hjson config
      return cls.parse_inline_hjson(value)
    # Variant 3: $DRIVER_TYPE
    try:
      driver_type = BrowserDriverType.parse(value)
    except argparse.ArgumentTypeError as original_error:
      # Variant 4: short device identifier (serial, device name, ...).
      try:
        return cls.parse_short_settings(value, plt.PLATFORM)
      except AmbiguousDriverIdentifier:  # pylint: disable=try-except-raise
        raise
      except ValueError as e:
        # Nothing matched: report the original "unknown driver type" error.
        logging.debug("Parsing short inline driver config failed: %s", e)
        raise original_error from e
    return DriverConfig(driver_type, path)

  @classmethod
  def parse_short_settings(cls, value: str,
                           platform: plt.Platform) -> DriverConfig:
    """Check for short versions and multiple candidates.

    Tries to interpret `value` as an identifier of an attached adb device
    and, on macOS hosts only, of an attached iOS device.

    Raises:
      ValueError: if no device matches.
      AmbiguousDriverIdentifier: if more than one device matches.
    """
    logging.debug("Looking for driver candidates: %s", value)
    candidate: Optional[DriverConfig]
    if candidate := cls.try_parse_adb_settings(value, platform):
      return candidate
    if platform.is_macos:
      if candidate := cls.try_parse_ios_settings(value, platform):
        return candidate
    # TODO: add more custom parsing here
    raise ValueError("Unknown setting")

  @staticmethod
  def _unique_device_id(kind: str, value: str,
                        candidates: List[str]) -> Optional[str]:
    """Return the single candidate, or None if there are no candidates.

    Raises:
      AmbiguousDriverIdentifier: if there is more than one candidate.
    """
    if len(candidates) > 1:
      raise AmbiguousDriverIdentifier(
          f"Found more than one {kind} devices matching "
          f"'{value}': {candidates}")
    if not candidates:
      logging.debug("No matching %s devices found.", kind)
      return None
    return candidates[0]

  @classmethod
  def try_parse_adb_settings(cls, value: str,
                             platform: plt.Platform) -> Optional[DriverConfig]:
    """Find the adb device identified by `value`.

    `value` is used as a search pattern (see compile_search_pattern). A
    device matches if the pattern fully matches its serial, one of its info
    values, or a "key:value" rendering of one of its info entries.

    Returns:
      An ANDROID DriverConfig for the matching device, or None if no device
      matches.

    Raises:
      AmbiguousDriverIdentifier: if more than one device matches.
    """
    pattern: re.Pattern = cls.compile_search_pattern(value)
    candidate_serials: List[str] = []
    for serial, info in adb_devices(platform).items():
      if pattern.fullmatch(serial):
        candidate_serials.append(serial)
        continue
      # Diagnostics go to the debug log rather than stdout.
      logging.debug("Checking adb device info for %s: %s", serial, info)
      for key, info_value in info.items():
        if (pattern.fullmatch(f"{key}:{info_value}") or
            pattern.fullmatch(info_value)):
          candidate_serials.append(serial)
          break
    device_id = cls._unique_device_id("adb", value, candidate_serials)
    if device_id is None:
      return None
    return DriverConfig(BrowserDriverType.ANDROID, device_id=device_id)

  @classmethod
  def try_parse_ios_settings(cls, value: str,
                             platform: plt.Platform) -> Optional[DriverConfig]:
    """Find the iOS device identified by `value`.

    `value` is used as a search pattern (see compile_search_pattern). A
    device matches if the pattern fully matches its UUID or its name.

    Returns:
      An IOS DriverConfig for the matching device, or None if no device
      matches.

    Raises:
      AmbiguousDriverIdentifier: if more than one device matches.
    """
    pattern: re.Pattern = cls.compile_search_pattern(value)
    candidate_serials: List[str] = []
    for uuid, device_info in ios_devices(platform).items():
      if pattern.fullmatch(uuid) or pattern.fullmatch(device_info.name):
        candidate_serials.append(uuid)
    device_id = cls._unique_device_id("ios", value, candidate_serials)
    if device_id is None:
      return None
    return DriverConfig(BrowserDriverType.IOS, device_id=device_id)

  @classmethod
  def compile_search_pattern(cls, maybe_pattern: str) -> re.Pattern:
    """Compile `maybe_pattern` as a regexp, or as a literal if that fails."""
    try:
      return re.compile(maybe_pattern)
    except Exception as e:  # pylint: disable=broad-except
      logging.debug(
          "Falling back to full string match for "
          "invalid regexp search pattern: %s %s", maybe_pattern, e)
      return re.compile(re.escape(maybe_pattern))

  @classmethod
  def parse_dict(cls, config: Dict[str, Any]) -> DriverConfig:
    """Build a DriverConfig from a dict using config_parser()."""
    return cls.config_parser().parse(config)

  @classmethod
  def config_parser(cls) -> ConfigParser[DriverConfig]:
    """Create the ConfigParser describing all DriverConfig fields."""
    parser = ConfigParser("DriverConfig parser", cls)
    parser.add_argument(
        "type",
        type=BrowserDriverType.parse,
        default=BrowserDriverType.default())
    # TODO: likely distinguish between local and remote driver path
    parser.add_argument(
        "path",
        type=PathParser.binary_path,
        required=False,
        help="Path to the driver executable")
    parser.add_argument(
        "settings",
        type=immutabledict,
        help="Additional driver-dependent settings.")
    # device_id is reconciled with settings["device_id"] by driver_device_id.
    parser.add_argument(
        "device_id",
        type=driver_device_id,
        depends_on=("settings",),
        help="Device ID / Serial ID / Unique device name")
    parser.add_argument(
        "adb_bin",
        type=PathParser.binary_path,
        required=False,
        help="Path to the adb binary, only valid for Android.")
    return parser

  def __post_init__(self):
    """Check basic invariants, then run the driver-type specific validation.

    Raises:
      ValueError: if `type` is falsy or `settings` is not hashable.
    """
    if not self.type:
      raise ValueError(f"{type(self).__name__}.type cannot be None.")
    try:
      hash(self.settings)
    except (TypeError, ValueError) as e:
      # hash() signals an unhashable value with TypeError.
      raise ValueError(
          f"settings must be hashable but got: {self.settings}") from e
    self.validate()

  @property
  def is_remote(self) -> bool:
    """Whether the configured driver type is remote."""
    return self.type.is_remote

  @property
  def is_local(self) -> bool:
    """Whether the configured driver type is local."""
    return self.type.is_local

  def validate(self) -> None:
    """Run the validation that matches the configured driver type.

    Raises:
      argparse.ArgumentTypeError: if adb_bin is set for a non-Android
        driver, or if the type-specific validation fails.
    """
    if self.type == BrowserDriverType.ANDROID:
      self.validate_android()
    elif self.adb_bin:
      raise argparse.ArgumentTypeError("adb_bin is only valid for Android.")
    if self.type == BrowserDriverType.IOS:
      self.validate_ios()
    if self.type == BrowserDriverType.CHROMEOS_SSH:
      # Unlike the validation functions above for iOS and Android,
      # which validate the "host" to which the device is connected,
      # the ChromeOS validation function validates the "client".
      # Consider moving this logic elsewhere in the future.
      self.validate_chromeos()

  def validate_android(self) -> None:
    """Check that the configured device is among the attached adb devices.

    Raises:
      argparse.ArgumentTypeError: if no device is attached or device_id is
        not among the attached devices.
      AmbiguousDriverIdentifier: if no device_id is set and more than one
        device is attached.
    """
    platform = plt.PLATFORM
    devices = adb_devices(platform, self.adb_bin)
    if not devices:
      raise argparse.ArgumentTypeError("No ADB devices attached.")
    names = list(devices.keys())
    if not self.device_id:
      if len(devices) == 1:
        # Default device "adb" (no settings) with exactly one device is ok.
        return
      raise AmbiguousDriverIdentifier(
          f"{len(devices)} ADB devices connected: {names}. "
          "Please explicitly specify a device ID.")
    if self.device_id not in devices:
      raise argparse.ArgumentTypeError(
          f"Could not find ADB device with device_id={repr(self.device_id)}. "
          f"Choices are {names}.")
    if self.adb_bin:
      PathParser.binary_path(self.adb_bin, platform=platform)

  def validate_chromeos(self) -> None:
    """Check that AUTOLOGIN_PATH exists on the remote ChromeOS platform.

    Raises:
      ValueError: if the path does not exist on the remote host.
    """
    platform = self.get_platform()
    assert isinstance(platform, ChromeOsSshPlatform), \
           f"Invalid platform: {platform}"
    platform = cast(ChromeOsSshPlatform, platform)
    if not platform.exists(platform.AUTOLOGIN_PATH):
      raise ValueError(f"Could not find `autotest` on {platform.host}. "
                       "Please ensure that it is running a test image: "
                       "go/arc-setup-dev-mode-dut#usb-cros-test-image")

  def validate_ios(self) -> None:
    """Check that the configured device is among the attached iOS devices.

    Raises:
      argparse.ArgumentTypeError: if no device is attached or device_id is
        not among the attached devices.
      AmbiguousDriverIdentifier: if no device_id is set and more than one
        device is attached.
    """
    devices: Dict[str, Any] = ios_devices(plt.PLATFORM)
    if not devices:
      raise argparse.ArgumentTypeError("No iOS devices attached.")
    names = list(map(str, devices))
    if not self.device_id:
      if len(devices) == 1:
        # Default device "ios" (no settings) with exactly one device is ok.
        return
      raise AmbiguousDriverIdentifier(
          f"{len(devices)} ios devices connected: {names}. "
          "Please explicitly specify a device UUID.")
    if self.device_id not in devices:
      raise argparse.ArgumentTypeError(
          f"Could not find ios device with device_id={repr(self.device_id)}. "
          f"Choices are {names}.")

  def get_platform(self) -> plt.Platform:
    """Return the platform object matching the configured driver type.

    Android yields an adb platform, the SSH types yield an SSH platform and
    every other type (currently including iOS) yields plt.PLATFORM.
    """
    if self.type == BrowserDriverType.ANDROID:
      return self.get_adb_platform()
    if self.type == BrowserDriverType.IOS:
      # TODO(cbruni): use `xcrun xctrace list devices` to find the UDID
      # for attached simulators or devices. Currently only a single device
      # is supported
      pass
    if self.type in (BrowserDriverType.LINUX_SSH,
                     BrowserDriverType.CHROMEOS_SSH):
      return self.get_ssh_platform()
    return plt.PLATFORM

  def get_ssh_platform(self) -> plt.Platform:
    """Create the SSH platform described by `settings`.

    Reads "host", "port", "ssh_port" and "ssh_user" from the settings and
    returns a ChromeOsSshPlatform for CHROMEOS_SSH, otherwise a
    plt.LinuxSshPlatform.
    """
    assert self.settings, "SSH drivers require non-empty settings."
    host = ObjectParser.non_empty_str(self.settings.get("host"), "host")
    port = NumberParser.port_number(self.settings.get("port"), "port")
    ssh_port = NumberParser.port_number(
        self.settings.get("ssh_port"), "ssh port")
    ssh_user = ObjectParser.non_empty_str(
        self.settings.get("ssh_user"), "ssh user")
    # Both platform classes are constructed with the same arguments.
    if self.type == BrowserDriverType.CHROMEOS_SSH:
      platform_cls = ChromeOsSshPlatform
    else:
      platform_cls = plt.LinuxSshPlatform
    return platform_cls(
        plt.PLATFORM,
        host=host,
        port=port,
        ssh_port=ssh_port,
        ssh_user=ssh_user)

  def get_adb_platform(self) -> plt.Platform:
    """Create an AndroidAdbPlatform for device_id, honouring adb_bin."""
    adb = Adb(plt.PLATFORM, self.device_id, self.adb_bin)
    return AndroidAdbPlatform(plt.PLATFORM, self.device_id, adb)
349
def driver_device_id(device_id: Optional[str],
                     settings: Optional[immutabledict]) -> Optional[str]:
  """Reconcile an explicit device id with the one stored in `settings`.

  Args:
    device_id: The explicitly configured device id, if any.
    settings: The driver settings, which may carry their own "device_id".

  Returns:
    The explicit device id if one is given, otherwise settings["device_id"]
    (which may be None).

  Raises:
    TypeError: if both ids are present and differ.
  """
  if not settings:
    return device_id
  settings_device_id = settings.get("device_id")
  if not device_id:
    return settings_device_id
  if not settings_device_id:
    # Settings without their own device id cannot conflict with the explicit
    # one; comparing against the missing entry would raise a false conflict.
    return device_id
  if settings_device_id != device_id:
    raise TypeError("Conflicting both driver['settings']['device_id'] "
                    "and driver['device_id']: "
                    f"{repr(settings_device_id)} vs {repr(device_id)}")
  return device_id
362