• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5# Modified from chrome's catapult project.
6
7from __future__ import annotations
8
9import atexit
10import contextlib
11import locale
12import logging
13import os
14import re
15import shlex
16import signal
17import subprocess
18import sys
19from typing import IO, TYPE_CHECKING, Iterator, List, Optional, Union
20
21from crossbench import helper
22from crossbench.flags.base import Flags
23from crossbench.helper.path_finder import TsProxyFinder
24from crossbench.network.traffic_shaping.base import TrafficShaper
25from crossbench.parse import NumberParser, PathParser
26
27if TYPE_CHECKING:
28  from crossbench.browsers.attributes import BrowserAttributes
29  from crossbench.network.base import Network
30  from crossbench.path import AnyPath, LocalPath
31  from crossbench.plt.base import ListCmdArgs, Platform
32  from crossbench.runner.groups.session import BrowserSessionRunGroup
33
34fcntl = None
35try:
36  import fcntl
37except ModuleNotFoundError as not_found:
38  logging.debug("No fcntl support %s", not_found)
39
40
41
class TsProxyServerError(Exception):
  """Catch-all exception for tsProxy Server.

  Raised in this file when the tsproxy subprocess does not report its port
  before the startup timeout, and when a command sent to the subprocess is
  not acknowledged with an "OK" line.
  """
