# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import contextlib
import logging
import re
from typing import TYPE_CHECKING, Iterator, Optional, Union
from urllib.parse import urlparse

from crossbench import exception
from crossbench import path as pth
from crossbench import plt
from crossbench.helper import Spinner
from crossbench.network.base import Network
from crossbench.parse import PathParser

if TYPE_CHECKING:
  from crossbench.network.traffic_shaping.base import TrafficShaper
  from crossbench.path import LocalPath
  from crossbench.runner.groups.session import BrowserSessionRunGroup


GS_PREFIX = "gs://"
GSUTIL_LS_MD5_RE = re.compile(r"Hash \(md5\):\s*([A-Za-z0-9+/]+)=*")


class ReplayNetwork(Network):
31  """ A network implementation that can be used to replay requests
32  from a an archive."""
33
34  def __init__(self,
               archive: Union[pth.LocalPath, str],
               traffic_shaper: Optional[TrafficShaper] = None,
               browser_platform: plt.Platform = plt.PLATFORM):
    super().__init__(traffic_shaper, browser_platform)
    self._archive_path = self._ensure_archive(archive)

  @property
  def is_wpr(self) -> bool:
    return True

  @property
  def archive_path(self) -> LocalPath:
    return self._archive_path

  @contextlib.contextmanager
  def open(self, session: BrowserSessionRunGroup) -> Iterator[ReplayNetwork]:
    """Opens the base network, the replay server and the traffic shaper
    for the given browser session."""
    with super().open(session):
      with self._open_replay_server(session):
        with self._traffic_shaper.open(self, session):
          yield self

  @contextlib.contextmanager
  def _open_replay_server(self, session: BrowserSessionRunGroup):
    """No-op by default; subclasses override this to start a replay server."""
    del session
    yield

  def _generate_filename(self, url: str) -> str:
    """Derives a unique local filename from the URL's basename and the
    md5 hash reported by `gsutil ls -L`."""
    metadata = self.host_platform.sh_stdout("gsutil", "ls", "-L", url)
    if md5_search := GSUTIL_LS_MD5_RE.search(metadata):
      md5 = md5_search.group(1)
      safe_md5 = pth.safe_filename(md5)
      url_path = pth.AnyPosixPath(urlparse(url).path)
      return f"{url_path.stem}_{safe_md5}{url_path.suffix}"
    raise RuntimeError(f"Could not find md5 hash in gsutil output: {metadata}")

  def _download_gcloud_archive(self, url: str) -> LocalPath:
    """Downloads the archive into the local cache dir, reusing a previously
    downloaded copy if present."""
    with exception.annotate(f"Downloading {url}"), Spinner():
      local_path = (
          self.host_platform.local_cache_dir("wpr") /
          self._generate_filename(url))
      if local_path.is_file():
        logging.info("Found cached WPR archive: %s", local_path)
        return local_path
      logging.info("Downloading WPR archive from %s to %s", url, local_path)
      self.host_platform.sh("gsutil", "cp", url, local_path)
    return local_path

  def _ensure_archive(self, archive: Union[pth.LocalPath, str]) -> LocalPath:
    """Returns a local archive path, downloading gs:// URLs on demand."""
    if isinstance(archive, str) and archive.startswith(GS_PREFIX):
      return self._download_gcloud_archive(url=archive)
    return PathParser.existing_file_path(archive).resolve()
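
# Usage sketch (illustrative only, not part of the original module; the
# archive path below is hypothetical):
#
#   network = ReplayNetwork("page_set.wprgo")
#   assert network.is_wpr
#   print(network.archive_path)  # resolved local path of the archive
#
# Passing a "gs://bucket/path/page_set.wprgo" string instead would trigger a
# gsutil download into the local cache directory before the path is returned.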