import faulthandler
import locale
import os
import platform
import random
import re
import sys
import sysconfig
import tempfile
import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
    findtests, runtest, get_abs_module, is_failed,
    STDTESTS, NOTTESTS, PROGRESS_MIN_TIME,
    Passed, Failed, EnvChanged, Skipped, ResourceDenied, Interrupted,
    ChildError, DidNotRun)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support
from test.support import os_helper


# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0


class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """
    def __init__(self):
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []

        # test results
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.run_no_tests = []
        self.need_rerun = []
        self.rerun = []
        self.first_result = None
        self.interrupted = False

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
        self.worker_test_name = None

    def get_executed(self):
        """Return the set of test names that were executed in any way."""
        return (set(self.good) | set(self.bad) | set(self.skipped)
                | set(self.resource_denieds) | set(self.environment_changed)
                | set(self.run_no_tests))

    def accumulate_result(self, result, rerun=False):
        """File *result* into the matching outcome bucket.

        When *rerun* is true, a now-passing test is removed from the
        ``bad`` list.  Any JUnit XML payload attached to the result is
        parsed and collected for --junit-xml output.
        """
        test_name = result.name

        # Interrupted/crashed runs have no meaningful duration; reruns
        # would double-count the test in the --slow report.
        if not isinstance(result, (ChildError, Interrupted)) and not rerun:
            self.test_times.append((result.duration_sec, test_name))

        if isinstance(result, Passed):
            self.good.append(test_name)
        elif isinstance(result, ResourceDenied):
            self.skipped.append(test_name)
            self.resource_denieds.append(test_name)
        elif isinstance(result, Skipped):
            self.skipped.append(test_name)
        elif isinstance(result, EnvChanged):
            self.environment_changed.append(test_name)
        elif isinstance(result, Failed):
            if not rerun:
                self.bad.append(test_name)
                self.need_rerun.append(result)
        elif isinstance(result, DidNotRun):
            self.run_no_tests.append(test_name)
        elif isinstance(result, Interrupted):
            self.interrupted = True
        else:
            raise ValueError("invalid test result: %r" % result)

        # A rerun that did not fail again (and was not interrupted) means
        # the earlier failure is forgiven.
        if rerun and not isinstance(result, (Failed, Interrupted)):
            self.bad.remove(test_name)

        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    print(xml_data, file=sys.__stderr__)
                    raise

    def log(self, line=''):
        """Print *line* prefixed with the load average and elapsed time."""
        empty = not line

        # add the system load prefix: "load avg: 1.80 "
        load_avg = self.getloadavg()
        if load_avg is not None:
            line = f"load avg: {load_avg:.2f} {line}"

        # add the timestamp prefix: "0:01:05 "
        test_time = time.monotonic() - self.start_time

        mins, secs = divmod(int(test_time), 60)
        hours, mins = divmod(mins, 60)
        test_time = "%d:%02d:%02d" % (hours, mins, secs)

        line = f"{test_time} {line}"
        if empty:
            # drop the trailing space left by formatting an empty message
            line = line[:-1]

        print(line, flush=True)

    def display_progress(self, test_index, text):
        """Log one progress line, e.g. "[ 51/405/1] test_tcl passed"."""
        if self.ns.quiet:
            return

        # "[ 51/405/1] test_tcl passed"
        line = f"{test_index:{self.test_count_width}}{self.test_count}"
        fails = len(self.bad) + len(self.environment_changed)
        if fails and not self.ns.pgo:
            line = f"{line}/{fails}"
        self.log(f"[{line}] {text}")

    def parse_args(self, kwargs):
        """Parse command line options (and worker args) into self.ns."""
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            support.junit_xml_list = self.testsuite_xml = []

        worker_args = ns.worker_args
        if worker_args is not None:
            # Running as a multiprocessing worker: the real options are
            # serialized inside worker_args.
            from test.libregrtest.runtest_mp import parse_worker_args
            ns, test_name = parse_worker_args(ns.worker_args)
            ns.worker_args = worker_args
            self.worker_test_name = test_name

        # Strip .py extensions.
        removepy(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns

    def find_tests(self, tests):
        """Compute self.selected, honoring --single/--fromfile/--start etc."""
        self.tests = tests

        if self.ns.single:
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                pass

        if self.ns.fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
                for line in fp:
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        removepy(self.tests)

        if self.ns.pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(self.ns)

        stdtests = STDTESTS[:]
        nottests = NOTTESTS.copy()
        if self.ns.exclude:
            for arg in self.ns.args:
                if arg in stdtests:
                    stdtests.remove(arg)
                nottests.add(arg)
            self.ns.args = []

        # if testdir is set, then we are not running the python tests suite, so
        # don't add default tests to be executed or skipped (pass empty values)
        if self.ns.testdir:
            alltests = findtests(self.ns.testdir, list(), set())
        else:
            alltests = findtests(self.ns.testdir, stdtests, nottests)

        if not self.ns.fromfile:
            self.selected = self.tests or self.ns.args or alltests
        else:
            self.selected = self.tests
        if self.ns.single:
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                pass

        # Remove all the selected tests that precede start if it's set.
        if self.ns.start:
            try:
                del self.selected[:self.selected.index(self.ns.start)]
            except ValueError:
                print("Couldn't find starting test (%s), using all tests"
                      % self.ns.start, file=sys.stderr)

        if self.ns.randomize:
            if self.ns.random_seed is None:
                self.ns.random_seed = random.randrange(10000000)
            random.seed(self.ns.random_seed)
            random.shuffle(self.selected)

    def list_tests(self):
        """Print the selected test names, one per line (--list-tests)."""
        for name in self.selected:
            print(name)

    def _list_cases(self, suite):
        # Recursively print the id of every matching test case in *suite*.
        for test in suite:
            if isinstance(test, unittest.loader._FailedTest):
                continue
            if isinstance(test, unittest.TestSuite):
                self._list_cases(test)
            elif isinstance(test, unittest.TestCase):
                if support.match_test(test):
                    print(test.id())

    def list_cases(self):
        """Print every matching test case id (--list-cases)."""
        support.verbose = False
        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)

        for test_name in self.selected:
            abstest = get_abs_module(self.ns, test_name)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
                self._list_cases(suite)
            except unittest.SkipTest:
                self.skipped.append(test_name)

        if self.skipped:
            print(file=sys.stderr)
            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
            printlist(self.skipped, file=sys.stderr)

    def rerun_failed_tests(self):
        """Re-run previously failed tests in verbose mode (--verbose2)."""
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False

        self.first_result = self.get_tests_result()

        self.log()
        self.log("Re-running failed tests in verbose mode")
        rerun_list = list(self.need_rerun)
        self.need_rerun.clear()
        for result in rerun_list:
            test_name = result.name
            self.rerun.append(test_name)

            # Narrow the rerun to the individual test methods that
            # failed, when the result recorded them.
            errors = result.errors or []
            failures = result.failures or []
            error_names = [test_full_name.split(" ")[0] for (test_full_name, *_) in errors]
            failure_names = [test_full_name.split(" ")[0] for (test_full_name, *_) in failures]
            self.ns.verbose = True
            orig_match_tests = self.ns.match_tests
            if errors or failures:
                if self.ns.match_tests is None:
                    self.ns.match_tests = []
                self.ns.match_tests.extend(error_names)
                self.ns.match_tests.extend(failure_names)
                matching = "matching: " + ", ".join(self.ns.match_tests)
                self.log(f"Re-running {test_name} in verbose mode ({matching})")
            else:
                self.log(f"Re-running {test_name} in verbose mode")
            result = runtest(self.ns, test_name)
            self.ns.match_tests = orig_match_tests

            self.accumulate_result(result, rerun=True)

            if isinstance(result, Interrupted):
                break

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()

    def display_result(self):
        """Print the summary of good/bad/skipped/... tests."""
        # If running the test suite for PGO then no one cares about results.
        if self.ns.pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_result())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not self.ns.quiet:
            print()
            if (not self.bad
                and not self.skipped
                and not self.interrupted
                and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)

    def run_tests_sequential(self):
        """Run self.tests one after the other in this process."""
        if self.ns.trace:
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        # Snapshot the names of the modules loaded before the run.
        # bpo-41718: this must be a *copy*; a live dict view returned by
        # sys.modules.keys() would always contain the newly imported
        # modules, making the unload step below a no-op.
        save_modules = set(sys.modules)

        msg = "Run tests sequentially"
        if self.ns.timeout:
            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
        self.log(msg)

        previous_test = None
        for test_index, test_name in enumerate(self.tests, 1):
            start_time = time.monotonic()

            text = test_name
            if previous_test:
                text = '%s -- %s' % (text, previous_test)
            self.display_progress(test_index, text)

            if self.tracer:
                # If we're tracing code coverage, then we don't exit with status
                # if on a false return value from main.
                cmd = ('result = runtest(self.ns, test_name); '
                       'self.accumulate_result(result)')
                ns = dict(locals())
                self.tracer.runctx(cmd, globals=globals(), locals=ns)
                result = ns['result']
            else:
                result = runtest(self.ns, test_name)
                self.accumulate_result(result)

            if isinstance(result, Interrupted):
                break

            previous_test = str(result)
            test_time = time.monotonic() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
            elif isinstance(result, Passed):
                # be quiet: say nothing if the test passed shortly
                previous_test = None

            # Unload the newly imported modules (best effort finalization).
            # Iterate over a copy: support.unload() removes entries from
            # sys.modules, which must not be mutated while being iterated.
            for module in list(sys.modules):
                if module not in save_modules and module.startswith("test."):
                    support.unload(module)

            if self.ns.failfast and is_failed(result, self.ns):
                break

        if previous_test:
            print(previous_test)

    def _test_forever(self, tests):
        # Yield tests endlessly (--forever), stopping after the first
        # failure (or environment change when --fail-env-changed is set).
        while True:
            for test_name in tests:
                yield test_name
                if self.bad:
                    return
                if self.ns.fail_env_changed and self.environment_changed:
                    return

    def display_header(self):
        """Print basic platform information."""
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
              "%s-endian" % sys.byteorder)
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getpreferredencoding(False),
                 sys.getfilesystemencoding()))

    def get_tests_result(self):
        """Return a one-line human-readable summary such as "FAILURE"."""
        result = []
        if self.bad:
            result.append("FAILURE")
        elif self.ns.fail_env_changed and self.environment_changed:
            result.append("ENV CHANGED")
        elif not any((self.good, self.bad, self.skipped, self.interrupted,
            self.environment_changed)):
            result.append("NO TEST RUN")

        if self.interrupted:
            result.append("INTERRUPTED")

        if not result:
            result.append("SUCCESS")

        result = ', '.join(result)
        if self.first_result:
            # Reruns: show the pre-rerun result too ("FAILURE then SUCCESS").
            result = '%s then %s' % (self.first_result, result)
        return result

    def run_tests(self):
        """Run the selected tests, sequentially or with multiprocessing."""
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
            or not(self.ns.pgo or self.ns.quiet or self.ns.single
                   or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                        "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()

    def finalize(self):
        """Write --single state, coverage results, and the final summary."""
        if self.next_single_filename:
            if self.next_single_test:
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                os.unlink(self.next_single_filename)

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))
        print("Tests result: %s" % self.get_tests_result())

        if self.ns.runleaks:
            os.system("leaks %d" % os.getpid())

    def save_xml_result(self):
        """Write the collected JUnit XML report (--junit-xml)."""
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)

    def set_temp_dir(self):
        """Choose self.tmp_dir from --tempdir, the build dir, or the OS."""
        if self.ns.tempdir:
            self.tmp_dir = self.ns.tempdir

        if not self.tmp_dir:
            # When tests are run from the Python build directory, it is best practice
            # to keep the test files in a subfolder.  This eases the cleanup of leftover
            # files using the "make distclean" command.
            if sysconfig.is_python_build():
                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
                if self.tmp_dir is None:
                    # bpo-30284: On Windows, only srcdir is available. Using
                    # abs_builddir mostly matters on UNIX when building Python
                    # out of the source tree, especially when the source tree
                    # is read only.
                    self.tmp_dir = sysconfig.get_config_var('srcdir')
                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
            else:
                self.tmp_dir = tempfile.gettempdir()

        self.tmp_dir = os.path.abspath(self.tmp_dir)

    def create_temp_dir(self):
        """Create self.tmp_dir and return a per-process cwd inside it."""
        os.makedirs(self.tmp_dir, exist_ok=True)

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        pid = os.getpid()
        if self.worker_test_name is not None:
            test_cwd = 'test_python_worker_{}'.format(pid)
        else:
            test_cwd = 'test_python_{}'.format(pid)
        test_cwd += os_helper.FS_NONASCII
        test_cwd = os.path.join(self.tmp_dir, test_cwd)
        return test_cwd

    def cleanup(self):
        """Remove leftover test_python_* files/dirs from tmp_dir (--cleanup)."""
        import glob

        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                os_helper.rmtree(name)
            else:
                print("Remove file: %s" % name)
                os_helper.unlink(name)

    def main(self, tests=None, **kwargs):
        """Entry point: parse options, set up the temp cwd, run _main()."""
        self.parse_args(kwargs)

        self.set_temp_dir()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from os_helper.SAVEDCWD.
            with os_helper.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)

    def getloadavg(self):
        """Return the 1-minute system load average, or None if unavailable."""
        if self.win_load_tracker is not None:
            return self.win_load_tracker.getloadavg()

        if hasattr(os, 'getloadavg'):
            return os.getloadavg()[0]

        return None

    def _main(self, tests, kwargs):
        # Worker process: run the single assigned test and never return.
        if self.worker_test_name is not None:
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns, self.worker_test_name)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo
        support.PGO_EXTENDED = self.ns.pgo_extended

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        if self.ns.list_cases:
            self.list_cases()
            sys.exit(0)

        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32' and self.worker_test_name is None:
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except FileNotFoundError as error:
                # Windows IoT Core and Windows Nano Server do not provide
                # typeperf.exe for x64, x86 or ARM
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            self.run_tests()
            self.display_result()

            if self.ns.verbose2 and self.bad:
                self.rerun_failed_tests()
        finally:
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None

        self.finalize()

        self.save_xml_result()

        # Exit codes: 2 = test failed, 130 = SIGINT, 3 = env changed.
        if self.bad:
            sys.exit(2)
        if self.interrupted:
            sys.exit(130)
        if self.ns.fail_env_changed and self.environment_changed:
            sys.exit(3)
        sys.exit(0)


def main(tests=None, **kwargs):
    """Run the Python suite."""
    Regrtest().main(tests=tests, **kwargs)