• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import contextlib
9import datetime as dt
10import logging
11from typing import (TYPE_CHECKING, Dict, Generic, Iterable, List, Optional,
12                    Tuple, Type, TypeVar)
13
14from crossbench.helper.state import State, StateMachine
15from crossbench.probes.probe_context import BaseProbeContext, ProbeContext
16from crossbench.probes.results import EmptyProbeResult, ProbeResult
17from crossbench.runner.result_origin import ResultOrigin
18
19if TYPE_CHECKING:
20  from crossbench.probes.probe import Probe, ProbeT
21  from crossbench.probes.results import ProbeResultDict
22
23ResultOriginT = TypeVar("ResultOriginT", bound=ResultOrigin)
24ProbeContextT = TypeVar("ProbeContextT", bound=BaseProbeContext)
25
26
class ProbeContextManager(Generic[ResultOriginT, ProbeContextT], abc.ABC):
  """Drives the lifecycle of the probe contexts that belong to one origin.

  The manager walks a small state machine:
    INITIAL -> SETUP -> READY -> RUN -> DONE
  - setup() creates and sets up at most one context per probe type,
  - open() enters all contexts for the duration of a measurement,
  - teardown() tears the contexts down and stores each probe's result in
    the `probe_results` mapping handed to the constructor.

  Subclasses implement get_probe_context() to create the concrete context
  object for a given probe.
  """

  def __init__(self, result_origin: ResultOriginT,
               probe_results: ProbeResultDict):
    """Creates a manager in the INITIAL state.

    Args:
      result_origin: Object providing the `durations` timers, the
        `exceptions` collector and `measure()` used by this manager.
      probe_results: Mapping that receives one result entry per probe.
    """
    self._state = StateMachine(State.INITIAL)
    self._origin = result_origin
    self._probe_results = probe_results
    # Keyed by probe class, so each probe type has at most one context.
    self._probe_contexts: Dict[Type[Probe], ProbeContextT] = {}
    # TODO: either prefix timers or use custom duration
    self._durations = result_origin.durations
    self._exceptions = result_origin.exceptions

  @property
  def is_ready(self) -> bool:
    """True once setup() has finished and open() has not been called yet."""
    return self._state == State.READY

  @property
  def is_running(self) -> bool:
    """True while the manager is in the RUN state entered by open()."""
    return self._state == State.RUN

  def measure(self, name: str):
    """Forwards to the result origin's measure() for the given timer name."""
    return self._origin.measure(name)

  @contextlib.contextmanager
  def capture(self, label: str, measure: bool = False):
    """Runs the with-body under the origin's exception capture for `label`.

    Args:
      label: Name used for the exception capture (and the timer, if any).
      measure: If True, the body is additionally timed under `label` with
        the origin's durations.
    """
    with self._exceptions.capture(label):
      if not measure:
        yield
      else:
        with self._origin.durations.measure(label):
          yield

  @property
  def is_success(self) -> bool:
    """True if the origin's exception collector reports success."""
    return self._exceptions.is_success

  def setup(self, probes: Iterable[Probe], is_dry_run: bool) -> None:
    """Moves the manager from INITIAL to READY, setting up all probes.

    In a dry run no probe context is created and the manager goes straight
    to READY. If the probe setup fails the manager is left in the DONE state
    (see _handle_setup_error) and never becomes READY.
    """
    self._state.transition(State.INITIAL, to=State.SETUP)
    if not is_dry_run:
      if not self._setup_probes(tuple(probes), is_dry_run):
        return
    self._state.transition(State.SETUP, to=State.READY)

  def _setup_probes(self, probes: Tuple[Probe, ...], is_dry_run: bool) -> bool:
    """Validates the probes, creates their contexts and sets them up.

    Returns:
      The value of `is_success` after the setup attempt; the setup-error
      handler has already run when this is False.
    """
    with self.capture("probes-setup", measure=True):
      self._validate_probes(probes)
      self._create_contexts(probes)
      self._setup_contexts()
    if not self.is_success:
      self._handle_setup_error(is_dry_run)
    return self.is_success

  def _validate_probes(self, probes: Tuple[Probe, ...]) -> None:
    """Asserts that no context exists yet and that the probes are usable.

    Each probe may appear only once and has to report `is_attached`.
    """
    assert not self._probe_contexts, "Wrong probe context initialization order"
    probe_set = set()
    for probe in probes:
      assert probe not in probe_set, (f"Got duplicate probe name={probe.name}")
      probe_set.add(probe)
      assert probe.is_attached, (
          f"Probe {probe.name} is not properly attached to a browser")

  def _create_contexts(self, probes: Tuple[Probe, ...]) -> None:
    """Creates and registers a context for every probe that provides one."""
    for probe in probes:
      if probe.PRODUCES_DATA:
        # Pre-populate the result so data-producing probes always have an
        # entry, even if their context is never torn down.
        self._probe_results[probe] = EmptyProbeResult()
      with self.capture(f"{probe.name} get_context"):
        if probe_context := self.get_probe_context(probe):
          probe_cls = type(probe)
          assert probe_cls not in self._probe_contexts
          self._probe_contexts[probe_cls] = probe_context

  def _setup_contexts(self) -> None:
    """Calls setup() on every registered context, each in its own capture."""
    for probe_context in self._probe_contexts.values():
      with self.capture(f"probes-setup {probe_context.name}"):
        probe_context.setup()

  def _handle_setup_error(self, is_dry_run: bool) -> None:
    """Moves SETUP -> DONE and tears down only the internal probe contexts."""
    self._state.transition(State.SETUP, to=State.DONE)
    logging.debug("Handling setup error")
    assert not self.is_success
    # Special handling for crucial runner probes
    internal_probe_contexts = [
        context for context in self._probe_contexts.values()
        if context.probe.is_internal
    ]
    self._teardown(internal_probe_contexts, is_dry_run, setup_error=True)

  @contextlib.contextmanager
  def open(self, is_dry_run: bool):
    """Enters all probe contexts for the duration of the with-body.

    Moves the manager from READY to RUN, hands the common start time to
    every context and, unless this is a dry run, opens each of them. The
    time spent opening is stored as the "probes-start" duration.

    Yields:
      This manager.
    """
    self._state.transition(State.READY, to=State.RUN)
    probe_start_time = dt.datetime.now()
    # Enter the probe contexts inside the ExitStack's own `with` block: if
    # one probe fails to open, the contexts that were already opened are
    # closed again before the exception propagates instead of leaking.
    with contextlib.ExitStack() as combined_contexts:
      for probe_context in self._probe_contexts.values():
        probe_context.set_start_time(probe_start_time)
        if not is_dry_run:
          combined_contexts.enter_context(probe_context.open())
      self._durations["probes-start"] = dt.datetime.now() - probe_start_time
      yield self

  def teardown(self, is_dry_run: bool, setup_error: bool = False) -> None:
    """Moves READY/RUN -> DONE, tears down all contexts and forgets them.

    The whole teardown is timed as "probes-teardown".
    """
    self._state.transition(State.READY, State.RUN, to=State.DONE)
    with self.measure("probes-teardown"):
      self._teardown(
          list(self._probe_contexts.values()), is_dry_run, setup_error)
      self._probe_contexts = {}

  def _teardown(self,
                probe_contexts: List[ProbeContextT],
                is_dry_run: bool,
                setup_error: bool = False) -> None:
    """Tears down the given contexts in reverse order and stores results.

    Args:
      probe_contexts: Contexts to tear down; processed last-to-first.
      is_dry_run: If True, no context is torn down.
      setup_error: True when called from the setup-error path.
    """
    if setup_error:
      # NOTE(review): a setup failure that happens before any context was
      # registered (e.g. in _validate_probes) would appear to trip this
      # assertion -- confirm this is intended.
      assert self._probe_contexts, "Invalid state"
    self._state.expect(State.DONE)
    logging.debug("PROBE SCOPE TEARDOWN")
    if is_dry_run:
      return
    for probe_context in reversed(probe_contexts):
      # Each teardown is captured separately so that one failing probe does
      # not share a capture label with the others.
      with self.capture(f"Probe {probe_context.name} teardown", measure=True):
        assert probe_context.result_origin == self._origin
        probe_results: ProbeResult = probe_context.teardown()
        probe = probe_context.probe
        if probe_results.is_empty:
          logging.warning("Probe did not extract any data. probe=%s in %s",
                          probe, self._origin)
        self._probe_results[probe] = probe_results

  @abc.abstractmethod
  def get_probe_context(self, probe: Probe) -> Optional[ProbeContextT]:
    """Returns the context for `probe`, or None if it has no context."""

  def find_probe_context(self,
                         cls: Type[ProbeT]) -> Optional[ProbeContext[ProbeT]]:
    """Returns the context registered for probe class `cls`, if any."""
    return self._probe_contexts.get(cls)
163