44
45
46_PORT_RE = re.compile(r"Started Socks5 proxy server on "
47                      r"(?P<host>[^:]*):"
48                      r"(?P<port>\d+)")
49DEFAULT_TIMEOUT = 5
50
51
52def parse_ts_socks_proxy_port(output_line):
53  if match := _PORT_RE.match(output_line):
54    return int(match.group("port"))
55  return None
56
57
# Named traffic-shaping presets. Every preset uses the same three keys, which
# match the rtt_ms / in_kbps / out_kbps keyword arguments accepted by the
# tsproxy wrapper classes in this file.
# TODO: improve and double check
TRAFFIC_SETTINGS = {
    "3G-slow": {"rtt_ms": 400, "in_kbps": 400, "out_kbps": 400},
    "3G-regular": {"rtt_ms": 300, "in_kbps": 1600, "out_kbps": 768},
    "3G-fast": {"rtt_ms": 150, "in_kbps": 1600, "out_kbps": 768},
    "4G": {"rtt_ms": 170, "in_kbps": 9000, "out_kbps": 9000},
}
81
82
class TsProxyServer:
  """
  TsProxy provides basic latency, download and upload traffic shaping. This
  class provides a programming API to the tsproxy script in
  catapult/third_party/tsproxy/tsproxy.py

  Constructing an instance only validates and stores the settings; the
  tsproxy subprocess is launched by start() and shut down by stop().

  This class can be used as a context manager.
  """

  def __init__(self,
               ts_proxy_path: LocalPath,
               host: Optional[str] = None,
               socks_proxy_port: Optional[int] = None,
               http_port: Optional[int] = None,
               https_port: Optional[int] = None,
               rtt_ms: Optional[int] = None,
               in_kbps: Optional[int] = None,
               out_kbps: Optional[int] = None,
               window: Optional[int] = None,
               verbose: bool = True):
    """Validates and stores the settings used by the next start().

    Args:
      ts_proxy_path: Path to the tsproxy script; parsed as an existing file.
      host: Optional destination host, passed to tsproxy as --desthost.
      socks_proxy_port: Port for the SOCKS proxy. If unset, tsproxy is
        started with --port=0.
      http_port: Optional port that tsproxy remaps "*" to (--mapports).
      https_port: Optional port that tsproxy remaps 443 to (--mapports).
        Requires http_port and must differ from it.
      rtt_ms: Initial --rtt value.
      in_kbps: Initial --inkbps value.
      out_kbps: Initial --outkbps value.
      window: Initial --window value.
      verbose: Whether to start tsproxy with --verbose.

    Raises:
      ValueError: If the http/https port combination is rejected by
        verify_ports().
    """
    self._proc: Optional[TsProxyProcess] = None
    self._ts_proxy_path = PathParser.existing_file_path(ts_proxy_path)
    self._socks_proxy_port = socks_proxy_port
    self._host = host
    self._http_port = http_port
    self._https_port = https_port
    self._rtt_ms = rtt_ms
    self._in_kbps = in_kbps
    self._out_kbps = out_kbps
    self._window = window
    self._verbose = verbose
    self.verify_ports(http_port, https_port)

  @classmethod
  def verify_ports(cls,
                   http_port: Optional[int] = None,
                   https_port: Optional[int] = None) -> None:
    """Checks that the http/https port pair is a usable combination.

    Raises:
      ValueError: If https_port is set without http_port, or if both ports
        are the same. Each provided port is additionally run through
        NumberParser.port_number (presumably raising on invalid ports).
    """
    if https_port and not http_port:
      raise ValueError(f"Got https_port={https_port} without a http port")
    if http_port is not None and http_port == https_port:
      raise ValueError("http_port and https_port must be different, "
                       f"got {https_port} twice.")
    if http_port is not None:
      NumberParser.port_number(http_port, "http_port")
    if https_port is not None:
      NumberParser.port_number(https_port, "https_port")

  @property
  def is_running(self) -> bool:
    """True between a successful start() and the matching stop()."""
    return self._proc is not None

  def set_traffic_settings(self,
                           rtt_ms: Optional[int] = None,
                           in_kbps: Optional[int] = None,
                           out_kbps: Optional[int] = None,
                           window: Optional[int] = None,
                           timeout=DEFAULT_TIMEOUT) -> None:
    """Forwards new shaping values to the running tsproxy process.

    Note that the values returned by the rtt_ms / in_kbps / out_kbps / window
    properties of this object are not updated by this call.
    """
    assert self._proc, "ts_proxy is not running."
    self._proc.set_traffic_settings(rtt_ms, in_kbps, out_kbps, window, timeout)

  @property
  def socks_proxy_port(self) -> int:
    """The SOCKS port of the running tsproxy process."""
    assert self._proc, "ts_proxy is not running."
    return self._proc.socks_proxy_port

  @property
  def ts_proxy_path(self) -> LocalPath:
    return self._ts_proxy_path

  @property
  def rtt_ms(self) -> Optional[int]:
    return self._rtt_ms

  @property
  def in_kbps(self) -> Optional[int]:
    return self._in_kbps

  @property
  def out_kbps(self) -> Optional[int]:
    return self._out_kbps

  @property
  def window(self) -> Optional[int]:
    return self._window

  def start(self) -> None:
    """Launches the tsproxy subprocess with the stored settings."""
    assert not self._proc, "ts_proxy is already running."
    self._proc = TsProxyProcess(self._ts_proxy_path, self._host,
                                self._socks_proxy_port, self._http_port,
                                self._https_port, self._rtt_ms, self._in_kbps,
                                self._out_kbps, self._window, self._verbose)
    # Make sure the subprocess is shut down even if stop() is never called.
    atexit.register(self.stop)

  def stop(self) -> Optional[str]:
    """Stops the tsproxy subprocess if it is running.

    Returns:
      Whatever the underlying process wrapper's stop() returns, or None if
      the server was not running.
    """
    proc = self._proc
    if not proc:
      logging.debug("TsProxy: Attempting to stop server that is not running.")
      return None
    err = proc.stop()
    self._proc = None
    # Remove the exit hook added by start(). Without this every start()/stop()
    # cycle leaves one more hook behind, each keeping this object alive until
    # interpreter exit. Bound methods compare equal, so this matches the
    # callable that was registered.
    atexit.unregister(self.stop)
    return err

  def __enter__(self):
    self.start()
    return self

  def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb):
    self.stop()
