1# Copyright 2023 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5# Modified from chrome's catapult project. 6 7from __future__ import annotations 8 9import atexit 10import contextlib 11import locale 12import logging 13import os 14import re 15import shlex 16import signal 17import subprocess 18import sys 19from typing import IO, TYPE_CHECKING, Iterator, List, Optional, Union 20 21from crossbench import helper 22from crossbench.flags.base import Flags 23from crossbench.helper.path_finder import TsProxyFinder 24from crossbench.network.traffic_shaping.base import TrafficShaper 25from crossbench.parse import NumberParser, PathParser 26 27if TYPE_CHECKING: 28 from crossbench.browsers.attributes import BrowserAttributes 29 from crossbench.network.base import Network 30 from crossbench.path import AnyPath, LocalPath 31 from crossbench.plt.base import ListCmdArgs, Platform 32 from crossbench.runner.groups.session import BrowserSessionRunGroup 33 34fcntl = None 35try: 36 import fcntl 37except ModuleNotFoundError as not_found: 38 logging.debug("No fcntl support %s", not_found) 39 40 41 42class TsProxyServerError(Exception): 43 """Catch-all exception for tsProxy Server.""" 44 45 46_PORT_RE = re.compile(r"Started Socks5 proxy server on " 47 r"(?P<host>[^:]*):" 48 r"(?P<port>\d+)") 49DEFAULT_TIMEOUT = 5 50 51 52def parse_ts_socks_proxy_port(output_line): 53 if match := _PORT_RE.match(output_line): 54 return int(match.group("port")) 55 return None 56 57 58# TODO: improve and double check 59TRAFFIC_SETTINGS = { 60 "3G-slow": { 61 "rtt_ms": 400, 62 "in_kbps": 400, 63 "out_kbps": 400, 64 }, 65 "3G-regular": { 66 "rtt_ms": 300, 67 "in_kbps": 1600, 68 "out_kbps": 768, 69 }, 70 "3G-fast": { 71 "rtt_ms": 150, 72 "in_kbps": 1600, 73 "out_kbps": 768, 74 }, 75 "4G": { 76 "rtt_ms": 170, 77 "in_kbps": 9000, 78 "out_kbps": 9000, 79 }, 80} 81 82 83class TsProxyServer: 84 """ 85 TsProxy provides basic latency, download and upload traffic shaping. This 86 class provides a programming API to the tsproxy script in 87 catapult/third_party/tsproxy/tsproxy.py 88 89 This class can be used as a context manager. 90 """ 91 92 def __init__(self, 93 ts_proxy_path: LocalPath, 94 host: Optional[str] = None, 95 socks_proxy_port: Optional[int] = None, 96 http_port: Optional[int] = None, 97 https_port: Optional[int] = None, 98 rtt_ms: Optional[int] = None, 99 in_kbps: Optional[int] = None, 100 out_kbps: Optional[int] = None, 101 window: Optional[int] = None, 102 verbose: bool = True): 103 self._proc: Optional[TsProxyProcess] = None 104 self._ts_proxy_path = PathParser.existing_file_path(ts_proxy_path) 105 self._socks_proxy_port = socks_proxy_port 106 self._host = host 107 self._http_port = http_port 108 self._https_port = https_port 109 self._rtt_ms = rtt_ms 110 self._in_kbps = in_kbps 111 self._out_kbps = out_kbps 112 self._window = window 113 self._verbose = verbose 114 self.verify_ports(http_port, https_port) 115 116 @classmethod 117 def verify_ports(cls, 118 http_port: Optional[int] = None, 119 https_port: Optional[int] = None) -> None: 120 if https_port and not bool(http_port): 121 raise ValueError(f"Got https_port={https_port} without a http port") 122 if http_port is not None and http_port == https_port: 123 raise ValueError("http_port and https_port must be different, " 124 f"got {https_port} twice.") 125 if http_port is not None: 126 NumberParser.port_number(http_port, "http_port") 127 if https_port is not None: 128 NumberParser.port_number(https_port, "https_port") 129 130 @property 131 def is_running(self) -> bool: 132 return self._proc is not None 133 134 def set_traffic_settings(self, 135 rtt_ms: Optional[int] = None, 136 in_kbps: Optional[int] = None, 137 out_kbps: Optional[int] = None, 138 window: Optional[int] = None, 139 timeout=DEFAULT_TIMEOUT) -> None: 140 assert self._proc, "ts_proxy is not running." 141 self._proc.set_traffic_settings(rtt_ms, in_kbps, out_kbps, window, timeout) 142 143 @property 144 def socks_proxy_port(self) -> int: 145 assert self._proc, "ts_proxy is not running." 146 return self._proc.socks_proxy_port 147 148 @property 149 def ts_proxy_path(self) -> LocalPath: 150 return self._ts_proxy_path 151 152 @property 153 def rtt_ms(self) -> Optional[int]: 154 return self._rtt_ms 155 156 @property 157 def in_kbps(self) -> Optional[int]: 158 return self._in_kbps 159 160 @property 161 def out_kbps(self) -> Optional[int]: 162 return self._out_kbps 163 164 @property 165 def window(self) -> Optional[int]: 166 return self._window 167 168 def start(self) -> None: 169 assert not self._proc, "ts_proxy is already running." 170 self._proc = TsProxyProcess(self._ts_proxy_path, self._host, 171 self._socks_proxy_port, self._http_port, 172 self._https_port, self._rtt_ms, self._in_kbps, 173 self._out_kbps, self._window, self._verbose) 174 atexit.register(self.stop) 175 176 def stop(self) -> Optional[str]: 177 if not self._proc: 178 logging.debug("TsProxy: Attempting to stop server that is not running.") 179 return None 180 assert self._proc 181 err = self._proc.stop() 182 self._proc = None 183 return err 184 185 def __enter__(self): 186 self.start() 187 return self 188 189 def __exit__(self, unused_exc_type, unused_exc_val, unused_exc_tb): 190 self.stop() 191 192 193class TsProxyProcess: 194 """Separate wrapper around the ts_proxy to simplify pytype testing.""" 195 196 def __init__(self, 197 ts_proxy_path: LocalPath, 198 host: Optional[str] = None, 199 socks_proxy_port: Optional[int] = None, 200 http_port: Optional[int] = None, 201 https_port: Optional[int] = None, 202 rtt_ms: Optional[int] = None, 203 in_kbps: Optional[int] = None, 204 out_kbps: Optional[int] = None, 205 window: Optional[int] = None, 206 verbose: bool = False, 207 timeout: Union[int, float] = DEFAULT_TIMEOUT) -> None: 208 """Start TsProxy server and verify that it started.""" 209 cmd: ListCmdArgs = [ 210 sys.executable, 211 ts_proxy_path, 212 ] 213 self._socks_proxy_port: Optional[int] = socks_proxy_port 214 self._initial_socks_proxy_port: Optional[int] = socks_proxy_port 215 if not socks_proxy_port: 216 # Use port 0 so tsproxy picks a random available port. 217 cmd.append("--port=0") 218 else: 219 cmd.append(f"--port={socks_proxy_port}") 220 if verbose: 221 cmd.append("--verbose") 222 self._in_kbps: Optional[int] = in_kbps 223 if in_kbps: 224 cmd.append(f"--inkbps={in_kbps}") 225 self._out_kbps: Optional[int] = out_kbps 226 if out_kbps: 227 cmd.append(f"--outkbps={out_kbps}") 228 self._window: Optional[int] = window 229 if window: 230 cmd.append(f"--window={window}") 231 self._rtt_ms: Optional[int] = rtt_ms 232 if rtt_ms: 233 cmd.append(f"--rtt={rtt_ms}") 234 self._host: Optional[str] = host 235 if host: 236 cmd.append(f"--desthost={host}") 237 self._http_port: Optional[int] = http_port 238 self._https_port: Optional[int] = https_port 239 TsProxyServer.verify_ports(http_port, https_port) 240 mapports = [] 241 if https_port: 242 mapports.append(f"443:{https_port}") 243 if http_port: 244 mapports.append(f"*:{http_port}") 245 cmd.append(f"--mapports={','.join(mapports)}") 246 logging.info("TsProxy: commandline: %s", shlex.join(map(str, cmd))) 247 self._verify_default_encoding() 248 # In python3 universal_newlines forces subprocess to encode/decode, 249 # allowing per-line buffering. 250 proc = subprocess.Popen( # pylint: disable=consider-using-with 251 cmd, 252 stdout=subprocess.PIPE, 253 stdin=subprocess.PIPE, 254 # stderr=subprocess.PIPE, 255 bufsize=1, 256 universal_newlines=True) 257 assert proc and proc.stdout and proc.stdin, "Could not start ts_proxy" 258 self._proc = proc 259 if stdout := proc.stdout: 260 self._stdout: IO[str] = stdout 261 else: 262 raise RuntimeError("Missing stdout") 263 if stdin := proc.stdin: 264 self._stdin: IO[str] = stdin 265 else: 266 raise RuntimeError("Missing stdin") 267 if fcntl: # pylint: disable=using-constant-test 268 self._setup_non_blocking_io() 269 self._wait_for_startup(timeout) 270 271 def _setup_non_blocking_io(self) -> None: 272 logging.debug("TsProxy: fcntl is supported, trying to set " 273 "non blocking I/O for the ts_proxy process") 274 fd = self._stdout.fileno() 275 fl = fcntl.fcntl(fd, fcntl.F_GETFL) 276 fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) # pylint: disable=no-member 277 278 @property 279 def socks_proxy_port(self) -> int: 280 if self._socks_proxy_port is None: 281 raise RuntimeError("ts_proxy didn't start") 282 return self._socks_proxy_port 283 284 def _verify_default_encoding(self) -> None: 285 # In python3 subprocess handles encoding/decoding; this warns if it won't 286 # be UTF-8. 287 encoding = locale.getpreferredencoding() 288 if encoding != "UTF-8": 289 logging.warning("Decoding will use %s instead of UTF-8", encoding) 290 291 def _wait_for_startup(self, timeout: Union[int, float]) -> None: 292 for _ in helper.wait_with_backoff(timeout): 293 if self._has_started(): 294 logging.info("TsProxy: port=%i", self._socks_proxy_port) 295 return 296 if err := self.stop(): 297 logging.error("TsProxy: Error stopping WPR server:\n%s", err) 298 raise TsProxyServerError( 299 f"Starting tsproxy timed out after {timeout} seconds") 300 301 def _has_started(self) -> bool: 302 if self._proc.poll() is not None: 303 return False 304 self._stdout.flush() 305 output_line = self._read_line_ts_proxy_stdout(timeout=5) 306 if not output_line: 307 return False 308 logging.debug("TsProxy: output: %s", output_line) 309 port = parse_ts_socks_proxy_port(output_line) 310 self._socks_proxy_port = NumberParser.port_number(port, "socks_proxy_port") 311 return True 312 313 def _read_line_ts_proxy_stdout(self, timeout: Union[int, float]) -> str: 314 for _ in helper.wait_with_backoff(timeout): 315 try: 316 return self._stdout.readline().strip() 317 except IOError as io_error: 318 logging.debug("TsProxy: Error while reading tsproxy line: %s", io_error) 319 return "" 320 321 def _send_command(self, 322 command: str, 323 timeout: Union[int, float] = DEFAULT_TIMEOUT) -> None: 324 logging.debug("TsProxy: Sending command to ts_proxy_server: %s", command) 325 self._stdin.write(f"{command}\n") 326 command_output = self._wait_for_status_response(timeout) 327 success = "OK" in command_output 328 logging.log(logging.DEBUG if success else logging.ERROR, 329 "TsProxy: output:\n%s", "\n".join(command_output)) 330 if not success: 331 raise TsProxyServerError(f"Failed to execute command: {command}") 332 333 def _wait_for_status_response(self, timeout: Union[int, float]) -> List[str]: 334 logging.debug("TsProxy: waiting for status response") 335 command_output = [] 336 for _ in helper.wait_with_backoff(timeout): 337 self._stdin.flush() 338 self._stdout.flush() 339 last_output = self._read_line_ts_proxy_stdout(timeout) 340 command_output.append(last_output) 341 if last_output in ("OK", "ERROR"): 342 break 343 return command_output 344 345 def set_traffic_settings(self, 346 rtt_ms: Optional[int] = None, 347 in_kbps: Optional[int] = None, 348 out_kbps: Optional[int] = None, 349 window: Optional[int] = None, 350 timeout=DEFAULT_TIMEOUT) -> None: 351 if rtt_ms is not None and self._rtt_ms != rtt_ms: 352 assert rtt_ms >= 0, f"Invalid rtt value: {rtt_ms}" 353 self._send_command(f"set rtt {rtt_ms}", timeout) 354 self._rtt_ms = rtt_ms 355 356 if in_kbps is not None and self._in_kbps != in_kbps: 357 assert in_kbps >= 0, f"Invalid in_kbps value: {in_kbps}" 358 self._send_command(f"set inkbps {in_kbps}", timeout) 359 self._in_kbps = in_kbps 360 361 if out_kbps is not None and self._out_kbps != out_kbps: 362 assert out_kbps >= 0, f"Invalid out_kbps value: {out_kbps}" 363 self._send_command(f"set outkbps {out_kbps}", timeout) 364 self._out_kbps = out_kbps 365 366 if window is not None and self._window != window: 367 assert window >= 0, f"Invalid window value: {window}" 368 self._send_command(f"set window {window}", timeout) 369 self._window = window 370 371 def stop(self) -> Optional[str]: 372 self._send_command("exit") 373 helper.wait_and_kill(self._proc, signal=signal.SIGINT) 374 _, err = self._proc.communicate() 375 self._socks_proxy_port = self._initial_socks_proxy_port 376 return err 377 378 379class TsProxyTrafficShaper(TrafficShaper): 380 381 def __init__(self, 382 browser_platform: Platform, 383 ts_proxy_path: Optional[AnyPath] = None, 384 rtt_ms: Optional[int] = None, 385 in_kbps: Optional[int] = None, 386 out_kbps: Optional[int] = None, 387 window: Optional[int] = None): 388 super().__init__(browser_platform) 389 if not ts_proxy_path: 390 if maybe_ts_proxy_path := TsProxyFinder(self.host_platform).path: 391 ts_proxy_path = self.host_platform.local_path(maybe_ts_proxy_path) 392 if not ts_proxy_path: 393 raise RuntimeError( 394 f"Could not find ts_proxy script on {self.host_platform}") 395 # Early instantiation to validate inputs. 396 self._ts_proxy = TsProxyServer( 397 self.host_platform.local_path(ts_proxy_path), 398 rtt_ms=rtt_ms, 399 in_kbps=in_kbps, 400 out_kbps=out_kbps, 401 window=window) 402 # TODO: support custom name 403 self._name = "tsproxy" 404 405 @property 406 def ts_proxy(self) -> TsProxyServer: 407 return self._ts_proxy 408 409 @contextlib.contextmanager 410 def open(self, network: Network, 411 session: BrowserSessionRunGroup) -> Iterator[TrafficShaper]: 412 if not network.is_live: 413 self._ts_proxy = self._create_remapping_ts_proxy(network) 414 415 with super().open(network, session): 416 logging.debug("Starting TS Proxy") 417 with self._ts_proxy: 418 with self._forward_ports(network, session): 419 yield self 420 421 def _create_remapping_ts_proxy(self, network) -> TsProxyServer: 422 return TsProxyServer( 423 self._ts_proxy.ts_proxy_path, 424 rtt_ms=self._ts_proxy.rtt_ms, 425 in_kbps=self._ts_proxy.in_kbps, 426 out_kbps=self._ts_proxy.out_kbps, 427 window=self._ts_proxy.window, 428 host=network.host, 429 http_port=network.http_port, 430 https_port=network.https_port) 431 432 @contextlib.contextmanager 433 def _forward_ports(self, network: Network, 434 session: BrowserSessionRunGroup) -> Iterator: 435 del network 436 browser_platform = session.browser_platform 437 ts_proxy_port = self._ts_proxy.socks_proxy_port 438 # TODO; remap network port for remote browsers or when ports are occupied 439 # already. 440 if browser_platform.is_remote: 441 browser_platform.reverse_port_forward(ts_proxy_port, ts_proxy_port) 442 yield 443 if browser_platform.is_remote: 444 browser_platform.stop_reverse_port_forward(ts_proxy_port) 445 446 def extra_flags(self, browser_attributes: BrowserAttributes) -> Flags: 447 if not browser_attributes.is_chromium_based: 448 raise ValueError( 449 "Only chromium-based browsers are supported with ts_proxy.") 450 # TODO: support port forwarding to remote device 451 assert browser_attributes.is_local, "Only local browsers supported for now" 452 assert self.is_running, "TrafficShaper is not running." 453 assert self._ts_proxy.socks_proxy_port, "ts_proxy is not running" 454 return Flags({ 455 "--proxy-server": 456 f"socks://127.0.0.1:{self._ts_proxy.socks_proxy_port}", 457 "--proxy-bypass-list": 458 "<-loopback>" 459 }) 460 461 def __str__(self) -> str: 462 return self._name 463