1# Copyright 2023 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import abc 8import logging 9import os 10import plistlib 11import re 12import shutil 13import sys 14import tempfile 15from typing import TYPE_CHECKING, Final, Iterable, Optional, Tuple, Type, Union 16 17from crossbench import path as pth 18from crossbench.browsers.version import BrowserVersion, UnknownBrowserVersion 19from crossbench.helper import Spinner 20 21if TYPE_CHECKING: 22 from crossbench.plt.base import Platform 23 24 25class IncompatibleVersionError(ValueError): 26 pass 27 28 29class Downloader(abc.ABC): 30 ARCHIVE_SUFFIX: str = "" 31 ANY_MARKER: Final = 9999 32 APP_VERSION_RE = re.compile(r"(?P<version>[\d\.ab]+)") 33 34 @classmethod 35 @abc.abstractmethod 36 def _get_loader_cls(cls, browser_platform: Platform) -> Type[Downloader]: 37 pass 38 39 @classmethod 40 def is_valid(cls, path_or_identifier: pth.AnyPathLike, 41 browser_platform: Platform) -> bool: 42 return cls._get_loader_cls(browser_platform).is_valid( 43 path_or_identifier, browser_platform) 44 45 @classmethod 46 @abc.abstractmethod 47 def is_valid_version(cls, path_or_identifier: str): 48 pass 49 50 @classmethod 51 def load(cls, archive_path_or_version_identifier: Union[str, pth.LocalPath], 52 browser_platform: Platform) -> pth.LocalPath: 53 logging.debug("Downloading chrome %s binary for %s", 54 archive_path_or_version_identifier, browser_platform) 55 loader_cls: Type[Downloader] = cls._get_loader_cls(browser_platform) 56 loader: Downloader = loader_cls(archive_path_or_version_identifier, "", "", 57 browser_platform) 58 return loader.app_path 59 60 def __init__(self, archive_path_or_version_identifier: Union[str, 61 pth.LocalPath], 62 browser_type: str, platform_name: str, 63 browser_platform: Platform): 64 assert browser_type, "Missing browser_type" 65 self._browser_type = browser_type 66 self._browser_platform = browser_platform 67 self._platform_name = platform_name 68 assert platform_name, "Missing platform_name" 69 self._archive_url: str = "" 70 self._archive_path: pth.LocalPath = pth.LocalPath() 71 self._out_dir: pth.LocalPath = ( 72 self.host_platform.local_cache_dir("browser_bin")) 73 self._archive_dir: pth.LocalPath = ( 74 self.host_platform.local_cache_dir("browser_archive")) 75 self._archive_dir.mkdir(parents=True, exist_ok=True) 76 self._app_path: pth.LocalPath = pth.LocalPath() 77 self._requested_version: BrowserVersion = UnknownBrowserVersion() 78 with Spinner(): 79 self._app_path = self.find(archive_path_or_version_identifier) 80 self._validate() 81 82 def find( 83 self, archive_path_or_version_identifier: Union[str, pth.LocalPath] 84 ) -> pth.LocalPath: 85 version_value = os.fspath(archive_path_or_version_identifier) 86 if self.is_valid_version(version_value): 87 self._requested_version = self._parse_version(version_value) 88 self._pre_check() 89 sys.stdout.write(f" BROWSER: Looking for {self._requested_version}\r") 90 return self._load_from_version() 91 92 self._archive_path = pth.LocalPath(archive_path_or_version_identifier) 93 self._pre_check() 94 if not archive_path_or_version_identifier or ( 95 not self._archive_path.exists()): 96 raise ValueError( 97 f"{self._browser_type} archive does not exist: {self._archive_path}") 98 return self._load_from_archive() 99 100 def _validate(self) -> None: 101 assert self._app_path != pth.LocalPath(), "Did not set app_path" 102 assert self._is_app_installed(self._app_path), ( 103 f"Could not extract {self._browser_type} binary: {self._app_path}") 104 logging.debug("Extracted app: %s", self._app_path) 105 106 @property 107 def app_path(self) -> pth.LocalPath: 108 assert self._is_app_installed(self._app_path), "Could not download browser" 109 return self._app_path 110 111 @property 112 def host_platform(self) -> Platform: 113 return self._browser_platform.host_platform 114 115 def _pre_check(self) -> None: 116 pass 117 118 def _is_app_installed(self, app_path: pth.LocalPath) -> bool: 119 return self._browser_platform.search_app(app_path) is not None 120 121 def _find_matching_installed_version(self) -> Optional[pth.LocalPath]: 122 app_path: pth.LocalPath = self._installed_app_path() 123 if self._is_app_installed(app_path): 124 return app_path 125 return None 126 127 def _create_archive_path(self, version: BrowserVersion) -> pth.LocalPath: 128 version_name = str(version).replace(" ", "_") 129 return self._archive_dir / (f"{version_name}{self.ARCHIVE_SUFFIX}") 130 131 def _load_from_version(self) -> pth.LocalPath: 132 self._archive_path = self._create_archive_path(self._requested_version) 133 if app_path := self._find_matching_installed_version(): 134 if cached_version := self._validate_installed(app_path): 135 logging.info("CACHED BROWSER: %s %s", cached_version, self._app_path) 136 return app_path 137 self._requested_version_validation() 138 if not self._try_download_version_archive(): 139 logging.info("CACHED DOWNLOAD: %s", self._archive_path) 140 self._install_archive(self._archive_path) 141 return self._installed_app_path() 142 143 def _try_download_version_archive(self): 144 if self._archive_path.exists(): 145 return False 146 archive_version, archive_url = self._find_archive_url() 147 if not archive_url: 148 raise ValueError( 149 f"Could not find matching version for {self._requested_version}") 150 self._archive_url = archive_url 151 self._archive_path = self._create_archive_path(archive_version) 152 if self._archive_path.exists(): 153 return False 154 logging.info("DOWNLOADING %s", self._archive_url) 155 with tempfile.TemporaryDirectory(prefix="cb_download_") as tmp_dir_name: 156 tmp_dir = pth.LocalPath(tmp_dir_name) 157 self._download_archive(self._archive_url, tmp_dir) 158 return True 159 160 @abc.abstractmethod 161 def _requested_version_validation(self) -> None: 162 pass 163 164 def _load_from_archive(self) -> pth.LocalPath: 165 assert not self._requested_version.is_complete 166 assert self._archive_path.exists() 167 logging.info("EXTRACTING ARCHIVE: %s", self._archive_path) 168 original_out_dir = self._out_dir 169 with tempfile.TemporaryDirectory( 170 prefix="cb_extract_", dir=original_out_dir) as tmpdir: 171 # Extract input archive to temp dir for version extraction. 172 self._out_dir = pth.LocalPath(tmpdir) 173 temp_extracted_path = self._extract_unknown_version_archive() 174 self._out_dir = original_out_dir 175 # Install temporary extracted version 176 versioned_path = self._extracted_path() 177 app_path = self._installed_app_path() 178 if self._is_app_installed(app_path): 179 cached_version = self._validate_installed(app_path) 180 logging.info("CACHED BROWSER: %s %s", cached_version, app_path) 181 else: 182 assert not versioned_path.exists() 183 temp_extracted_path.rename(versioned_path) 184 return app_path 185 186 def _extract_unknown_version_archive(self) -> pth.LocalPath: 187 tmp_app_path: pth.LocalPath = self._installed_app_path() 188 temp_extracted_path = self._extracted_path() 189 self._install_archive(self._archive_path) 190 logging.debug("Parsing browser version: %s", tmp_app_path) 191 assert self._is_app_installed(tmp_app_path), ( 192 f"Extraction failed, app does not exist: {tmp_app_path}") 193 full_version_string = self._browser_platform.app_version(tmp_app_path) 194 self._requested_version = self._parse_version(full_version_string) 195 assert self._requested_version.is_complete 196 return temp_extracted_path 197 198 199 @abc.abstractmethod 200 def _parse_version(self, version_identifier: str) -> BrowserVersion: 201 pass 202 203 def _extracted_path(self) -> pth.LocalPath: 204 # TODO: support local vs remote 205 return self._out_dir / str(self._requested_version).replace(" ", "_") 206 207 @abc.abstractmethod 208 def _installed_app_path(self) -> pth.LocalPath: 209 pass 210 211 def _installed_app_version(self, app_path: pth.LocalPath) -> BrowserVersion: 212 raw_version = self._browser_platform.app_version(app_path) 213 return self._parse_version(raw_version) 214 215 def _validate_installed(self, app_path: pth.LocalPath) -> BrowserVersion: 216 cached_version: BrowserVersion = self._installed_app_version(app_path) 217 msg: str = "" 218 expected_version_str: str = str(self._requested_version) 219 if self._requested_version.is_complete: 220 if self._requested_version.contains(cached_version): 221 return cached_version 222 msg = (f"Previously downloaded browser at {app_path} " 223 "might have been auto-updated.\n") 224 else: 225 requested_milestone: int = self._requested_version.major 226 logging.debug("Validating installed milestone %s", requested_milestone) 227 latest_milestone_version, _ = self._find_archive_url() 228 if cached_version == latest_milestone_version: 229 return cached_version 230 msg = (f"Previously downloaded browser at {app_path} " 231 f"does not match latest milestone {requested_milestone} " 232 f"version: {latest_milestone_version}.\n") 233 expected_version_str = ( 234 f"{self._requested_version}/{latest_milestone_version}") 235 msg += ("Please delete the old version and re-install/-download it.\n" 236 f"Expected: {expected_version_str} Got: {cached_version}") 237 logging.debug(msg) 238 raise IncompatibleVersionError(msg) 239 240 @abc.abstractmethod 241 def _find_archive_url(self) -> Tuple[BrowserVersion, Optional[str]]: 242 pass 243 244 @abc.abstractmethod 245 def _archive_urls( 246 self, folder_url: str, 247 version: BrowserVersion) -> Iterable[Tuple[BrowserVersion, str]]: 248 pass 249 250 @abc.abstractmethod 251 def _download_archive(self, archive_url: str, tmp_dir: pth.LocalPath) -> None: 252 pass 253 254 @abc.abstractmethod 255 def _install_archive(self, archive_path: pth.LocalPath) -> None: 256 pass 257 258 259class ArchiveHelper(abc.ABC): 260 261 @classmethod 262 @abc.abstractmethod 263 def extract(cls, platform: Platform, archive_path: pth.LocalPath, 264 dest_path: pth.LocalPath) -> pth.LocalPath: 265 pass 266 267 268class RPMArchiveHelper(ArchiveHelper): 269 270 @classmethod 271 def extract(cls, platform: Platform, archive_path: pth.LocalPath, 272 dest_path: pth.LocalPath) -> pth.LocalPath: 273 assert platform.which("rpm2cpio"), ( 274 "Need rpm2cpio to extract downloaded .rpm archive") 275 assert platform.which("cpio"), ( 276 "Need cpio to extract downloaded .rpm archive") 277 cpio_file = archive_path.with_suffix(".cpio") 278 assert not cpio_file.exists() 279 archive_path.parent.mkdir(parents=True, exist_ok=True) 280 with cpio_file.open("w") as f: 281 platform.sh("rpm2cpio", archive_path, stdout=f) 282 assert cpio_file.is_file(), f"Could not extract archive: {archive_path}" 283 assert not dest_path.exists() 284 with cpio_file.open() as f: 285 platform.sh( 286 "cpio", 287 "--extract", 288 f"--directory={dest_path}", 289 "--make-directories", 290 stdin=f) 291 cpio_file.unlink() 292 if not dest_path.exists(): 293 raise ValueError(f"Could not extract archive to {dest_path}") 294 return dest_path 295 296 297class DMGArchiveHelper: 298 299 @classmethod 300 def extract(cls, platform: Platform, archive_path: pth.LocalPath, 301 dest_path: pth.LocalPath) -> pth.LocalPath: 302 assert platform.is_macos, "DMG are only supported on macOS." 303 assert not platform.is_remote, "Remote platform not supported yet" 304 result = platform.sh_stdout("hdiutil", "attach", "-plist", 305 archive_path).strip() 306 data = plistlib.loads(str.encode(result)) 307 dmg_path: Optional[pth.LocalPath] = None 308 for item in data["system-entities"]: 309 mount_point = item.get("mount-point", None) 310 if mount_point: 311 dmg_path = pth.LocalPath(mount_point) 312 if dmg_path.exists(): 313 break 314 if not dmg_path: 315 raise ValueError("Could not mount downloaded disk image") 316 apps = list(dmg_path.glob("*.app")) 317 assert len(apps) == 1, "Mounted disk image contains more than 1 app" 318 app = apps[0] 319 try: 320 logging.info("COPYING BROWSER src=%s dst=%s", app, dest_path) 321 shutil.copytree( 322 os.fspath(app), 323 os.fspath(dest_path), 324 symlinks=True, 325 dirs_exist_ok=False) 326 finally: 327 platform.sh("hdiutil", "detach", dmg_path) 328 if not dest_path.exists(): 329 raise ValueError(f"Could not extract archive to {dest_path}") 330 return dest_path 331