191
192
class TsProxyProcess:
  """Separate wrapper around the ts_proxy to simplify pytype testing.

  Creating an instance launches the tsproxy script as a subprocess and waits
  until it has announced its SOCKS port on stdout. Commands are written to
  the subprocess's stdin and their status is read back from its stdout.
  """

  def __init__(self,
               ts_proxy_path: LocalPath,
               host: Optional[str] = None,
               socks_proxy_port: Optional[int] = None,
               http_port: Optional[int] = None,
               https_port: Optional[int] = None,
               rtt_ms: Optional[int] = None,
               in_kbps: Optional[int] = None,
               out_kbps: Optional[int] = None,
               window: Optional[int] = None,
               verbose: bool = False,
               timeout: Union[int, float] = DEFAULT_TIMEOUT) -> None:
    """Start TsProxy server and verify that it started.

    Args:
      ts_proxy_path: Path to the tsproxy script, run with sys.executable.
      host: Optional destination host (--desthost).
      socks_proxy_port: SOCKS port to listen on; --port=0 is used if unset.
      http_port: Optional target port for the "*" port mapping.
      https_port: Optional target port for the 443 port mapping.
      rtt_ms: Optional initial --rtt value.
      in_kbps: Optional initial --inkbps value.
      out_kbps: Optional initial --outkbps value.
      window: Optional initial --window value.
      verbose: Whether to pass --verbose.
      timeout: Time in seconds to wait for the port announcement.

    Raises:
      ValueError: If the http/https port combination is invalid.
      TsProxyServerError: If tsproxy did not announce its port in time.
    """
    cmd: ListCmdArgs = [
        sys.executable,
        ts_proxy_path,
    ]
    self._socks_proxy_port: Optional[int] = socks_proxy_port
    # Remembered so that stop() can restore the originally requested port.
    self._initial_socks_proxy_port: Optional[int] = socks_proxy_port
    if not socks_proxy_port:
      # Use port 0 so tsproxy picks a random available port.
      cmd.append("--port=0")
    else:
      cmd.append(f"--port={socks_proxy_port}")
    if verbose:
      cmd.append("--verbose")
    self._in_kbps: Optional[int] = in_kbps
    if in_kbps:
      cmd.append(f"--inkbps={in_kbps}")
    self._out_kbps: Optional[int] = out_kbps
    if out_kbps:
      cmd.append(f"--outkbps={out_kbps}")
    self._window: Optional[int] = window
    if window:
      cmd.append(f"--window={window}")
    self._rtt_ms: Optional[int] = rtt_ms
    if rtt_ms:
      cmd.append(f"--rtt={rtt_ms}")
    self._host: Optional[str] = host
    if host:
      cmd.append(f"--desthost={host}")
    self._http_port: Optional[int] = http_port
    self._https_port: Optional[int] = https_port
    TsProxyServer.verify_ports(http_port, https_port)
    mapports = []
    if https_port:
      mapports.append(f"443:{https_port}")
    if http_port:
      mapports.append(f"*:{http_port}")
    # Always passed; the value is empty when no port is remapped.
    cmd.append(f"--mapports={','.join(mapports)}")
    logging.info("TsProxy: commandline: %s", shlex.join(map(str, cmd)))
    self._verify_default_encoding()
    # In python3 universal_newlines forces subprocess to encode/decode,
    # allowing per-line buffering.
    # stderr is not piped, so communicate() in stop() yields None for it.
    proc = subprocess.Popen(  # pylint: disable=consider-using-with
        cmd,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        # stderr=subprocess.PIPE,
        bufsize=1,
        universal_newlines=True)
    assert proc and proc.stdout and proc.stdin, "Could not start ts_proxy"
    self._proc = proc
    if stdout := proc.stdout:
      self._stdout: IO[str] = stdout
    else:
      raise RuntimeError("Missing stdout")
    if stdin := proc.stdin:
      self._stdin: IO[str] = stdin
    else:
      raise RuntimeError("Missing stdin")
    if fcntl:  # pylint: disable=using-constant-test
      self._setup_non_blocking_io()
    self._wait_for_startup(timeout)

  def _setup_non_blocking_io(self) -> None:
    """Sets O_NONBLOCK on the subprocess's stdout file descriptor."""
    logging.debug("TsProxy: fcntl is supported, trying to set "
                  "non blocking I/O for the ts_proxy process")
    fd = self._stdout.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)  # pylint: disable=no-member

  @property
  def socks_proxy_port(self) -> int:
    """The SOCKS port in use.

    Raises:
      RuntimeError: If no port is known.
    """
    if self._socks_proxy_port is None:
      raise RuntimeError("ts_proxy didn't start")
    return self._socks_proxy_port

  def _verify_default_encoding(self) -> None:
    # In python3 subprocess handles encoding/decoding; this warns if it won't
    # be UTF-8.
    encoding = locale.getpreferredencoding()
    if encoding != "UTF-8":
      logging.warning("Decoding will use %s instead of UTF-8", encoding)

  def _wait_for_startup(self, timeout: Union[int, float]) -> None:
    """Polls until tsproxy announced its port, or gives up after timeout.

    Raises:
      TsProxyServerError: If the port was not announced in time; the
        subprocess is stopped first.
    """
    for _ in helper.wait_with_backoff(timeout):
      if self._has_started():
        logging.info("TsProxy: port=%i", self._socks_proxy_port)
        return
    if err := self.stop():
      logging.error("TsProxy: Error stopping TsProxy server:\n%s", err)
    raise TsProxyServerError(
        f"Starting tsproxy timed out after {timeout} seconds")

  def _has_started(self) -> bool:
    """Reads one stdout line and records the port if it is the announcement.

    Returns:
      True once the port announcement was read; False if the subprocess has
      exited, nothing could be read, or the line was some other output.
    """
    if self._proc.poll() is not None:
      return False
    self._stdout.flush()
    output_line = self._read_line_ts_proxy_stdout(timeout=5)
    if not output_line:
      return False
    logging.debug("TsProxy: output: %s", output_line)
    port = parse_ts_socks_proxy_port(output_line)
    if port is None:
      # Not the announcement (e.g. other startup output); keep waiting
      # instead of handing None to the port validation.
      return False
    self._socks_proxy_port = NumberParser.port_number(port, "socks_proxy_port")
    return True

  def _read_line_ts_proxy_stdout(self, timeout: Union[int, float]) -> str:
    """Returns the next stripped stdout line, retrying on IOError.

    An empty string is returned if the read produced no text, or if no
    attempt succeeded before the wait loop ended.
    """
    for _ in helper.wait_with_backoff(timeout):
      try:
        return self._stdout.readline().strip()
      except IOError as io_error:
        # Presumably hit when stdout is non-blocking and has no data yet.
        logging.debug("TsProxy: Error while reading tsproxy line: %s", io_error)
    return ""

  def _send_command(self,
                    command: str,
                    timeout: Union[int, float] = DEFAULT_TIMEOUT) -> None:
    """Writes a command line to tsproxy's stdin and checks for an "OK" reply.

    Raises:
      TsProxyServerError: If no "OK" line was among the collected output.
    """
    logging.debug("TsProxy: Sending command to ts_proxy_server: %s", command)
    self._stdin.write(f"{command}\n")
    command_output = self._wait_for_status_response(timeout)
    success = "OK" in command_output
    logging.log(logging.DEBUG if success else logging.ERROR,
                "TsProxy: output:\n%s", "\n".join(command_output))
    if not success:
      raise TsProxyServerError(f"Failed to execute command: {command}")

  def _wait_for_status_response(self, timeout: Union[int, float]) -> List[str]:
    """Collects stdout lines until an "OK" or "ERROR" line is seen.

    Returns:
      All lines read, including the terminating status line if one arrived.
    """
    logging.debug("TsProxy: waiting for status response")
    command_output = []
    for _ in helper.wait_with_backoff(timeout):
      self._stdin.flush()
      self._stdout.flush()
      last_output = self._read_line_ts_proxy_stdout(timeout)
      command_output.append(last_output)
      if last_output in ("OK", "ERROR"):
        break
    return command_output

  def set_traffic_settings(self,
                           rtt_ms: Optional[int] = None,
                           in_kbps: Optional[int] = None,
                           out_kbps: Optional[int] = None,
                           window: Optional[int] = None,
                           timeout=DEFAULT_TIMEOUT) -> None:
    """Updates shaping values on the running tsproxy.

    A "set ..." command is only sent for values that are given and differ
    from the last known value; the cached value is updated after the command
    succeeded.

    Raises:
      TsProxyServerError: If tsproxy did not acknowledge a command.
    """
    if rtt_ms is not None and self._rtt_ms != rtt_ms:
      assert rtt_ms >= 0, f"Invalid rtt value: {rtt_ms}"
      self._send_command(f"set rtt {rtt_ms}", timeout)
      self._rtt_ms = rtt_ms

    if in_kbps is not None and self._in_kbps != in_kbps:
      assert in_kbps >= 0, f"Invalid in_kbps value: {in_kbps}"
      self._send_command(f"set inkbps {in_kbps}", timeout)
      self._in_kbps = in_kbps

    if out_kbps is not None and self._out_kbps != out_kbps:
      assert out_kbps >= 0, f"Invalid out_kbps value: {out_kbps}"
      self._send_command(f"set outkbps {out_kbps}", timeout)
      self._out_kbps = out_kbps

    if window is not None and self._window != window:
      assert window >= 0, f"Invalid window value: {window}"
      self._send_command(f"set window {window}", timeout)
      self._window = window

  def stop(self) -> Optional[str]:
    """Asks tsproxy to exit and then makes sure the subprocess is gone.

    Returns:
      The stderr part of communicate(); None as long as stderr is not piped.
    """
    try:
      self._send_command("exit")
    except (TsProxyServerError, OSError, ValueError) as e:
      # Best effort only: the subprocess may already be dead (broken or
      # closed pipe) or unresponsive. It still has to be killed and reaped
      # below, and a failure here must not hide the caller's own error.
      logging.warning("TsProxy: Could not send exit command: %s", e)
    helper.wait_and_kill(self._proc, signal=signal.SIGINT)
    _, err = self._proc.communicate()
    self._socks_proxy_port = self._initial_socks_proxy_port
    return err
