1import datetime 2import faulthandler 3import json 4import locale 5import os 6import platform 7import random 8import re 9import sys 10import sysconfig 11import tempfile 12import time 13import unittest 14from test.libregrtest.cmdline import _parse_args 15from test.libregrtest.runtest import ( 16 findtests, runtest, get_abs_module, 17 STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED, 18 INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, 19 PROGRESS_MIN_TIME, format_test_result) 20from test.libregrtest.setup import setup_tests 21from test.libregrtest.utils import removepy, count, format_duration, printlist 22from test import support 23try: 24 import gc 25except ImportError: 26 gc = None 27 28 29# When tests are run from the Python build directory, it is best practice 30# to keep the test files in a subfolder. This eases the cleanup of leftover 31# files using the "make distclean" command. 32if sysconfig.is_python_build(): 33 TEMPDIR = sysconfig.get_config_var('abs_builddir') 34 if TEMPDIR is None: 35 # bpo-30284: On Windows, only srcdir is available. Using abs_builddir 36 # mostly matters on UNIX when building Python out of the source tree, 37 # especially when the source tree is read only. 38 TEMPDIR = sysconfig.get_config_var('srcdir') 39 TEMPDIR = os.path.join(TEMPDIR, 'build') 40else: 41 TEMPDIR = tempfile.gettempdir() 42TEMPDIR = os.path.abspath(TEMPDIR) 43 44 45class Regrtest: 46 """Execute a test suite. 47 48 This also parses command-line options and modifies its behavior 49 accordingly. 50 51 tests -- a list of strings containing test names (optional) 52 testdir -- the directory in which to look for tests (optional) 53 54 Users other than the Python test suite will certainly want to 55 specify testdir; if it's omitted, the directory containing the 56 Python test suite is searched for. 57 58 If the tests argument is omitted, the tests listed on the 59 command-line will be used. If that's empty, too, then all *.py 60 files beginning with test_ will be used. 61 62 The other default arguments (verbose, quiet, exclude, 63 single, randomize, findleaks, use_resources, trace, coverdir, 64 print_slow, and random_seed) allow programmers calling main() 65 directly to set the values that would normally be set by flags 66 on the command line. 67 """ 68 def __init__(self): 69 # Namespace of command line options 70 self.ns = None 71 72 # tests 73 self.tests = [] 74 self.selected = [] 75 76 # test results 77 self.good = [] 78 self.bad = [] 79 self.skipped = [] 80 self.resource_denieds = [] 81 self.environment_changed = [] 82 self.rerun = [] 83 self.run_no_tests = [] 84 self.first_result = None 85 self.interrupted = False 86 87 # used by --slow 88 self.test_times = [] 89 90 # used by --coverage, trace.Trace instance 91 self.tracer = None 92 93 # used by --findleaks, store for gc.garbage 94 self.found_garbage = [] 95 96 # used to display the progress bar "[ 3/100]" 97 self.start_time = time.monotonic() 98 self.test_count = '' 99 self.test_count_width = 1 100 101 # used by --single 102 self.next_single_test = None 103 self.next_single_filename = None 104 105 # used by --junit-xml 106 self.testsuite_xml = None 107 108 def accumulate_result(self, test, result): 109 ok, test_time, xml_data = result 110 if ok not in (CHILD_ERROR, INTERRUPTED): 111 self.test_times.append((test_time, test)) 112 if ok == PASSED: 113 self.good.append(test) 114 elif ok in (FAILED, CHILD_ERROR): 115 self.bad.append(test) 116 elif ok == ENV_CHANGED: 117 self.environment_changed.append(test) 118 elif ok == SKIPPED: 119 self.skipped.append(test) 120 elif ok == RESOURCE_DENIED: 121 self.skipped.append(test) 122 self.resource_denieds.append(test) 123 elif ok == TEST_DID_NOT_RUN: 124 self.run_no_tests.append(test) 125 elif ok != INTERRUPTED: 126 raise ValueError("invalid test result: %r" % ok) 127 128 if xml_data: 129 import xml.etree.ElementTree as ET 130 for e in xml_data: 131 try: 132 self.testsuite_xml.append(ET.fromstring(e)) 133 except ET.ParseError: 134 print(xml_data, file=sys.__stderr__) 135 raise 136 137 def display_progress(self, test_index, test): 138 if self.ns.quiet: 139 return 140 141 # "[ 51/405/1] test_tcl passed" 142 line = f"{test_index:{self.test_count_width}}{self.test_count}" 143 fails = len(self.bad) + len(self.environment_changed) 144 if fails and not self.ns.pgo: 145 line = f"{line}/{fails}" 146 line = f"[{line}] {test}" 147 148 # add the system load prefix: "load avg: 1.80 " 149 if hasattr(os, 'getloadavg'): 150 load_avg_1min = os.getloadavg()[0] 151 line = f"load avg: {load_avg_1min:.2f} {line}" 152 153 # add the timestamp prefix: "0:01:05 " 154 test_time = time.monotonic() - self.start_time 155 test_time = datetime.timedelta(seconds=int(test_time)) 156 line = f"{test_time} {line}" 157 print(line, flush=True) 158 159 def parse_args(self, kwargs): 160 ns = _parse_args(sys.argv[1:], **kwargs) 161 162 if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'): 163 print("Warning: The timeout option requires " 164 "faulthandler.dump_traceback_later", file=sys.stderr) 165 ns.timeout = None 166 167 if ns.threshold is not None and gc is None: 168 print('No GC available, ignore --threshold.', file=sys.stderr) 169 ns.threshold = None 170 171 if ns.findleaks: 172 if gc is not None: 173 # Uncomment the line below to report garbage that is not 174 # freeable by reference counting alone. By default only 175 # garbage that is not collectable by the GC is reported. 176 pass 177 #gc.set_debug(gc.DEBUG_SAVEALL) 178 else: 179 print('No GC available, disabling --findleaks', 180 file=sys.stderr) 181 ns.findleaks = False 182 183 if ns.xmlpath: 184 support.junit_xml_list = self.testsuite_xml = [] 185 186 # Strip .py extensions. 187 removepy(ns.args) 188 189 return ns 190 191 def find_tests(self, tests): 192 self.tests = tests 193 194 if self.ns.single: 195 self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest') 196 try: 197 with open(self.next_single_filename, 'r') as fp: 198 next_test = fp.read().strip() 199 self.tests = [next_test] 200 except OSError: 201 pass 202 203 if self.ns.fromfile: 204 self.tests = [] 205 # regex to match 'test_builtin' in line: 206 # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' 207 regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') 208 with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: 209 for line in fp: 210 line = line.split('#', 1)[0] 211 line = line.strip() 212 match = regex.search(line) 213 if match is not None: 214 self.tests.append(match.group()) 215 216 removepy(self.tests) 217 218 stdtests = STDTESTS[:] 219 nottests = NOTTESTS.copy() 220 if self.ns.exclude: 221 for arg in self.ns.args: 222 if arg in stdtests: 223 stdtests.remove(arg) 224 nottests.add(arg) 225 self.ns.args = [] 226 227 # if testdir is set, then we are not running the python tests suite, so 228 # don't add default tests to be executed or skipped (pass empty values) 229 if self.ns.testdir: 230 alltests = findtests(self.ns.testdir, list(), set()) 231 else: 232 alltests = findtests(self.ns.testdir, stdtests, nottests) 233 234 if not self.ns.fromfile: 235 self.selected = self.tests or self.ns.args or alltests 236 else: 237 self.selected = self.tests 238 if self.ns.single: 239 self.selected = self.selected[:1] 240 try: 241 pos = alltests.index(self.selected[0]) 242 self.next_single_test = alltests[pos + 1] 243 except IndexError: 244 pass 245 246 # Remove all the selected tests that precede start if it's set. 247 if self.ns.start: 248 try: 249 del self.selected[:self.selected.index(self.ns.start)] 250 except ValueError: 251 print("Couldn't find starting test (%s), using all tests" 252 % self.ns.start, file=sys.stderr) 253 254 if self.ns.randomize: 255 if self.ns.random_seed is None: 256 self.ns.random_seed = random.randrange(10000000) 257 random.seed(self.ns.random_seed) 258 random.shuffle(self.selected) 259 260 def list_tests(self): 261 for name in self.selected: 262 print(name) 263 264 def _list_cases(self, suite): 265 for test in suite: 266 if isinstance(test, unittest.loader._FailedTest): 267 continue 268 if isinstance(test, unittest.TestSuite): 269 self._list_cases(test) 270 elif isinstance(test, unittest.TestCase): 271 if support.match_test(test): 272 print(test.id()) 273 274 def list_cases(self): 275 support.verbose = False 276 support.set_match_tests(self.ns.match_tests) 277 278 for test in self.selected: 279 abstest = get_abs_module(self.ns, test) 280 try: 281 suite = unittest.defaultTestLoader.loadTestsFromName(abstest) 282 self._list_cases(suite) 283 except unittest.SkipTest: 284 self.skipped.append(test) 285 286 if self.skipped: 287 print(file=sys.stderr) 288 print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr) 289 printlist(self.skipped, file=sys.stderr) 290 291 def rerun_failed_tests(self): 292 self.ns.verbose = True 293 self.ns.failfast = False 294 self.ns.verbose3 = False 295 296 self.first_result = self.get_tests_result() 297 298 print() 299 print("Re-running failed tests in verbose mode") 300 self.rerun = self.bad[:] 301 for test in self.rerun: 302 print("Re-running test %r in verbose mode" % test, flush=True) 303 try: 304 self.ns.verbose = True 305 ok = runtest(self.ns, test) 306 except KeyboardInterrupt: 307 self.interrupted = True 308 # print a newline separate from the ^C 309 print() 310 break 311 else: 312 if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}: 313 self.bad.remove(test) 314 else: 315 if self.bad: 316 print(count(len(self.bad), 'test'), "failed again:") 317 printlist(self.bad) 318 319 self.display_result() 320 321 def display_result(self): 322 # If running the test suite for PGO then no one cares about results. 323 if self.ns.pgo: 324 return 325 326 print() 327 print("== Tests result: %s ==" % self.get_tests_result()) 328 329 if self.interrupted: 330 print() 331 # print a newline after ^C 332 print("Test suite interrupted by signal SIGINT.") 333 executed = set(self.good) | set(self.bad) | set(self.skipped) 334 omitted = set(self.selected) - executed 335 print(count(len(omitted), "test"), "omitted:") 336 printlist(omitted) 337 338 if self.good and not self.ns.quiet: 339 print() 340 if (not self.bad 341 and not self.skipped 342 and not self.interrupted 343 and len(self.good) > 1): 344 print("All", end=' ') 345 print(count(len(self.good), "test"), "OK.") 346 347 if self.ns.print_slow: 348 self.test_times.sort(reverse=True) 349 print() 350 print("10 slowest tests:") 351 for time, test in self.test_times[:10]: 352 print("- %s: %s" % (test, format_duration(time))) 353 354 if self.bad: 355 print() 356 print(count(len(self.bad), "test"), "failed:") 357 printlist(self.bad) 358 359 if self.environment_changed: 360 print() 361 print("{} altered the execution environment:".format( 362 count(len(self.environment_changed), "test"))) 363 printlist(self.environment_changed) 364 365 if self.skipped and not self.ns.quiet: 366 print() 367 print(count(len(self.skipped), "test"), "skipped:") 368 printlist(self.skipped) 369 370 if self.rerun: 371 print() 372 print("%s:" % count(len(self.rerun), "re-run test")) 373 printlist(self.rerun) 374 375 if self.run_no_tests: 376 print() 377 print(count(len(self.run_no_tests), "test"), "run no tests:") 378 printlist(self.run_no_tests) 379 380 def run_tests_sequential(self): 381 if self.ns.trace: 382 import trace 383 self.tracer = trace.Trace(trace=False, count=True) 384 385 save_modules = sys.modules.keys() 386 387 print("Run tests sequentially") 388 389 previous_test = None 390 for test_index, test in enumerate(self.tests, 1): 391 start_time = time.monotonic() 392 393 text = test 394 if previous_test: 395 text = '%s -- %s' % (text, previous_test) 396 self.display_progress(test_index, text) 397 398 if self.tracer: 399 # If we're tracing code coverage, then we don't exit with status 400 # if on a false return value from main. 401 cmd = ('result = runtest(self.ns, test); ' 402 'self.accumulate_result(test, result)') 403 ns = dict(locals()) 404 self.tracer.runctx(cmd, globals=globals(), locals=ns) 405 result = ns['result'] 406 else: 407 try: 408 result = runtest(self.ns, test) 409 except KeyboardInterrupt: 410 self.interrupted = True 411 self.accumulate_result(test, (INTERRUPTED, None, None)) 412 break 413 else: 414 self.accumulate_result(test, result) 415 416 previous_test = format_test_result(test, result[0]) 417 test_time = time.monotonic() - start_time 418 if test_time >= PROGRESS_MIN_TIME: 419 previous_test = "%s in %s" % (previous_test, format_duration(test_time)) 420 elif result[0] == PASSED: 421 # be quiet: say nothing if the test passed shortly 422 previous_test = None 423 424 if self.ns.findleaks: 425 gc.collect() 426 if gc.garbage: 427 print("Warning: test created", len(gc.garbage), end=' ') 428 print("uncollectable object(s).") 429 # move the uncollectable objects somewhere so we don't see 430 # them again 431 self.found_garbage.extend(gc.garbage) 432 del gc.garbage[:] 433 434 # Unload the newly imported modules (best effort finalization) 435 for module in sys.modules.keys(): 436 if module not in save_modules and module.startswith("test."): 437 support.unload(module) 438 439 if previous_test: 440 print(previous_test) 441 442 def _test_forever(self, tests): 443 while True: 444 for test in tests: 445 yield test 446 if self.bad: 447 return 448 if self.ns.fail_env_changed and self.environment_changed: 449 return 450 451 def display_header(self): 452 # Print basic platform information 453 print("==", platform.python_implementation(), *sys.version.split()) 454 print("==", platform.platform(aliased=True), 455 "%s-endian" % sys.byteorder) 456 print("== cwd:", os.getcwd()) 457 cpu_count = os.cpu_count() 458 if cpu_count: 459 print("== CPU count:", cpu_count) 460 print("== encodings: locale=%s, FS=%s" 461 % (locale.getpreferredencoding(False), 462 sys.getfilesystemencoding())) 463 464 def get_tests_result(self): 465 result = [] 466 if self.bad: 467 result.append("FAILURE") 468 elif self.ns.fail_env_changed and self.environment_changed: 469 result.append("ENV CHANGED") 470 elif not any((self.good, self.bad, self.skipped, self.interrupted, 471 self.environment_changed)): 472 result.append("NO TEST RUN") 473 474 if self.interrupted: 475 result.append("INTERRUPTED") 476 477 if not result: 478 result.append("SUCCESS") 479 480 result = ', '.join(result) 481 if self.first_result: 482 result = '%s then %s' % (self.first_result, result) 483 return result 484 485 def run_tests(self): 486 # For a partial run, we do not need to clutter the output. 487 if (self.ns.header 488 or not(self.ns.pgo or self.ns.quiet or self.ns.single 489 or self.tests or self.ns.args)): 490 self.display_header() 491 492 if self.ns.huntrleaks: 493 warmup, repetitions, _ = self.ns.huntrleaks 494 if warmup < 3: 495 msg = ("WARNING: Running tests with --huntrleaks/-R and less than " 496 "3 warmup repetitions can give false positives!") 497 print(msg, file=sys.stdout, flush=True) 498 499 if self.ns.randomize: 500 print("Using random seed", self.ns.random_seed) 501 502 if self.ns.forever: 503 self.tests = self._test_forever(list(self.selected)) 504 self.test_count = '' 505 self.test_count_width = 3 506 else: 507 self.tests = iter(self.selected) 508 self.test_count = '/{}'.format(len(self.selected)) 509 self.test_count_width = len(self.test_count) - 1 510 511 if self.ns.use_mp: 512 from test.libregrtest.runtest_mp import run_tests_multiprocess 513 run_tests_multiprocess(self) 514 else: 515 self.run_tests_sequential() 516 517 def finalize(self): 518 if self.next_single_filename: 519 if self.next_single_test: 520 with open(self.next_single_filename, 'w') as fp: 521 fp.write(self.next_single_test + '\n') 522 else: 523 os.unlink(self.next_single_filename) 524 525 if self.tracer: 526 r = self.tracer.results() 527 r.write_results(show_missing=True, summary=True, 528 coverdir=self.ns.coverdir) 529 530 print() 531 duration = time.monotonic() - self.start_time 532 print("Total duration: %s" % format_duration(duration)) 533 print("Tests result: %s" % self.get_tests_result()) 534 535 if self.ns.runleaks: 536 os.system("leaks %d" % os.getpid()) 537 538 def save_xml_result(self): 539 if not self.ns.xmlpath and not self.testsuite_xml: 540 return 541 542 import xml.etree.ElementTree as ET 543 root = ET.Element("testsuites") 544 545 # Manually count the totals for the overall summary 546 totals = {'tests': 0, 'errors': 0, 'failures': 0} 547 for suite in self.testsuite_xml: 548 root.append(suite) 549 for k in totals: 550 try: 551 totals[k] += int(suite.get(k, 0)) 552 except ValueError: 553 pass 554 555 for k, v in totals.items(): 556 root.set(k, str(v)) 557 558 xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath) 559 with open(xmlpath, 'wb') as f: 560 for s in ET.tostringlist(root): 561 f.write(s) 562 563 def main(self, tests=None, **kwargs): 564 global TEMPDIR 565 self.ns = self.parse_args(kwargs) 566 567 if self.ns.tempdir: 568 TEMPDIR = self.ns.tempdir 569 elif self.ns.worker_args: 570 ns_dict, _ = json.loads(self.ns.worker_args) 571 TEMPDIR = ns_dict.get("tempdir") or TEMPDIR 572 573 os.makedirs(TEMPDIR, exist_ok=True) 574 575 # Define a writable temp dir that will be used as cwd while running 576 # the tests. The name of the dir includes the pid to allow parallel 577 # testing (see the -j option). 578 test_cwd = 'test_python_{}'.format(os.getpid()) 579 test_cwd = os.path.join(TEMPDIR, test_cwd) 580 581 # Run the tests in a context manager that temporarily changes the CWD to a 582 # temporary and writable directory. If it's not possible to create or 583 # change the CWD, the original CWD will be used. The original CWD is 584 # available from support.SAVEDCWD. 585 with support.temp_cwd(test_cwd, quiet=True): 586 self._main(tests, kwargs) 587 588 def _main(self, tests, kwargs): 589 if self.ns.huntrleaks: 590 warmup, repetitions, _ = self.ns.huntrleaks 591 if warmup < 1 or repetitions < 1: 592 msg = ("Invalid values for the --huntrleaks/-R parameters. The " 593 "number of warmups and repetitions must be at least 1 " 594 "each (1:1).") 595 print(msg, file=sys.stderr, flush=True) 596 sys.exit(2) 597 598 if self.ns.worker_args is not None: 599 from test.libregrtest.runtest_mp import run_tests_worker 600 run_tests_worker(self.ns.worker_args) 601 602 if self.ns.wait: 603 input("Press any key to continue...") 604 605 support.PGO = self.ns.pgo 606 607 setup_tests(self.ns) 608 609 self.find_tests(tests) 610 611 if self.ns.list_tests: 612 self.list_tests() 613 sys.exit(0) 614 615 if self.ns.list_cases: 616 self.list_cases() 617 sys.exit(0) 618 619 self.run_tests() 620 self.display_result() 621 622 if self.ns.verbose2 and self.bad: 623 self.rerun_failed_tests() 624 625 self.finalize() 626 627 self.save_xml_result() 628 629 if self.bad: 630 sys.exit(2) 631 if self.interrupted: 632 sys.exit(130) 633 if self.ns.fail_env_changed and self.environment_changed: 634 sys.exit(3) 635 sys.exit(0) 636 637 638def main(tests=None, **kwargs): 639 """Run the Python suite.""" 640 Regrtest().main(tests=tests, **kwargs) 641