• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import sys
2import trace
3from typing import TYPE_CHECKING
4
5from .runtests import RunTests
6from .result import State, TestResult, TestStats, Location
7from .utils import (
8    StrPath, TestName, TestTuple, TestList, FilterDict,
9    printlist, count, format_duration)
10
11if TYPE_CHECKING:
12    from xml.etree.ElementTree import Element
13
14
15# Python uses exit code 1 when an exception is not caught
16# argparse.ArgumentParser.error() uses exit code 2
17EXITCODE_BAD_TEST = 2
18EXITCODE_ENV_CHANGED = 3
19EXITCODE_NO_TESTS_RAN = 4
20EXITCODE_RERUN_FAIL = 5
21EXITCODE_INTERRUPTED = 130   # 128 + signal.SIGINT=2
22
23
24class TestResults:
25    def __init__(self) -> None:
26        self.bad: TestList = []
27        self.good: TestList = []
28        self.rerun_bad: TestList = []
29        self.skipped: TestList = []
30        self.resource_denied: TestList = []
31        self.env_changed: TestList = []
32        self.run_no_tests: TestList = []
33        self.rerun: TestList = []
34        self.rerun_results: list[TestResult] = []
35
36        self.interrupted: bool = False
37        self.worker_bug: bool = False
38        self.test_times: list[tuple[float, TestName]] = []
39        self.stats = TestStats()
40        # used by --junit-xml
41        self.testsuite_xml: list['Element'] = []
42        # used by -T with -j
43        self.covered_lines: set[Location] = set()
44
45    def is_all_good(self) -> bool:
46        return (not self.bad
47                and not self.skipped
48                and not self.interrupted
49                and not self.worker_bug)
50
51    def get_executed(self) -> set[TestName]:
52        return (set(self.good) | set(self.bad) | set(self.skipped)
53                | set(self.resource_denied) | set(self.env_changed)
54                | set(self.run_no_tests))
55
56    def no_tests_run(self) -> bool:
57        return not any((self.good, self.bad, self.skipped, self.interrupted,
58                        self.env_changed))
59
60    def get_state(self, fail_env_changed: bool) -> str:
61        state = []
62        if self.bad:
63            state.append("FAILURE")
64        elif fail_env_changed and self.env_changed:
65            state.append("ENV CHANGED")
66        elif self.no_tests_run():
67            state.append("NO TESTS RAN")
68
69        if self.interrupted:
70            state.append("INTERRUPTED")
71        if self.worker_bug:
72            state.append("WORKER BUG")
73        if not state:
74            state.append("SUCCESS")
75
76        return ', '.join(state)
77
78    def get_exitcode(self, fail_env_changed, fail_rerun):
79        exitcode = 0
80        if self.bad:
81            exitcode = EXITCODE_BAD_TEST
82        elif self.interrupted:
83            exitcode = EXITCODE_INTERRUPTED
84        elif fail_env_changed and self.env_changed:
85            exitcode = EXITCODE_ENV_CHANGED
86        elif self.no_tests_run():
87            exitcode = EXITCODE_NO_TESTS_RAN
88        elif fail_rerun and self.rerun:
89            exitcode = EXITCODE_RERUN_FAIL
90        elif self.worker_bug:
91            exitcode = EXITCODE_BAD_TEST
92        return exitcode
93
94    def accumulate_result(self, result: TestResult, runtests: RunTests):
95        test_name = result.test_name
96        rerun = runtests.rerun
97        fail_env_changed = runtests.fail_env_changed
98
99        match result.state:
100            case State.PASSED:
101                self.good.append(test_name)
102            case State.ENV_CHANGED:
103                self.env_changed.append(test_name)
104                self.rerun_results.append(result)
105            case State.SKIPPED:
106                self.skipped.append(test_name)
107            case State.RESOURCE_DENIED:
108                self.resource_denied.append(test_name)
109            case State.INTERRUPTED:
110                self.interrupted = True
111            case State.DID_NOT_RUN:
112                self.run_no_tests.append(test_name)
113            case _:
114                if result.is_failed(fail_env_changed):
115                    self.bad.append(test_name)
116                    self.rerun_results.append(result)
117                else:
118                    raise ValueError(f"invalid test state: {result.state!r}")
119
120        if result.state == State.WORKER_BUG:
121            self.worker_bug = True
122
123        if result.has_meaningful_duration() and not rerun:
124            if result.duration is None:
125                raise ValueError("result.duration is None")
126            self.test_times.append((result.duration, test_name))
127        if result.stats is not None:
128            self.stats.accumulate(result.stats)
129        if rerun:
130            self.rerun.append(test_name)
131        if result.covered_lines:
132            # we don't care about trace counts so we don't have to sum them up
133            self.covered_lines.update(result.covered_lines)
134        xml_data = result.xml_data
135        if xml_data:
136            self.add_junit(xml_data)
137
138    def get_coverage_results(self) -> trace.CoverageResults:
139        counts = {loc: 1 for loc in self.covered_lines}
140        return trace.CoverageResults(counts=counts)
141
142    def need_rerun(self):
143        return bool(self.rerun_results)
144
145    def prepare_rerun(self, *, clear: bool = True) -> tuple[TestTuple, FilterDict]:
146        tests: TestList = []
147        match_tests_dict = {}
148        for result in self.rerun_results:
149            tests.append(result.test_name)
150
151            match_tests = result.get_rerun_match_tests()
152            # ignore empty match list
153            if match_tests:
154                match_tests_dict[result.test_name] = match_tests
155
156        if clear:
157            # Clear previously failed tests
158            self.rerun_bad.extend(self.bad)
159            self.bad.clear()
160            self.env_changed.clear()
161            self.rerun_results.clear()
162
163        return (tuple(tests), match_tests_dict)
164
165    def add_junit(self, xml_data: list[str]):
166        import xml.etree.ElementTree as ET
167        for e in xml_data:
168            try:
169                self.testsuite_xml.append(ET.fromstring(e))
170            except ET.ParseError:
171                print(xml_data, file=sys.__stderr__)
172                raise
173
174    def write_junit(self, filename: StrPath):
175        if not self.testsuite_xml:
176            # Don't create empty XML file
177            return
178
179        import xml.etree.ElementTree as ET
180        root = ET.Element("testsuites")
181
182        # Manually count the totals for the overall summary
183        totals = {'tests': 0, 'errors': 0, 'failures': 0}
184        for suite in self.testsuite_xml:
185            root.append(suite)
186            for k in totals:
187                try:
188                    totals[k] += int(suite.get(k, 0))
189                except ValueError:
190                    pass
191
192        for k, v in totals.items():
193            root.set(k, str(v))
194
195        with open(filename, 'wb') as f:
196            for s in ET.tostringlist(root):
197                f.write(s)
198
199    def display_result(self, tests: TestTuple, quiet: bool, print_slowest: bool):
200        if print_slowest:
201            self.test_times.sort(reverse=True)
202            print()
203            print("10 slowest tests:")
204            for test_time, test in self.test_times[:10]:
205                print("- %s: %s" % (test, format_duration(test_time)))
206
207        all_tests = []
208        omitted = set(tests) - self.get_executed()
209
210        # less important
211        all_tests.append((sorted(omitted), "test", "{} omitted:"))
212        if not quiet:
213            all_tests.append((self.skipped, "test", "{} skipped:"))
214            all_tests.append((self.resource_denied, "test", "{} skipped (resource denied):"))
215        all_tests.append((self.run_no_tests, "test", "{} run no tests:"))
216
217        # more important
218        all_tests.append((self.env_changed, "test", "{} altered the execution environment (env changed):"))
219        all_tests.append((self.rerun, "re-run test", "{}:"))
220        all_tests.append((self.bad, "test", "{} failed:"))
221
222        for tests_list, count_text, title_format in all_tests:
223            if tests_list:
224                print()
225                count_text = count(len(tests_list), count_text)
226                print(title_format.format(count_text))
227                printlist(tests_list)
228
229        if self.good and not quiet:
230            print()
231            text = count(len(self.good), "test")
232            text = f"{text} OK."
233            if (self.is_all_good() and len(self.good) > 1):
234                text = f"All {text}"
235            print(text)
236
237        if self.interrupted:
238            print()
239            print("Test suite interrupted by signal SIGINT.")
240
241    def display_summary(self, first_runtests: RunTests, filtered: bool):
242        # Total tests
243        stats = self.stats
244        text = f'run={stats.tests_run:,}'
245        if filtered:
246            text = f"{text} (filtered)"
247        report = [text]
248        if stats.failures:
249            report.append(f'failures={stats.failures:,}')
250        if stats.skipped:
251            report.append(f'skipped={stats.skipped:,}')
252        print(f"Total tests: {' '.join(report)}")
253
254        # Total test files
255        all_tests = [self.good, self.bad, self.rerun,
256                     self.skipped,
257                     self.env_changed, self.run_no_tests]
258        run = sum(map(len, all_tests))
259        text = f'run={run}'
260        if not first_runtests.forever:
261            ntest = len(first_runtests.tests)
262            text = f"{text}/{ntest}"
263        if filtered:
264            text = f"{text} (filtered)"
265        report = [text]
266        for name, tests in (
267            ('failed', self.bad),
268            ('env_changed', self.env_changed),
269            ('skipped', self.skipped),
270            ('resource_denied', self.resource_denied),
271            ('rerun', self.rerun),
272            ('run_no_tests', self.run_no_tests),
273        ):
274            if tests:
275                report.append(f'{name}={len(tests)}')
276        print(f"Total test files: {' '.join(report)}")
277