1import faulthandler 2import gc 3import importlib 4import io 5import sys 6import time 7import traceback 8import unittest 9 10from test import support 11from test.support import threading_helper 12 13from .filter import match_test 14from .result import State, TestResult, TestStats 15from .runtests import RunTests 16from .save_env import saved_test_environment 17from .setup import setup_tests 18from .testresult import get_test_runner 19from .utils import ( 20 TestName, 21 clear_caches, remove_testfn, abs_module_name, print_warning) 22 23 24# Minimum duration of a test to display its duration or to mention that 25# the test is running in background 26PROGRESS_MIN_TIME = 30.0 # seconds 27 28 29def run_unittest(test_mod): 30 loader = unittest.TestLoader() 31 tests = loader.loadTestsFromModule(test_mod) 32 for error in loader.errors: 33 print(error, file=sys.stderr) 34 if loader.errors: 35 raise Exception("errors while loading tests") 36 _filter_suite(tests, match_test) 37 return _run_suite(tests) 38 39def _filter_suite(suite, pred): 40 """Recursively filter test cases in a suite based on a predicate.""" 41 newtests = [] 42 for test in suite._tests: 43 if isinstance(test, unittest.TestSuite): 44 _filter_suite(test, pred) 45 newtests.append(test) 46 else: 47 if pred(test): 48 newtests.append(test) 49 suite._tests = newtests 50 51def _run_suite(suite): 52 """Run tests from a unittest.TestSuite-derived class.""" 53 runner = get_test_runner(sys.stdout, 54 verbosity=support.verbose, 55 capture_output=(support.junit_xml_list is not None)) 56 57 result = runner.run(suite) 58 59 if support.junit_xml_list is not None: 60 import xml.etree.ElementTree as ET 61 xml_elem = result.get_xml_element() 62 xml_str = ET.tostring(xml_elem).decode('ascii') 63 support.junit_xml_list.append(xml_str) 64 65 if not result.testsRun and not result.skipped and not result.errors: 66 raise support.TestDidNotRun 67 if not result.wasSuccessful(): 68 stats = TestStats.from_unittest(result) 69 if len(result.errors) == 1 and not result.failures: 70 err = result.errors[0][1] 71 elif len(result.failures) == 1 and not result.errors: 72 err = result.failures[0][1] 73 else: 74 err = "multiple errors occurred" 75 if not support.verbose: err += "; run in verbose mode for details" 76 errors = [(str(tc), exc_str) for tc, exc_str in result.errors] 77 failures = [(str(tc), exc_str) for tc, exc_str in result.failures] 78 raise support.TestFailedWithDetails(err, errors, failures, stats=stats) 79 return result 80 81 82def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None: 83 # Run test_func(), collect statistics, and detect reference and memory 84 # leaks. 85 if runtests.hunt_refleak: 86 from .refleak import runtest_refleak 87 refleak, test_result = runtest_refleak(result.test_name, test_func, 88 runtests.hunt_refleak, 89 runtests.quiet) 90 else: 91 test_result = test_func() 92 refleak = False 93 94 if refleak: 95 result.state = State.REFLEAK 96 97 stats: TestStats | None 98 99 match test_result: 100 case TestStats(): 101 stats = test_result 102 case unittest.TestResult(): 103 stats = TestStats.from_unittest(test_result) 104 case None: 105 print_warning(f"{result.test_name} test runner returned None: {test_func}") 106 stats = None 107 case _: 108 # Don't import doctest at top level since only few tests return 109 # a doctest.TestResult instance. 110 import doctest 111 if isinstance(test_result, doctest.TestResults): 112 stats = TestStats.from_doctest(test_result) 113 else: 114 print_warning(f"Unknown test result type: {type(test_result)}") 115 stats = None 116 117 result.stats = stats 118 119 120# Storage of uncollectable GC objects (gc.garbage) 121GC_GARBAGE = [] 122 123 124def _load_run_test(result: TestResult, runtests: RunTests) -> None: 125 # Load the test module and run the tests. 126 test_name = result.test_name 127 module_name = abs_module_name(test_name, runtests.test_dir) 128 test_mod = importlib.import_module(module_name) 129 130 if hasattr(test_mod, "test_main"): 131 # https://github.com/python/cpython/issues/89392 132 raise Exception(f"Module {test_name} defines test_main() which " 133 f"is no longer supported by regrtest") 134 def test_func(): 135 return run_unittest(test_mod) 136 137 try: 138 regrtest_runner(result, test_func, runtests) 139 finally: 140 # First kill any dangling references to open files etc. 141 # This can also issue some ResourceWarnings which would otherwise get 142 # triggered during the following test run, and possibly produce 143 # failures. 144 support.gc_collect() 145 146 remove_testfn(test_name, runtests.verbose) 147 148 if gc.garbage: 149 support.environment_altered = True 150 print_warning(f"{test_name} created {len(gc.garbage)} " 151 f"uncollectable object(s)") 152 153 # move the uncollectable objects somewhere, 154 # so we don't see them again 155 GC_GARBAGE.extend(gc.garbage) 156 gc.garbage.clear() 157 158 support.reap_children() 159 160 161def _runtest_env_changed_exc(result: TestResult, runtests: RunTests, 162 display_failure: bool = True) -> None: 163 # Handle exceptions, detect environment changes. 164 165 # Reset the environment_altered flag to detect if a test altered 166 # the environment 167 support.environment_altered = False 168 169 pgo = runtests.pgo 170 if pgo: 171 display_failure = False 172 quiet = runtests.quiet 173 174 test_name = result.test_name 175 try: 176 clear_caches() 177 support.gc_collect() 178 179 with saved_test_environment(test_name, 180 runtests.verbose, quiet, pgo=pgo): 181 _load_run_test(result, runtests) 182 except support.ResourceDenied as exc: 183 if not quiet and not pgo: 184 print(f"{test_name} skipped -- {exc}", flush=True) 185 result.state = State.RESOURCE_DENIED 186 return 187 except unittest.SkipTest as exc: 188 if not quiet and not pgo: 189 print(f"{test_name} skipped -- {exc}", flush=True) 190 result.state = State.SKIPPED 191 return 192 except support.TestFailedWithDetails as exc: 193 msg = f"test {test_name} failed" 194 if display_failure: 195 msg = f"{msg} -- {exc}" 196 print(msg, file=sys.stderr, flush=True) 197 result.state = State.FAILED 198 result.errors = exc.errors 199 result.failures = exc.failures 200 result.stats = exc.stats 201 return 202 except support.TestFailed as exc: 203 msg = f"test {test_name} failed" 204 if display_failure: 205 msg = f"{msg} -- {exc}" 206 print(msg, file=sys.stderr, flush=True) 207 result.state = State.FAILED 208 result.stats = exc.stats 209 return 210 except support.TestDidNotRun: 211 result.state = State.DID_NOT_RUN 212 return 213 except KeyboardInterrupt: 214 print() 215 result.state = State.INTERRUPTED 216 return 217 except: 218 if not pgo: 219 msg = traceback.format_exc() 220 print(f"test {test_name} crashed -- {msg}", 221 file=sys.stderr, flush=True) 222 result.state = State.UNCAUGHT_EXC 223 return 224 225 if support.environment_altered: 226 result.set_env_changed() 227 # Don't override the state if it was already set (REFLEAK or ENV_CHANGED) 228 if result.state is None: 229 result.state = State.PASSED 230 231 232def _runtest(result: TestResult, runtests: RunTests) -> None: 233 # Capture stdout and stderr, set faulthandler timeout, 234 # and create JUnit XML report. 235 verbose = runtests.verbose 236 output_on_failure = runtests.output_on_failure 237 timeout = runtests.timeout 238 239 if timeout is not None and threading_helper.can_start_thread: 240 use_timeout = True 241 faulthandler.dump_traceback_later(timeout, exit=True) 242 else: 243 use_timeout = False 244 245 try: 246 setup_tests(runtests) 247 248 if output_on_failure: 249 support.verbose = True 250 251 stream = io.StringIO() 252 orig_stdout = sys.stdout 253 orig_stderr = sys.stderr 254 print_warning = support.print_warning 255 orig_print_warnings_stderr = print_warning.orig_stderr 256 257 output = None 258 try: 259 sys.stdout = stream 260 sys.stderr = stream 261 # print_warning() writes into the temporary stream to preserve 262 # messages order. If support.environment_altered becomes true, 263 # warnings will be written to sys.stderr below. 264 print_warning.orig_stderr = stream 265 266 _runtest_env_changed_exc(result, runtests, display_failure=False) 267 # Ignore output if the test passed successfully 268 if result.state != State.PASSED: 269 output = stream.getvalue() 270 finally: 271 sys.stdout = orig_stdout 272 sys.stderr = orig_stderr 273 print_warning.orig_stderr = orig_print_warnings_stderr 274 275 if output is not None: 276 sys.stderr.write(output) 277 sys.stderr.flush() 278 else: 279 # Tell tests to be moderately quiet 280 support.verbose = verbose 281 _runtest_env_changed_exc(result, runtests, 282 display_failure=not verbose) 283 284 xml_list = support.junit_xml_list 285 if xml_list: 286 result.xml_data = xml_list 287 finally: 288 if use_timeout: 289 faulthandler.cancel_dump_traceback_later() 290 support.junit_xml_list = None 291 292 293def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult: 294 """Run a single test. 295 296 test_name -- the name of the test 297 298 Returns a TestResult. 299 300 If runtests.use_junit, xml_data is a list containing each generated 301 testsuite element. 302 """ 303 start_time = time.perf_counter() 304 result = TestResult(test_name) 305 pgo = runtests.pgo 306 try: 307 # gh-117783: don't immortalize deferred objects when tracking 308 # refleaks. Only releveant for the free-threaded build. 309 with support.suppress_immortalization(runtests.hunt_refleak): 310 _runtest(result, runtests) 311 except: 312 if not pgo: 313 msg = traceback.format_exc() 314 print(f"test {test_name} crashed -- {msg}", 315 file=sys.stderr, flush=True) 316 result.state = State.UNCAUGHT_EXC 317 318 sys.stdout.flush() 319 sys.stderr.flush() 320 321 result.duration = time.perf_counter() - start_time 322 return result 323