• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import faulthandler
2import gc
3import importlib
4import io
5import sys
6import time
7import traceback
8import unittest
9
10from test import support
11from test.support import threading_helper
12
13from .filter import match_test
14from .result import State, TestResult, TestStats
15from .runtests import RunTests
16from .save_env import saved_test_environment
17from .setup import setup_tests
18from .testresult import get_test_runner
19from .utils import (
20    TestName,
21    clear_caches, remove_testfn, abs_module_name, print_warning)
22
23
24# Minimum duration of a test to display its duration or to mention that
25# the test is running in background
26PROGRESS_MIN_TIME = 30.0   # seconds
27
28
29def run_unittest(test_mod):
30    loader = unittest.TestLoader()
31    tests = loader.loadTestsFromModule(test_mod)
32    for error in loader.errors:
33        print(error, file=sys.stderr)
34    if loader.errors:
35        raise Exception("errors while loading tests")
36    _filter_suite(tests, match_test)
37    return _run_suite(tests)
38
39def _filter_suite(suite, pred):
40    """Recursively filter test cases in a suite based on a predicate."""
41    newtests = []
42    for test in suite._tests:
43        if isinstance(test, unittest.TestSuite):
44            _filter_suite(test, pred)
45            newtests.append(test)
46        else:
47            if pred(test):
48                newtests.append(test)
49    suite._tests = newtests
50
51def _run_suite(suite):
52    """Run tests from a unittest.TestSuite-derived class."""
53    runner = get_test_runner(sys.stdout,
54                             verbosity=support.verbose,
55                             capture_output=(support.junit_xml_list is not None))
56
57    result = runner.run(suite)
58
59    if support.junit_xml_list is not None:
60        import xml.etree.ElementTree as ET
61        xml_elem = result.get_xml_element()
62        xml_str = ET.tostring(xml_elem).decode('ascii')
63        support.junit_xml_list.append(xml_str)
64
65    if not result.testsRun and not result.skipped and not result.errors:
66        raise support.TestDidNotRun
67    if not result.wasSuccessful():
68        stats = TestStats.from_unittest(result)
69        if len(result.errors) == 1 and not result.failures:
70            err = result.errors[0][1]
71        elif len(result.failures) == 1 and not result.errors:
72            err = result.failures[0][1]
73        else:
74            err = "multiple errors occurred"
75            if not support.verbose: err += "; run in verbose mode for details"
76        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
77        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
78        raise support.TestFailedWithDetails(err, errors, failures, stats=stats)
79    return result
80
81
82def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
83    # Run test_func(), collect statistics, and detect reference and memory
84    # leaks.
85    if runtests.hunt_refleak:
86        from .refleak import runtest_refleak
87        refleak, test_result = runtest_refleak(result.test_name, test_func,
88                                               runtests.hunt_refleak,
89                                               runtests.quiet)
90    else:
91        test_result = test_func()
92        refleak = False
93
94    if refleak:
95        result.state = State.REFLEAK
96
97    stats: TestStats | None
98
99    match test_result:
100        case TestStats():
101            stats = test_result
102        case unittest.TestResult():
103            stats = TestStats.from_unittest(test_result)
104        case None:
105            print_warning(f"{result.test_name} test runner returned None: {test_func}")
106            stats = None
107        case _:
108            # Don't import doctest at top level since only few tests return
109            # a doctest.TestResult instance.
110            import doctest
111            if isinstance(test_result, doctest.TestResults):
112                stats = TestStats.from_doctest(test_result)
113            else:
114                print_warning(f"Unknown test result type: {type(test_result)}")
115                stats = None
116
117    result.stats = stats
118
119
120# Storage of uncollectable GC objects (gc.garbage)
121GC_GARBAGE = []
122
123
124def _load_run_test(result: TestResult, runtests: RunTests) -> None:
125    # Load the test module and run the tests.
126    test_name = result.test_name
127    module_name = abs_module_name(test_name, runtests.test_dir)
128    test_mod = importlib.import_module(module_name)
129
130    if hasattr(test_mod, "test_main"):
131        # https://github.com/python/cpython/issues/89392
132        raise Exception(f"Module {test_name} defines test_main() which "
133                        f"is no longer supported by regrtest")
134    def test_func():
135        return run_unittest(test_mod)
136
137    try:
138        regrtest_runner(result, test_func, runtests)
139    finally:
140        # First kill any dangling references to open files etc.
141        # This can also issue some ResourceWarnings which would otherwise get
142        # triggered during the following test run, and possibly produce
143        # failures.
144        support.gc_collect()
145
146        remove_testfn(test_name, runtests.verbose)
147
148    if gc.garbage:
149        support.environment_altered = True
150        print_warning(f"{test_name} created {len(gc.garbage)} "
151                      f"uncollectable object(s)")
152
153        # move the uncollectable objects somewhere,
154        # so we don't see them again
155        GC_GARBAGE.extend(gc.garbage)
156        gc.garbage.clear()
157
158    support.reap_children()
159
160
161def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
162                             display_failure: bool = True) -> None:
163    # Handle exceptions, detect environment changes.
164
165    # Reset the environment_altered flag to detect if a test altered
166    # the environment
167    support.environment_altered = False
168
169    pgo = runtests.pgo
170    if pgo:
171        display_failure = False
172    quiet = runtests.quiet
173
174    test_name = result.test_name
175    try:
176        clear_caches()
177        support.gc_collect()
178
179        with saved_test_environment(test_name,
180                                    runtests.verbose, quiet, pgo=pgo):
181            _load_run_test(result, runtests)
182    except support.ResourceDenied as exc:
183        if not quiet and not pgo:
184            print(f"{test_name} skipped -- {exc}", flush=True)
185        result.state = State.RESOURCE_DENIED
186        return
187    except unittest.SkipTest as exc:
188        if not quiet and not pgo:
189            print(f"{test_name} skipped -- {exc}", flush=True)
190        result.state = State.SKIPPED
191        return
192    except support.TestFailedWithDetails as exc:
193        msg = f"test {test_name} failed"
194        if display_failure:
195            msg = f"{msg} -- {exc}"
196        print(msg, file=sys.stderr, flush=True)
197        result.state = State.FAILED
198        result.errors = exc.errors
199        result.failures = exc.failures
200        result.stats = exc.stats
201        return
202    except support.TestFailed as exc:
203        msg = f"test {test_name} failed"
204        if display_failure:
205            msg = f"{msg} -- {exc}"
206        print(msg, file=sys.stderr, flush=True)
207        result.state = State.FAILED
208        result.stats = exc.stats
209        return
210    except support.TestDidNotRun:
211        result.state = State.DID_NOT_RUN
212        return
213    except KeyboardInterrupt:
214        print()
215        result.state = State.INTERRUPTED
216        return
217    except:
218        if not pgo:
219            msg = traceback.format_exc()
220            print(f"test {test_name} crashed -- {msg}",
221                  file=sys.stderr, flush=True)
222        result.state = State.UNCAUGHT_EXC
223        return
224
225    if support.environment_altered:
226        result.set_env_changed()
227    # Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
228    if result.state is None:
229        result.state = State.PASSED
230
231
232def _runtest(result: TestResult, runtests: RunTests) -> None:
233    # Capture stdout and stderr, set faulthandler timeout,
234    # and create JUnit XML report.
235    verbose = runtests.verbose
236    output_on_failure = runtests.output_on_failure
237    timeout = runtests.timeout
238
239    if timeout is not None and threading_helper.can_start_thread:
240        use_timeout = True
241        faulthandler.dump_traceback_later(timeout, exit=True)
242    else:
243        use_timeout = False
244
245    try:
246        setup_tests(runtests)
247
248        if output_on_failure:
249            support.verbose = True
250
251            stream = io.StringIO()
252            orig_stdout = sys.stdout
253            orig_stderr = sys.stderr
254            print_warning = support.print_warning
255            orig_print_warnings_stderr = print_warning.orig_stderr
256
257            output = None
258            try:
259                sys.stdout = stream
260                sys.stderr = stream
261                # print_warning() writes into the temporary stream to preserve
262                # messages order. If support.environment_altered becomes true,
263                # warnings will be written to sys.stderr below.
264                print_warning.orig_stderr = stream
265
266                _runtest_env_changed_exc(result, runtests, display_failure=False)
267                # Ignore output if the test passed successfully
268                if result.state != State.PASSED:
269                    output = stream.getvalue()
270            finally:
271                sys.stdout = orig_stdout
272                sys.stderr = orig_stderr
273                print_warning.orig_stderr = orig_print_warnings_stderr
274
275            if output is not None:
276                sys.stderr.write(output)
277                sys.stderr.flush()
278        else:
279            # Tell tests to be moderately quiet
280            support.verbose = verbose
281            _runtest_env_changed_exc(result, runtests,
282                                     display_failure=not verbose)
283
284        xml_list = support.junit_xml_list
285        if xml_list:
286            result.xml_data = xml_list
287    finally:
288        if use_timeout:
289            faulthandler.cancel_dump_traceback_later()
290        support.junit_xml_list = None
291
292
293def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
294    """Run a single test.
295
296    test_name -- the name of the test
297
298    Returns a TestResult.
299
300    If runtests.use_junit, xml_data is a list containing each generated
301    testsuite element.
302    """
303    start_time = time.perf_counter()
304    result = TestResult(test_name)
305    pgo = runtests.pgo
306    try:
307        # gh-117783: don't immortalize deferred objects when tracking
308        # refleaks. Only releveant for the free-threaded build.
309        with support.suppress_immortalization(runtests.hunt_refleak):
310            _runtest(result, runtests)
311    except:
312        if not pgo:
313            msg = traceback.format_exc()
314            print(f"test {test_name} crashed -- {msg}",
315                  file=sys.stderr, flush=True)
316        result.state = State.UNCAUGHT_EXC
317
318    sys.stdout.flush()
319    sys.stderr.flush()
320
321    result.duration = time.perf_counter() - start_time
322    return result
323