• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import dataclasses
2import json
3from typing import Any
4
5from .utils import (
6    StrJSON, TestName, FilterTuple,
7    format_duration, normalize_test_name, print_warning)
8
9
10@dataclasses.dataclass(slots=True)
11class TestStats:
12    tests_run: int = 0
13    failures: int = 0
14    skipped: int = 0
15
16    @staticmethod
17    def from_unittest(result):
18        return TestStats(result.testsRun,
19                         len(result.failures),
20                         len(result.skipped))
21
22    @staticmethod
23    def from_doctest(results):
24        return TestStats(results.attempted,
25                         results.failed,
26                         results.skipped)
27
28    def accumulate(self, stats):
29        self.tests_run += stats.tests_run
30        self.failures += stats.failures
31        self.skipped += stats.skipped
32
33
34# Avoid enum.Enum to reduce the number of imports when tests are run
35class State:
36    PASSED = "PASSED"
37    FAILED = "FAILED"
38    SKIPPED = "SKIPPED"
39    UNCAUGHT_EXC = "UNCAUGHT_EXC"
40    REFLEAK = "REFLEAK"
41    ENV_CHANGED = "ENV_CHANGED"
42    RESOURCE_DENIED = "RESOURCE_DENIED"
43    INTERRUPTED = "INTERRUPTED"
44    WORKER_FAILED = "WORKER_FAILED"   # non-zero worker process exit code
45    WORKER_BUG = "WORKER_BUG"         # exception when running a worker
46    DID_NOT_RUN = "DID_NOT_RUN"
47    TIMEOUT = "TIMEOUT"
48
49    @staticmethod
50    def is_failed(state):
51        return state in {
52            State.FAILED,
53            State.UNCAUGHT_EXC,
54            State.REFLEAK,
55            State.WORKER_FAILED,
56            State.WORKER_BUG,
57            State.TIMEOUT}
58
59    @staticmethod
60    def has_meaningful_duration(state):
61        # Consider that the duration is meaningless for these cases.
62        # For example, if a whole test file is skipped, its duration
63        # is unlikely to be the duration of executing its tests,
64        # but just the duration to execute code which skips the test.
65        return state not in {
66            State.SKIPPED,
67            State.RESOURCE_DENIED,
68            State.INTERRUPTED,
69            State.WORKER_FAILED,
70            State.WORKER_BUG,
71            State.DID_NOT_RUN}
72
73    @staticmethod
74    def must_stop(state):
75        return state in {
76            State.INTERRUPTED,
77            State.WORKER_BUG,
78        }
79
80
81FileName = str
82LineNo = int
83Location = tuple[FileName, LineNo]
84
85
86@dataclasses.dataclass(slots=True)
87class TestResult:
88    test_name: TestName
89    state: str | None = None
90    # Test duration in seconds
91    duration: float | None = None
92    xml_data: list[str] | None = None
93    stats: TestStats | None = None
94
95    # errors and failures copied from support.TestFailedWithDetails
96    errors: list[tuple[str, str]] | None = None
97    failures: list[tuple[str, str]] | None = None
98
99    # partial coverage in a worker run; not used by sequential in-process runs
100    covered_lines: list[Location] | None = None
101
102    def is_failed(self, fail_env_changed: bool) -> bool:
103        if self.state == State.ENV_CHANGED:
104            return fail_env_changed
105        return State.is_failed(self.state)
106
107    def _format_failed(self):
108        if self.errors and self.failures:
109            le = len(self.errors)
110            lf = len(self.failures)
111            error_s = "error" + ("s" if le > 1 else "")
112            failure_s = "failure" + ("s" if lf > 1 else "")
113            return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
114
115        if self.errors:
116            le = len(self.errors)
117            error_s = "error" + ("s" if le > 1 else "")
118            return f"{self.test_name} failed ({le} {error_s})"
119
120        if self.failures:
121            lf = len(self.failures)
122            failure_s = "failure" + ("s" if lf > 1 else "")
123            return f"{self.test_name} failed ({lf} {failure_s})"
124
125        return f"{self.test_name} failed"
126
127    def __str__(self) -> str:
128        match self.state:
129            case State.PASSED:
130                return f"{self.test_name} passed"
131            case State.FAILED:
132                return self._format_failed()
133            case State.SKIPPED:
134                return f"{self.test_name} skipped"
135            case State.UNCAUGHT_EXC:
136                return f"{self.test_name} failed (uncaught exception)"
137            case State.REFLEAK:
138                return f"{self.test_name} failed (reference leak)"
139            case State.ENV_CHANGED:
140                return f"{self.test_name} failed (env changed)"
141            case State.RESOURCE_DENIED:
142                return f"{self.test_name} skipped (resource denied)"
143            case State.INTERRUPTED:
144                return f"{self.test_name} interrupted"
145            case State.WORKER_FAILED:
146                return f"{self.test_name} worker non-zero exit code"
147            case State.WORKER_BUG:
148                return f"{self.test_name} worker bug"
149            case State.DID_NOT_RUN:
150                return f"{self.test_name} ran no tests"
151            case State.TIMEOUT:
152                return f"{self.test_name} timed out ({format_duration(self.duration)})"
153            case _:
154                raise ValueError("unknown result state: {state!r}")
155
156    def has_meaningful_duration(self):
157        return State.has_meaningful_duration(self.state)
158
159    def set_env_changed(self):
160        if self.state is None or self.state == State.PASSED:
161            self.state = State.ENV_CHANGED
162
163    def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
164        if State.must_stop(self.state):
165            return True
166        if fail_fast and self.is_failed(fail_env_changed):
167            return True
168        return False
169
170    def get_rerun_match_tests(self) -> FilterTuple | None:
171        match_tests = []
172
173        errors = self.errors or []
174        failures = self.failures or []
175        for error_list, is_error in (
176            (errors, True),
177            (failures, False),
178        ):
179            for full_name, *_ in error_list:
180                match_name = normalize_test_name(full_name, is_error=is_error)
181                if match_name is None:
182                    # 'setUpModule (test.test_sys)': don't filter tests
183                    return None
184                if not match_name:
185                    error_type = "ERROR" if is_error else "FAIL"
186                    print_warning(f"rerun failed to parse {error_type} test name: "
187                                  f"{full_name!r}: don't filter tests")
188                    return None
189                match_tests.append(match_name)
190
191        if not match_tests:
192            return None
193        return tuple(match_tests)
194
195    def write_json_into(self, file) -> None:
196        json.dump(self, file, cls=_EncodeTestResult)
197
198    @staticmethod
199    def from_json(worker_json: StrJSON) -> 'TestResult':
200        return json.loads(worker_json, object_hook=_decode_test_result)
201
202
203class _EncodeTestResult(json.JSONEncoder):
204    def default(self, o: Any) -> dict[str, Any]:
205        if isinstance(o, TestResult):
206            result = dataclasses.asdict(o)
207            result["__test_result__"] = o.__class__.__name__
208            return result
209        else:
210            return super().default(o)
211
212
213def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
214    if "__test_result__" in data:
215        data.pop('__test_result__')
216        if data['stats'] is not None:
217            data['stats'] = TestStats(**data['stats'])
218        if data['covered_lines'] is not None:
219            data['covered_lines'] = [
220                tuple(loc) for loc in data['covered_lines']
221            ]
222        return TestResult(**data)
223    else:
224        return data
225