import datetime
import faulthandler
import locale
import os
import platform
import random
import re
import sys
import sysconfig
import tempfile
import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
    findtests, runtest, get_abs_module,
    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
    PROGRESS_MIN_TIME, format_test_result, is_failed)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support


# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0


class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """

    def __init__(self):
        """Initialize empty result lists and runner state."""
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []

        # test results
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.run_no_tests = []
        self.rerun = []
        self.first_result = None
        self.interrupted = False

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
        self.worker_test_name = None

    def get_executed(self):
        """Return the set of test names recorded in any result list."""
        return (set(self.good) | set(self.bad) | set(self.skipped)
                | set(self.resource_denieds) | set(self.environment_changed)
                | set(self.run_no_tests))

    def accumulate_result(self, result, rerun=False):
        """Record one test result in the matching result list.

        result -- object exposing test_name, result, test_time and xml_data
        rerun -- true when the result comes from re-running a failed test

        Raises ValueError if result.result is not a known result code.
        """
        test_name = result.test_name
        ok = result.result

        if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
            self.test_times.append((result.test_time, test_name))

        if ok == PASSED:
            self.good.append(test_name)
        elif ok in (FAILED, CHILD_ERROR):
            # On a re-run the test is already listed in self.bad.
            if not rerun:
                self.bad.append(test_name)
        elif ok == ENV_CHANGED:
            self.environment_changed.append(test_name)
        elif ok == SKIPPED:
            self.skipped.append(test_name)
        elif ok == RESOURCE_DENIED:
            self.skipped.append(test_name)
            self.resource_denieds.append(test_name)
        elif ok == TEST_DID_NOT_RUN:
            self.run_no_tests.append(test_name)
        elif ok == INTERRUPTED:
            self.interrupted = True
        elif ok == TIMEOUT:
            self.bad.append(test_name)
        else:
            raise ValueError("invalid test result: %r" % ok)

        # A re-run that did not fail again clears the earlier failure.
        if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
            self.bad.remove(test_name)

        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    # Dump the raw data to help debugging, then propagate.
                    print(xml_data, file=sys.__stderr__)
                    raise

    def log(self, line=''):
        """Print line prefixed with the elapsed time and the load average."""
        empty = not line

        # add the system load prefix: "load avg: 1.80 "
        load_avg = self.getloadavg()
        if load_avg is not None:
            line = f"load avg: {load_avg:.2f} {line}"

        # add the timestamp prefix: "0:01:05 "
        test_time = time.monotonic() - self.start_time
        test_time = datetime.timedelta(seconds=int(test_time))
        line = f"{test_time} {line}"

        if empty:
            # Drop the trailing space left by the prefix formatting.
            line = line[:-1]

        print(line, flush=True)

    def display_progress(self, test_index, text):
        """Log a progress line for test number test_index, unless quiet."""
        if self.ns.quiet:
            return

        # "[ 51/405/1] test_tcl passed"
        line = f"{test_index:{self.test_count_width}}{self.test_count}"
        fails = len(self.bad) + len(self.environment_changed)
        if fails and not self.ns.pgo:
            line = f"{line}/{fails}"
        self.log(f"[{line}] {text}")

    def parse_args(self, kwargs):
        """Parse sys.argv (overridden by kwargs) and store the result in self.ns.

        Exits with status 2 if the --huntrleaks/-R values are invalid.
        """
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            support.junit_xml_list = self.testsuite_xml = []

        worker_args = ns.worker_args
        if worker_args is not None:
            from test.libregrtest.runtest_mp import parse_worker_args
            ns, test_name = parse_worker_args(ns.worker_args)
            ns.worker_args = worker_args
            self.worker_test_name = test_name

        # Strip .py extensions.
        removepy(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns

    def find_tests(self, tests):
        """Compute self.selected, the ordered list of test names to run.

        tests -- explicit list of test names, or a false value to fall back
                 on the command-line arguments and then on all found tests
        """
        self.tests = tests

        if self.ns.single:
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                # No saved state: keep the tests passed by the caller.
                pass

        if self.ns.fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
                for line in fp:
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        removepy(self.tests)

        if self.ns.pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(self.ns)

        stdtests = STDTESTS[:]
        nottests = NOTTESTS.copy()
        if self.ns.exclude:
            for arg in self.ns.args:
                if arg in stdtests:
                    stdtests.remove(arg)
                nottests.add(arg)
            self.ns.args = []

        # if testdir is set, then we are not running the python tests suite, so
        # don't add default tests to be executed or skipped (pass empty values)
        if self.ns.testdir:
            alltests = findtests(self.ns.testdir, list(), set())
        else:
            alltests = findtests(self.ns.testdir, stdtests, nottests)

        if not self.ns.fromfile:
            self.selected = self.tests or self.ns.args or alltests
        else:
            self.selected = self.tests
        if self.ns.single:
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except (IndexError, ValueError):
                # IndexError: nothing selected, or the selected test is the
                # last one.  ValueError: the selected test (for example a
                # stale name saved in pynexttest) is not part of alltests.
                # In both cases there is no next test to remember.
                pass

        # Remove all the selected tests that precede start if it's set.
        if self.ns.start:
            try:
                del self.selected[:self.selected.index(self.ns.start)]
            except ValueError:
                print("Couldn't find starting test (%s), using all tests"
                      % self.ns.start, file=sys.stderr)

        if self.ns.randomize:
            if self.ns.random_seed is None:
                self.ns.random_seed = random.randrange(10000000)
            random.seed(self.ns.random_seed)
            random.shuffle(self.selected)

    def list_tests(self):
        """Print the name of each selected test, one per line."""
        for name in self.selected:
            print(name)

    def _list_cases(self, suite):
        """Recursively print the id of each matching test case in suite."""
        for test in suite:
            if isinstance(test, unittest.loader._FailedTest):
                continue
            if isinstance(test, unittest.TestSuite):
                self._list_cases(test)
            elif isinstance(test, unittest.TestCase):
                if support.match_test(test):
                    print(test.id())

    def list_cases(self):
        """Print the test cases of every selected test module.

        Modules raising unittest.SkipTest while loading are reported on
        stderr as skipped.
        """
        support.verbose = False
        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)

        for test_name in self.selected:
            abstest = get_abs_module(self.ns, test_name)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
                self._list_cases(suite)
            except unittest.SkipTest:
                self.skipped.append(test_name)

        if self.skipped:
            print(file=sys.stderr)
            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
            printlist(self.skipped, file=sys.stderr)

    def rerun_failed_tests(self):
        """Re-run every test listed in self.bad in verbose mode."""
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False

        # Remember the outcome of the first pass for the final summary.
        self.first_result = self.get_tests_result()

        self.log()
        self.log("Re-running failed tests in verbose mode")
        # Iterate over a copy: accumulate_result() mutates self.bad.
        self.rerun = self.bad[:]
        for test_name in self.rerun:
            self.log(f"Re-running {test_name} in verbose mode")
            self.ns.verbose = True
            result = runtest(self.ns, test_name)

            self.accumulate_result(result, rerun=True)

            if result.result == INTERRUPTED:
                break

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()

    def display_result(self):
        """Print the summary of the run (nothing in PGO mode)."""
        # If running the test suite for PGO then no one cares about results.
        if self.ns.pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_result())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not self.ns.quiet:
            print()
            if (not self.bad
                    and not self.skipped
                    and not self.interrupted
                    and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)

    def run_tests_sequential(self):
        """Run the tests of self.tests one after the other in this process."""
        if self.ns.trace:
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        # Take a snapshot of the module names: sys.modules.keys() is a live
        # view which would also contain every module imported later, so the
        # "newly imported" check below could never succeed.
        save_modules = set(sys.modules)

        msg = "Run tests sequentially"
        if self.ns.timeout:
            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
        self.log(msg)

        previous_test = None
        for test_index, test_name in enumerate(self.tests, 1):
            start_time = time.monotonic()

            text = test_name
            if previous_test:
                text = '%s -- %s' % (text, previous_test)
            self.display_progress(test_index, text)

            if self.tracer:
                # If we're tracing code coverage, then we don't exit with status
                # if on a false return value from main.
                cmd = ('result = runtest(self.ns, test_name); '
                       'self.accumulate_result(result)')
                ns = dict(locals())
                self.tracer.runctx(cmd, globals=globals(), locals=ns)
                result = ns['result']
            else:
                result = runtest(self.ns, test_name)
                self.accumulate_result(result)

            if result.result == INTERRUPTED:
                break

            previous_test = format_test_result(result)
            test_time = time.monotonic() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
            elif result.result == PASSED:
                # be quiet: say nothing if the test passed shortly
                previous_test = None

            # Unload the newly imported modules (best effort finalization).
            # Iterate over a copy since unloading mutates sys.modules.
            for module in list(sys.modules):
                if module not in save_modules and module.startswith("test."):
                    support.unload(module)

            if self.ns.failfast and is_failed(result, self.ns):
                break

        if previous_test:
            print(previous_test)

    def _test_forever(self, tests):
        """Yield the names in tests endlessly until a failure is recorded.

        The generator stops once self.bad is non-empty, or once the
        environment was changed while fail_env_changed is set.
        """
        while True:
            for test_name in tests:
                yield test_name
                if self.bad:
                    return
                if self.ns.fail_env_changed and self.environment_changed:
                    return

    def display_header(self):
        """Print basic information about the interpreter and the platform."""
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
              "%s-endian" % sys.byteorder)
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getpreferredencoding(False),
                 sys.getfilesystemencoding()))

    def get_tests_result(self):
        """Return a one-line summary of the run, such as "SUCCESS".

        When failed tests were re-run, the summary of the first pass is
        prepended as "<first> then <current>".
        """
        result = []
        if self.bad:
            result.append("FAILURE")
        elif self.ns.fail_env_changed and self.environment_changed:
            result.append("ENV CHANGED")
        elif not any((self.good, self.bad, self.skipped, self.interrupted,
                      self.environment_changed)):
            result.append("NO TEST RUN")

        if self.interrupted:
            result.append("INTERRUPTED")

        if not result:
            result.append("SUCCESS")

        result = ', '.join(result)
        if self.first_result:
            result = '%s then %s' % (self.first_result, result)
        return result

    def run_tests(self):
        """Run the selected tests, sequentially or with worker processes."""
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
                or not(self.ns.pgo or self.ns.quiet or self.ns.single
                       or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                       "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()

    def finalize(self):
        """Save the --single state, write coverage results and print totals."""
        if self.next_single_filename:
            if self.next_single_test:
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                # The state file may never have been written (find_tests()
                # tolerates its absence too): nothing to remove then.
                try:
                    os.unlink(self.next_single_filename)
                except FileNotFoundError:
                    pass

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))
        print("Tests result: %s" % self.get_tests_result())

        if self.ns.runleaks:
            os.system("leaks %d" % os.getpid())

    def save_xml_result(self):
        """Write the collected JUnit XML test suites to self.ns.xmlpath."""
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    # Non-numeric attribute: leave it out of the total.
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)

    def set_temp_dir(self):
        """Set self.tmp_dir to the absolute path of the parent temp directory."""
        if self.ns.tempdir:
            self.tmp_dir = self.ns.tempdir

        if not self.tmp_dir:
            # When tests are run from the Python build directory, it is best practice
            # to keep the test files in a subfolder.  This eases the cleanup of leftover
            # files using the "make distclean" command.
            if sysconfig.is_python_build():
                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
                if self.tmp_dir is None:
                    # bpo-30284: On Windows, only srcdir is available. Using
                    # abs_builddir mostly matters on UNIX when building Python
                    # out of the source tree, especially when the source tree
                    # is read only.
                    self.tmp_dir = sysconfig.get_config_var('srcdir')
                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
            else:
                self.tmp_dir = tempfile.gettempdir()

        self.tmp_dir = os.path.abspath(self.tmp_dir)

    def create_temp_dir(self):
        """Create self.tmp_dir and return the path of the per-process test cwd.

        Only the parent directory is created here; the returned path is not.
        """
        os.makedirs(self.tmp_dir, exist_ok=True)

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        pid = os.getpid()
        if self.worker_test_name is not None:
            test_cwd = 'test_python_worker_{}'.format(pid)
        else:
            test_cwd = 'test_python_{}'.format(pid)
        test_cwd += support.FS_NONASCII
        test_cwd = os.path.join(self.tmp_dir, test_cwd)
        return test_cwd

    def cleanup(self):
        """Remove the test_python_* files and directories of self.tmp_dir."""
        import glob

        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                support.rmtree(name)
            else:
                print("Remove file: %s" % name)
                support.unlink(name)

    def main(self, tests=None, **kwargs):
        """Parse the options and run the tests in a temporary cwd.

        tests -- optional list of test names, passed on to find_tests()
        kwargs -- overrides for the command-line options

        Always terminates by raising SystemExit.
        """
        self.parse_args(kwargs)

        self.set_temp_dir()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from support.SAVEDCWD.
            with support.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)

    def getloadavg(self):
        """Return the system load average, or None if it is not available.

        Uses the Windows load tracker when one is active, else the first
        value reported by os.getloadavg() where that function exists.
        """
        if self.win_load_tracker is not None:
            return self.win_load_tracker.getloadavg()

        if hasattr(os, 'getloadavg'):
            return os.getloadavg()[0]

        return None

    def _main(self, tests, kwargs):
        """Find and run the tests, report the results, then exit.

        Exit status: 2 if some tests failed, 130 if the run was interrupted,
        3 if the environment was altered and fail_env_changed is set,
        0 otherwise.
        """
        if self.worker_test_name is not None:
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns, self.worker_test_name)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo
        support.PGO_EXTENDED = self.ns.pgo_extended

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        if self.ns.list_cases:
            self.list_cases()
            sys.exit(0)

        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32' and self.worker_test_name is None:
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except FileNotFoundError as error:
                # Windows IoT Core and Windows Nano Server do not provide
                # typeperf.exe for x64, x86 or ARM
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            self.run_tests()
            self.display_result()

            if self.ns.verbose2 and self.bad:
                self.rerun_failed_tests()
        finally:
            # Always release the tracker, even if the run raised.
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None

        self.finalize()

        self.save_xml_result()

        if self.bad:
            sys.exit(2)
        if self.interrupted:
            sys.exit(130)
        if self.ns.fail_env_changed and self.environment_changed:
            sys.exit(3)
        sys.exit(0)


def main(tests=None, **kwargs):
    """Run the Python suite."""
    Regrtest().main(tests=tests, **kwargs)