1import collections 2import faulthandler 3import functools 4import gc 5import importlib 6import io 7import os 8import sys 9import time 10import traceback 11import unittest 12 13from test import support 14from test.libregrtest.refleak import dash_R, clear_caches 15from test.libregrtest.save_env import saved_test_environment 16from test.libregrtest.utils import format_duration, print_warning 17 18 19# Test result constants. 20PASSED = 1 21FAILED = 0 22ENV_CHANGED = -1 23SKIPPED = -2 24RESOURCE_DENIED = -3 25INTERRUPTED = -4 26CHILD_ERROR = -5 # error in a child process 27TEST_DID_NOT_RUN = -6 28TIMEOUT = -7 29 30_FORMAT_TEST_RESULT = { 31 PASSED: '%s passed', 32 FAILED: '%s failed', 33 ENV_CHANGED: '%s failed (env changed)', 34 SKIPPED: '%s skipped', 35 RESOURCE_DENIED: '%s skipped (resource denied)', 36 INTERRUPTED: '%s interrupted', 37 CHILD_ERROR: '%s crashed', 38 TEST_DID_NOT_RUN: '%s run no tests', 39 TIMEOUT: '%s timed out', 40} 41 42# Minimum duration of a test to display its duration or to mention that 43# the test is running in background 44PROGRESS_MIN_TIME = 30.0 # seconds 45 46# small set of tests to determine if we have a basically functioning interpreter 47# (i.e. if any of these fail, then anything else is likely to follow) 48STDTESTS = [ 49 'test_grammar', 50 'test_opcodes', 51 'test_dict', 52 'test_builtin', 53 'test_exceptions', 54 'test_types', 55 'test_unittest', 56 'test_doctest', 57 'test_doctest2', 58 'test_support' 59] 60 61# set of tests that we don't want to be executed when using regrtest 62NOTTESTS = set() 63 64 65# used by --findleaks, store for gc.garbage 66FOUND_GARBAGE = [] 67 68 69def is_failed(result, ns): 70 ok = result.result 71 if ok in (PASSED, RESOURCE_DENIED, SKIPPED, TEST_DID_NOT_RUN): 72 return False 73 if ok == ENV_CHANGED: 74 return ns.fail_env_changed 75 return True 76 77 78def format_test_result(result): 79 fmt = _FORMAT_TEST_RESULT.get(result.result, "%s") 80 text = fmt % result.test_name 81 if result.result == TIMEOUT: 82 text = '%s (%s)' % (text, format_duration(result.test_time)) 83 return text 84 85 86def findtestdir(path=None): 87 return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir 88 89 90def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS): 91 """Return a list of all applicable test modules.""" 92 testdir = findtestdir(testdir) 93 names = os.listdir(testdir) 94 tests = [] 95 others = set(stdtests) | nottests 96 for name in names: 97 mod, ext = os.path.splitext(name) 98 if mod[:5] == "test_" and ext in (".py", "") and mod not in others: 99 tests.append(mod) 100 return stdtests + sorted(tests) 101 102 103def get_abs_module(ns, test_name): 104 if test_name.startswith('test.') or ns.testdir: 105 return test_name 106 else: 107 # Import it from the test package 108 return 'test.' + test_name 109 110 111TestResult = collections.namedtuple('TestResult', 112 'test_name result test_time xml_data') 113 114def _runtest(ns, test_name): 115 # Handle faulthandler timeout, capture stdout+stderr, XML serialization 116 # and measure time. 117 118 output_on_failure = ns.verbose3 119 120 use_timeout = (ns.timeout is not None) 121 if use_timeout: 122 faulthandler.dump_traceback_later(ns.timeout, exit=True) 123 124 start_time = time.perf_counter() 125 try: 126 support.set_match_tests(ns.match_tests) 127 support.junit_xml_list = xml_list = [] if ns.xmlpath else None 128 if ns.failfast: 129 support.failfast = True 130 131 if output_on_failure: 132 support.verbose = True 133 134 stream = io.StringIO() 135 orig_stdout = sys.stdout 136 orig_stderr = sys.stderr 137 try: 138 sys.stdout = stream 139 sys.stderr = stream 140 result = _runtest_inner(ns, test_name, 141 display_failure=False) 142 if result != PASSED: 143 output = stream.getvalue() 144 orig_stderr.write(output) 145 orig_stderr.flush() 146 finally: 147 sys.stdout = orig_stdout 148 sys.stderr = orig_stderr 149 else: 150 # Tell tests to be moderately quiet 151 support.verbose = ns.verbose 152 153 result = _runtest_inner(ns, test_name, 154 display_failure=not ns.verbose) 155 156 if xml_list: 157 import xml.etree.ElementTree as ET 158 xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list] 159 else: 160 xml_data = None 161 162 test_time = time.perf_counter() - start_time 163 164 return TestResult(test_name, result, test_time, xml_data) 165 finally: 166 if use_timeout: 167 faulthandler.cancel_dump_traceback_later() 168 support.junit_xml_list = None 169 170 171def runtest(ns, test_name): 172 """Run a single test. 173 174 ns -- regrtest namespace of options 175 test_name -- the name of the test 176 177 Returns the tuple (result, test_time, xml_data), where result is one 178 of the constants: 179 180 INTERRUPTED KeyboardInterrupt 181 RESOURCE_DENIED test skipped because resource denied 182 SKIPPED test skipped for some other reason 183 ENV_CHANGED test failed because it changed the execution environment 184 FAILED test failed 185 PASSED test passed 186 EMPTY_TEST_SUITE test ran no subtests. 187 TIMEOUT test timed out. 188 189 If ns.xmlpath is not None, xml_data is a list containing each 190 generated testsuite element. 191 """ 192 try: 193 return _runtest(ns, test_name) 194 except: 195 if not ns.pgo: 196 msg = traceback.format_exc() 197 print(f"test {test_name} crashed -- {msg}", 198 file=sys.stderr, flush=True) 199 return TestResult(test_name, FAILED, 0.0, None) 200 201 202def _test_module(the_module): 203 loader = unittest.TestLoader() 204 tests = loader.loadTestsFromModule(the_module) 205 for error in loader.errors: 206 print(error, file=sys.stderr) 207 if loader.errors: 208 raise Exception("errors while loading tests") 209 support.run_unittest(tests) 210 211 212def _runtest_inner2(ns, test_name): 213 # Load the test function, run the test function, handle huntrleaks 214 # and findleaks to detect leaks 215 216 abstest = get_abs_module(ns, test_name) 217 218 # remove the module from sys.module to reload it if it was already imported 219 support.unload(abstest) 220 221 the_module = importlib.import_module(abstest) 222 223 # If the test has a test_main, that will run the appropriate 224 # tests. If not, use normal unittest test loading. 225 test_runner = getattr(the_module, "test_main", None) 226 if test_runner is None: 227 test_runner = functools.partial(_test_module, the_module) 228 229 try: 230 if ns.huntrleaks: 231 # Return True if the test leaked references 232 refleak = dash_R(ns, test_name, test_runner) 233 else: 234 test_runner() 235 refleak = False 236 finally: 237 cleanup_test_droppings(test_name, ns.verbose) 238 239 support.gc_collect() 240 241 if gc.garbage: 242 support.environment_altered = True 243 print_warning(f"{test_name} created {len(gc.garbage)} " 244 f"uncollectable object(s).") 245 246 # move the uncollectable objects somewhere, 247 # so we don't see them again 248 FOUND_GARBAGE.extend(gc.garbage) 249 gc.garbage.clear() 250 251 support.reap_children() 252 253 return refleak 254 255 256def _runtest_inner(ns, test_name, display_failure=True): 257 # Detect environment changes, handle exceptions. 258 259 # Reset the environment_altered flag to detect if a test altered 260 # the environment 261 support.environment_altered = False 262 263 if ns.pgo: 264 display_failure = False 265 266 try: 267 clear_caches() 268 269 with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment: 270 refleak = _runtest_inner2(ns, test_name) 271 except support.ResourceDenied as msg: 272 if not ns.quiet and not ns.pgo: 273 print(f"{test_name} skipped -- {msg}", flush=True) 274 return RESOURCE_DENIED 275 except unittest.SkipTest as msg: 276 if not ns.quiet and not ns.pgo: 277 print(f"{test_name} skipped -- {msg}", flush=True) 278 return SKIPPED 279 except support.TestFailed as exc: 280 msg = f"test {test_name} failed" 281 if display_failure: 282 msg = f"{msg} -- {exc}" 283 print(msg, file=sys.stderr, flush=True) 284 return FAILED 285 except support.TestDidNotRun: 286 return TEST_DID_NOT_RUN 287 except KeyboardInterrupt: 288 print() 289 return INTERRUPTED 290 except: 291 if not ns.pgo: 292 msg = traceback.format_exc() 293 print(f"test {test_name} crashed -- {msg}", 294 file=sys.stderr, flush=True) 295 return FAILED 296 297 if refleak: 298 return FAILED 299 if environment.changed: 300 return ENV_CHANGED 301 return PASSED 302 303 304def cleanup_test_droppings(test_name, verbose): 305 # First kill any dangling references to open files etc. 306 # This can also issue some ResourceWarnings which would otherwise get 307 # triggered during the following test run, and possibly produce failures. 308 support.gc_collect() 309 310 # Try to clean up junk commonly left behind. While tests shouldn't leave 311 # any files or directories behind, when a test fails that can be tedious 312 # for it to arrange. The consequences can be especially nasty on Windows, 313 # since if a test leaves a file open, it cannot be deleted by name (while 314 # there's nothing we can do about that here either, we can display the 315 # name of the offending test, which is a real help). 316 for name in (support.TESTFN,): 317 if not os.path.exists(name): 318 continue 319 320 if os.path.isdir(name): 321 import shutil 322 kind, nuker = "directory", shutil.rmtree 323 elif os.path.isfile(name): 324 kind, nuker = "file", os.unlink 325 else: 326 raise RuntimeError(f"os.path says {name!r} exists but is neither " 327 f"directory nor file") 328 329 if verbose: 330 print_warning("%r left behind %s %r" % (test_name, kind, name)) 331 support.environment_altered = True 332 333 try: 334 import stat 335 # fix possible permissions problems that might prevent cleanup 336 os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) 337 nuker(name) 338 except Exception as exc: 339 print_warning(f"{test_name} left behind {kind} {name!r} " 340 f"and it couldn't be removed: {exc}") 341