1# Copyright 2024 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import abc 8import contextlib 9import datetime as dt 10import logging 11from typing import (TYPE_CHECKING, Dict, Generic, Iterable, List, Optional, 12 Tuple, Type, TypeVar) 13 14from crossbench.helper.state import State, StateMachine 15from crossbench.probes.probe_context import BaseProbeContext, ProbeContext 16from crossbench.probes.results import EmptyProbeResult, ProbeResult 17from crossbench.runner.result_origin import ResultOrigin 18 19if TYPE_CHECKING: 20 from crossbench.probes.probe import Probe, ProbeT 21 from crossbench.probes.results import ProbeResultDict 22 23ResultOriginT = TypeVar("ResultOriginT", bound=ResultOrigin) 24ProbeContextT = TypeVar("ProbeContextT", bound=BaseProbeContext) 25 26 27class ProbeContextManager(Generic[ResultOriginT, ProbeContextT], abc.ABC): 28 29 def __init__(self, result_origin: ResultOriginT, 30 probe_results: ProbeResultDict): 31 self._state = StateMachine(State.INITIAL) 32 self._origin = result_origin 33 self._probe_results = probe_results 34 self._probe_contexts: Dict[Type[Probe], ProbeContextT] = {} 35 # TODO: either prefix timers or use custom duration 36 self._durations = result_origin.durations 37 self._exceptions = result_origin.exceptions 38 39 @property 40 def is_ready(self) -> bool: 41 return self._state == State.READY 42 43 @property 44 def is_running(self) -> bool: 45 return self._state == State.RUN 46 47 def measure(self, name): 48 return self._origin.measure(name) 49 50 @contextlib.contextmanager 51 def capture(self, label: str, measure: bool = False): 52 with self._exceptions.capture(label): 53 if not measure: 54 yield 55 else: 56 with self._origin.durations.measure(label): 57 yield 58 59 @property 60 def is_success(self) -> bool: 61 return self._exceptions.is_success 62 63 def setup(self, probes: Iterable[Probe], is_dry_run: bool): 64 self._state.transition(State.INITIAL, to=State.SETUP) 65 if not is_dry_run: 66 if not self._setup_probes(tuple(probes), is_dry_run): 67 return 68 self._state.transition(State.SETUP, to=State.READY) 69 70 def _setup_probes(self, probes: Tuple[Probe, ...], is_dry_run: bool) -> bool: 71 with self.capture("probes-setup", measure=True): 72 self._validate_probes(probes) 73 self._create_contexts(probes) 74 self._setup_contexts() 75 if not self.is_success: 76 self._handle_setup_error(is_dry_run) 77 return self.is_success 78 79 def _validate_probes(self, probes: Tuple[Probe, ...]): 80 assert not self._probe_contexts, "Wrong probe context initialization order" 81 probe_set = set() 82 for probe in probes: 83 assert probe not in probe_set, (f"Got duplicate probe name={probe.name}") 84 probe_set.add(probe) 85 assert probe.is_attached, ( 86 f"Probe {probe.name} is not properly attached to a browser") 87 88 def _create_contexts(self, probes: Tuple[Probe, ...]): 89 for probe in probes: 90 if probe.PRODUCES_DATA: 91 self._probe_results[probe] = EmptyProbeResult() 92 with self.capture(f"{probe.name} get_context"): 93 if probe_context := self.get_probe_context(probe): 94 probe_cls = type(probe) 95 assert probe_cls not in self._probe_contexts 96 self._probe_contexts[probe_cls] = probe_context 97 98 def _setup_contexts(self): 99 for probe_context in self._probe_contexts.values(): 100 with self.capture(f"probes-setup {probe_context.name}"): 101 probe_context.setup() 102 103 def _handle_setup_error(self, is_dry_run: bool) -> None: 104 self._state.transition(State.SETUP, to=State.DONE) 105 logging.debug("Handling setup error") 106 assert not self.is_success 107 # Special handling for crucial runner probes 108 internal_probe_contexts = [ 109 context for context in self._probe_contexts.values() 110 if context.probe.is_internal 111 ] 112 self._teardown(internal_probe_contexts, is_dry_run, setup_error=True) 113 114 @contextlib.contextmanager 115 def open(self, is_dry_run: bool): 116 self._state.transition(State.READY, to=State.RUN) 117 probe_start_time = dt.datetime.now() 118 combined_contexts = contextlib.ExitStack() 119 120 for probe_context in self._probe_contexts.values(): 121 probe_context.set_start_time(probe_start_time) 122 if not is_dry_run: 123 combined_contexts.enter_context(probe_context.open()) 124 125 with combined_contexts: 126 self._durations["probes-start"] = dt.datetime.now() - probe_start_time 127 yield self 128 129 def teardown(self, is_dry_run: bool, setup_error: bool = False) -> None: 130 self._state.transition(State.READY, State.RUN, to=State.DONE) 131 with self.measure("probes-teardown"): 132 self._teardown( 133 list(self._probe_contexts.values()), is_dry_run, setup_error) 134 self._probe_contexts = {} 135 136 def _teardown(self, 137 probe_contexts: List[ProbeContextT], 138 is_dry_run: bool, 139 setup_error: bool = False) -> None: 140 if setup_error: 141 assert self._probe_contexts, "Invalid state" 142 self._state.expect(State.DONE) 143 logging.debug("PROBE SCOPE TEARDOWN") 144 if is_dry_run: 145 return 146 for probe_context in reversed(probe_contexts): 147 with self.capture(f"Probe {probe_context.name} teardown", measure=True): 148 assert probe_context.result_origin == self._origin 149 probe_results: ProbeResult = probe_context.teardown() 150 probe = probe_context.probe 151 if probe_results.is_empty: 152 logging.warning("Probe did not extract any data. probe=%s in %s", 153 probe, self._origin) 154 self._probe_results[probe] = probe_results 155 156 @abc.abstractmethod 157 def get_probe_context(self, probe: Probe) -> Optional[ProbeContextT]: 158 pass 159 160 def find_probe_context(self, 161 cls: Type[ProbeT]) -> Optional[ProbeContext[ProbeT]]: 162 return self._probe_contexts.get(cls) 163