377
378
class TsProxyTrafficShaper(TrafficShaper):
  """TrafficShaper that routes browser traffic through a tsproxy server."""

  def __init__(self,
               browser_platform: Platform,
               ts_proxy_path: Optional[AnyPath] = None,
               rtt_ms: Optional[int] = None,
               in_kbps: Optional[int] = None,
               out_kbps: Optional[int] = None,
               window: Optional[int] = None):
    """Locates the tsproxy script and validates the shaping settings.

    Args:
      browser_platform: Platform the browser runs on.
      ts_proxy_path: Optional path to the tsproxy script; looked up with
        TsProxyFinder on the host platform if not given.
      rtt_ms: Optional rtt setting for tsproxy.
      in_kbps: Optional inbound bandwidth setting for tsproxy.
      out_kbps: Optional outbound bandwidth setting for tsproxy.
      window: Optional window setting for tsproxy.

    Raises:
      RuntimeError: If no tsproxy script could be found.
    """
    super().__init__(browser_platform)
    if not ts_proxy_path:
      if maybe_ts_proxy_path := TsProxyFinder(self.host_platform).path:
        ts_proxy_path = self.host_platform.local_path(maybe_ts_proxy_path)
    if not ts_proxy_path:
      raise RuntimeError(
          f"Could not find ts_proxy script on {self.host_platform}")
    # Early instantiation to validate inputs.
    self._ts_proxy = TsProxyServer(
        self.host_platform.local_path(ts_proxy_path),
        rtt_ms=rtt_ms,
        in_kbps=in_kbps,
        out_kbps=out_kbps,
        window=window)
    # TODO: support custom name
    self._name = "tsproxy"

  @property
  def ts_proxy(self) -> TsProxyServer:
    return self._ts_proxy

  @contextlib.contextmanager
  def open(self, network: Network,
           session: BrowserSessionRunGroup) -> Iterator[TrafficShaper]:
    """Runs tsproxy (and any port forwarding) for the duration of the block.

    For non-live networks the stored server is first replaced by one that
    remaps traffic to the network's host and ports.
    """
    if not network.is_live:
      self._ts_proxy = self._create_remapping_ts_proxy(network)

    with super().open(network, session):
      logging.debug("Starting TS Proxy")
      with self._ts_proxy:
        with self._forward_ports(network, session):
          yield self

  def _create_remapping_ts_proxy(self, network: Network) -> TsProxyServer:
    """Returns a copy of the current server config targeting `network`."""
    return TsProxyServer(
        self._ts_proxy.ts_proxy_path,
        rtt_ms=self._ts_proxy.rtt_ms,
        in_kbps=self._ts_proxy.in_kbps,
        out_kbps=self._ts_proxy.out_kbps,
        window=self._ts_proxy.window,
        host=network.host,
        http_port=network.http_port,
        https_port=network.https_port)

  @contextlib.contextmanager
  def _forward_ports(self, network: Network,
                     session: BrowserSessionRunGroup) -> Iterator:
    """Reverse-forwards the tsproxy port on remote browser platforms.

    Does nothing for non-remote browser platforms.
    """
    del network
    browser_platform = session.browser_platform
    ts_proxy_port = self._ts_proxy.socks_proxy_port
    # TODO; remap network port for remote browsers or when ports are occupied
    # already.
    if browser_platform.is_remote:
      browser_platform.reverse_port_forward(ts_proxy_port, ts_proxy_port)
    try:
      yield
    finally:
      # Also undo the forwarding when the with-body raised; otherwise the
      # forward would stay behind on the remote platform.
      if browser_platform.is_remote:
        browser_platform.stop_reverse_port_forward(ts_proxy_port)

  def extra_flags(self, browser_attributes: BrowserAttributes) -> Flags:
    """Returns the browser flags that route traffic through tsproxy.

    Raises:
      ValueError: If the browser is not chromium-based.
    """
    if not browser_attributes.is_chromium_based:
      raise ValueError(
          "Only chromium-based browsers are supported with ts_proxy.")
    # TODO: support port forwarding to remote device
    assert browser_attributes.is_local, "Only local browsers supported for now"
    assert self.is_running, "TrafficShaper is not running."
    assert self._ts_proxy.socks_proxy_port, "ts_proxy is not running"
    return Flags({
        "--proxy-server":
            f"socks://127.0.0.1:{self._ts_proxy.socks_proxy_port}",
        "--proxy-bypass-list":
            "<-loopback>"
    })

  def __str__(self) -> str:
    return self._name
463