• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import contextlib
9from typing import TYPE_CHECKING, Iterator
10
11from crossbench.flags.base import Flags
12
13if TYPE_CHECKING:
14  from crossbench.browsers.attributes import BrowserAttributes
15  from crossbench.network.base import Network
16  from crossbench.plt.base import Platform
17  from crossbench.runner.groups.session import BrowserSessionRunGroup
18
19
class TrafficShaper(abc.ABC):
  """Base traffic shaper that tracks whether it is currently running.

  An instance is bound to a browser platform and is switched to the running
  state for the duration of the context manager returned by open().
  """

  def __init__(self, browser_platform: Platform):
    # A freshly created shaper is idle until open() is entered.
    self._is_running = False
    self._browser_platform = browser_platform

  @property
  def is_running(self) -> bool:
    """Whether the shaper is currently inside an open() context."""
    return self._is_running

  @property
  def is_live(self) -> bool:
    """Always False in this base implementation."""
    return False

  @property
  def browser_platform(self) -> Platform:
    """The platform object passed to the constructor."""
    return self._browser_platform

  @property
  def host_platform(self) -> Platform:
    """The host_platform attribute of the browser platform."""
    platform = self._browser_platform
    return platform.host_platform

  def extra_flags(self, browser_attributes: BrowserAttributes) -> Flags:
    """Returns a new, empty Flags instance.

    The browser_attributes argument is not used by this implementation.
    Asserts that the shaper is running.
    """
    assert self.is_running, "TrafficShaper is not running."
    del browser_attributes
    return Flags()

  @contextlib.contextmanager
  def open(self, network: Network,
           session: BrowserSessionRunGroup) -> Iterator[TrafficShaper]:
    """Context manager that marks this shaper as running and yields it.

    Both arguments are unused by this implementation. Asserts that the shaper
    is not already running. The running flag is cleared again when the context
    exits, including when the body raises.
    """
    assert not self._is_running, "Cannot start network more than once."
    del network, session
    self._is_running = True
    try:
      yield self
    finally:
      # Always reset so the shaper can be opened again after a failure.
      self._is_running = False
57