1import datetime 2import faulthandler 3import locale 4import os 5import platform 6import random 7import re 8import sys 9import sysconfig 10import tempfile 11import time 12import unittest 13from test.libregrtest.cmdline import _parse_args 14from test.libregrtest.runtest import ( 15 findtests, runtest, get_abs_module, 16 STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED, 17 INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT, 18 PROGRESS_MIN_TIME, format_test_result, is_failed) 19from test.libregrtest.setup import setup_tests 20from test.libregrtest.pgo import setup_pgo_tests 21from test.libregrtest.utils import removepy, count, format_duration, printlist 22from test import support 23 24 25# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()). 26# Used to protect against threading._shutdown() hang. 27# Must be smaller than buildbot "1200 seconds without output" limit. 28EXIT_TIMEOUT = 120.0 29 30 31class Regrtest: 32 """Execute a test suite. 33 34 This also parses command-line options and modifies its behavior 35 accordingly. 36 37 tests -- a list of strings containing test names (optional) 38 testdir -- the directory in which to look for tests (optional) 39 40 Users other than the Python test suite will certainly want to 41 specify testdir; if it's omitted, the directory containing the 42 Python test suite is searched for. 43 44 If the tests argument is omitted, the tests listed on the 45 command-line will be used. If that's empty, too, then all *.py 46 files beginning with test_ will be used. 47 48 The other default arguments (verbose, quiet, exclude, 49 single, randomize, findleaks, use_resources, trace, coverdir, 50 print_slow, and random_seed) allow programmers calling main() 51 directly to set the values that would normally be set by flags 52 on the command line. 53 """ 54 def __init__(self): 55 # Namespace of command line options 56 self.ns = None 57 58 # tests 59 self.tests = [] 60 self.selected = [] 61 62 # test results 63 self.good = [] 64 self.bad = [] 65 self.skipped = [] 66 self.resource_denieds = [] 67 self.environment_changed = [] 68 self.run_no_tests = [] 69 self.rerun = [] 70 self.first_result = None 71 self.interrupted = False 72 73 # used by --slow 74 self.test_times = [] 75 76 # used by --coverage, trace.Trace instance 77 self.tracer = None 78 79 # used to display the progress bar "[ 3/100]" 80 self.start_time = time.monotonic() 81 self.test_count = '' 82 self.test_count_width = 1 83 84 # used by --single 85 self.next_single_test = None 86 self.next_single_filename = None 87 88 # used by --junit-xml 89 self.testsuite_xml = None 90 91 # misc 92 self.win_load_tracker = None 93 self.tmp_dir = None 94 self.worker_test_name = None 95 96 def get_executed(self): 97 return (set(self.good) | set(self.bad) | set(self.skipped) 98 | set(self.resource_denieds) | set(self.environment_changed) 99 | set(self.run_no_tests)) 100 101 def accumulate_result(self, result, rerun=False): 102 test_name = result.test_name 103 ok = result.result 104 105 if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun: 106 self.test_times.append((result.test_time, test_name)) 107 108 if ok == PASSED: 109 self.good.append(test_name) 110 elif ok in (FAILED, CHILD_ERROR): 111 if not rerun: 112 self.bad.append(test_name) 113 elif ok == ENV_CHANGED: 114 self.environment_changed.append(test_name) 115 elif ok == SKIPPED: 116 self.skipped.append(test_name) 117 elif ok == RESOURCE_DENIED: 118 self.skipped.append(test_name) 119 self.resource_denieds.append(test_name) 120 elif ok == TEST_DID_NOT_RUN: 121 self.run_no_tests.append(test_name) 122 elif ok == INTERRUPTED: 123 self.interrupted = True 124 elif ok == TIMEOUT: 125 self.bad.append(test_name) 126 else: 127 raise ValueError("invalid test result: %r" % ok) 128 129 if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}: 130 self.bad.remove(test_name) 131 132 xml_data = result.xml_data 133 if xml_data: 134 import xml.etree.ElementTree as ET 135 for e in xml_data: 136 try: 137 self.testsuite_xml.append(ET.fromstring(e)) 138 except ET.ParseError: 139 print(xml_data, file=sys.__stderr__) 140 raise 141 142 def log(self, line=''): 143 empty = not line 144 145 # add the system load prefix: "load avg: 1.80 " 146 load_avg = self.getloadavg() 147 if load_avg is not None: 148 line = f"load avg: {load_avg:.2f} {line}" 149 150 # add the timestamp prefix: "0:01:05 " 151 test_time = time.monotonic() - self.start_time 152 test_time = datetime.timedelta(seconds=int(test_time)) 153 line = f"{test_time} {line}" 154 155 if empty: 156 line = line[:-1] 157 158 print(line, flush=True) 159 160 def display_progress(self, test_index, text): 161 if self.ns.quiet: 162 return 163 164 # "[ 51/405/1] test_tcl passed" 165 line = f"{test_index:{self.test_count_width}}{self.test_count}" 166 fails = len(self.bad) + len(self.environment_changed) 167 if fails and not self.ns.pgo: 168 line = f"{line}/{fails}" 169 self.log(f"[{line}] {text}") 170 171 def parse_args(self, kwargs): 172 ns = _parse_args(sys.argv[1:], **kwargs) 173 174 if ns.xmlpath: 175 support.junit_xml_list = self.testsuite_xml = [] 176 177 worker_args = ns.worker_args 178 if worker_args is not None: 179 from test.libregrtest.runtest_mp import parse_worker_args 180 ns, test_name = parse_worker_args(ns.worker_args) 181 ns.worker_args = worker_args 182 self.worker_test_name = test_name 183 184 # Strip .py extensions. 185 removepy(ns.args) 186 187 if ns.huntrleaks: 188 warmup, repetitions, _ = ns.huntrleaks 189 if warmup < 1 or repetitions < 1: 190 msg = ("Invalid values for the --huntrleaks/-R parameters. The " 191 "number of warmups and repetitions must be at least 1 " 192 "each (1:1).") 193 print(msg, file=sys.stderr, flush=True) 194 sys.exit(2) 195 196 if ns.tempdir: 197 ns.tempdir = os.path.expanduser(ns.tempdir) 198 199 self.ns = ns 200 201 def find_tests(self, tests): 202 self.tests = tests 203 204 if self.ns.single: 205 self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest') 206 try: 207 with open(self.next_single_filename, 'r') as fp: 208 next_test = fp.read().strip() 209 self.tests = [next_test] 210 except OSError: 211 pass 212 213 if self.ns.fromfile: 214 self.tests = [] 215 # regex to match 'test_builtin' in line: 216 # '0:00:00 [ 4/400] test_builtin -- test_dict took 1 sec' 217 regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b') 218 with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp: 219 for line in fp: 220 line = line.split('#', 1)[0] 221 line = line.strip() 222 match = regex.search(line) 223 if match is not None: 224 self.tests.append(match.group()) 225 226 removepy(self.tests) 227 228 if self.ns.pgo: 229 # add default PGO tests if no tests are specified 230 setup_pgo_tests(self.ns) 231 232 stdtests = STDTESTS[:] 233 nottests = NOTTESTS.copy() 234 if self.ns.exclude: 235 for arg in self.ns.args: 236 if arg in stdtests: 237 stdtests.remove(arg) 238 nottests.add(arg) 239 self.ns.args = [] 240 241 # if testdir is set, then we are not running the python tests suite, so 242 # don't add default tests to be executed or skipped (pass empty values) 243 if self.ns.testdir: 244 alltests = findtests(self.ns.testdir, list(), set()) 245 else: 246 alltests = findtests(self.ns.testdir, stdtests, nottests) 247 248 if not self.ns.fromfile: 249 self.selected = self.tests or self.ns.args or alltests 250 else: 251 self.selected = self.tests 252 if self.ns.single: 253 self.selected = self.selected[:1] 254 try: 255 pos = alltests.index(self.selected[0]) 256 self.next_single_test = alltests[pos + 1] 257 except IndexError: 258 pass 259 260 # Remove all the selected tests that precede start if it's set. 261 if self.ns.start: 262 try: 263 del self.selected[:self.selected.index(self.ns.start)] 264 except ValueError: 265 print("Couldn't find starting test (%s), using all tests" 266 % self.ns.start, file=sys.stderr) 267 268 if self.ns.randomize: 269 if self.ns.random_seed is None: 270 self.ns.random_seed = random.randrange(10000000) 271 random.seed(self.ns.random_seed) 272 random.shuffle(self.selected) 273 274 def list_tests(self): 275 for name in self.selected: 276 print(name) 277 278 def _list_cases(self, suite): 279 for test in suite: 280 if isinstance(test, unittest.loader._FailedTest): 281 continue 282 if isinstance(test, unittest.TestSuite): 283 self._list_cases(test) 284 elif isinstance(test, unittest.TestCase): 285 if support.match_test(test): 286 print(test.id()) 287 288 def list_cases(self): 289 support.verbose = False 290 support.set_match_tests(self.ns.match_tests) 291 292 for test_name in self.selected: 293 abstest = get_abs_module(self.ns, test_name) 294 try: 295 suite = unittest.defaultTestLoader.loadTestsFromName(abstest) 296 self._list_cases(suite) 297 except unittest.SkipTest: 298 self.skipped.append(test_name) 299 300 if self.skipped: 301 print(file=sys.stderr) 302 print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr) 303 printlist(self.skipped, file=sys.stderr) 304 305 def rerun_failed_tests(self): 306 self.ns.verbose = True 307 self.ns.failfast = False 308 self.ns.verbose3 = False 309 310 self.first_result = self.get_tests_result() 311 312 self.log() 313 self.log("Re-running failed tests in verbose mode") 314 self.rerun = self.bad[:] 315 for test_name in self.rerun: 316 self.log(f"Re-running {test_name} in verbose mode") 317 self.ns.verbose = True 318 result = runtest(self.ns, test_name) 319 320 self.accumulate_result(result, rerun=True) 321 322 if result.result == INTERRUPTED: 323 break 324 325 if self.bad: 326 print(count(len(self.bad), 'test'), "failed again:") 327 printlist(self.bad) 328 329 self.display_result() 330 331 def display_result(self): 332 # If running the test suite for PGO then no one cares about results. 333 if self.ns.pgo: 334 return 335 336 print() 337 print("== Tests result: %s ==" % self.get_tests_result()) 338 339 if self.interrupted: 340 print("Test suite interrupted by signal SIGINT.") 341 342 omitted = set(self.selected) - self.get_executed() 343 if omitted: 344 print() 345 print(count(len(omitted), "test"), "omitted:") 346 printlist(omitted) 347 348 if self.good and not self.ns.quiet: 349 print() 350 if (not self.bad 351 and not self.skipped 352 and not self.interrupted 353 and len(self.good) > 1): 354 print("All", end=' ') 355 print(count(len(self.good), "test"), "OK.") 356 357 if self.ns.print_slow: 358 self.test_times.sort(reverse=True) 359 print() 360 print("10 slowest tests:") 361 for test_time, test in self.test_times[:10]: 362 print("- %s: %s" % (test, format_duration(test_time))) 363 364 if self.bad: 365 print() 366 print(count(len(self.bad), "test"), "failed:") 367 printlist(self.bad) 368 369 if self.environment_changed: 370 print() 371 print("{} altered the execution environment:".format( 372 count(len(self.environment_changed), "test"))) 373 printlist(self.environment_changed) 374 375 if self.skipped and not self.ns.quiet: 376 print() 377 print(count(len(self.skipped), "test"), "skipped:") 378 printlist(self.skipped) 379 380 if self.rerun: 381 print() 382 print("%s:" % count(len(self.rerun), "re-run test")) 383 printlist(self.rerun) 384 385 if self.run_no_tests: 386 print() 387 print(count(len(self.run_no_tests), "test"), "run no tests:") 388 printlist(self.run_no_tests) 389 390 def run_tests_sequential(self): 391 if self.ns.trace: 392 import trace 393 self.tracer = trace.Trace(trace=False, count=True) 394 395 save_modules = sys.modules.keys() 396 397 self.log("Run tests sequentially") 398 399 previous_test = None 400 for test_index, test_name in enumerate(self.tests, 1): 401 start_time = time.monotonic() 402 403 text = test_name 404 if previous_test: 405 text = '%s -- %s' % (text, previous_test) 406 self.display_progress(test_index, text) 407 408 if self.tracer: 409 # If we're tracing code coverage, then we don't exit with status 410 # if on a false return value from main. 411 cmd = ('result = runtest(self.ns, test_name); ' 412 'self.accumulate_result(result)') 413 ns = dict(locals()) 414 self.tracer.runctx(cmd, globals=globals(), locals=ns) 415 result = ns['result'] 416 else: 417 result = runtest(self.ns, test_name) 418 self.accumulate_result(result) 419 420 if result.result == INTERRUPTED: 421 break 422 423 previous_test = format_test_result(result) 424 test_time = time.monotonic() - start_time 425 if test_time >= PROGRESS_MIN_TIME: 426 previous_test = "%s in %s" % (previous_test, format_duration(test_time)) 427 elif result.result == PASSED: 428 # be quiet: say nothing if the test passed shortly 429 previous_test = None 430 431 # Unload the newly imported modules (best effort finalization) 432 for module in sys.modules.keys(): 433 if module not in save_modules and module.startswith("test."): 434 support.unload(module) 435 436 if self.ns.failfast and is_failed(result, self.ns): 437 break 438 439 if previous_test: 440 print(previous_test) 441 442 def _test_forever(self, tests): 443 while True: 444 for test_name in tests: 445 yield test_name 446 if self.bad: 447 return 448 if self.ns.fail_env_changed and self.environment_changed: 449 return 450 451 def display_header(self): 452 # Print basic platform information 453 print("==", platform.python_implementation(), *sys.version.split()) 454 print("==", platform.platform(aliased=True), 455 "%s-endian" % sys.byteorder) 456 print("== cwd:", os.getcwd()) 457 cpu_count = os.cpu_count() 458 if cpu_count: 459 print("== CPU count:", cpu_count) 460 print("== encodings: locale=%s, FS=%s" 461 % (locale.getpreferredencoding(False), 462 sys.getfilesystemencoding())) 463 464 def get_tests_result(self): 465 result = [] 466 if self.bad: 467 result.append("FAILURE") 468 elif self.ns.fail_env_changed and self.environment_changed: 469 result.append("ENV CHANGED") 470 elif not any((self.good, self.bad, self.skipped, self.interrupted, 471 self.environment_changed)): 472 result.append("NO TEST RUN") 473 474 if self.interrupted: 475 result.append("INTERRUPTED") 476 477 if not result: 478 result.append("SUCCESS") 479 480 result = ', '.join(result) 481 if self.first_result: 482 result = '%s then %s' % (self.first_result, result) 483 return result 484 485 def run_tests(self): 486 # For a partial run, we do not need to clutter the output. 487 if (self.ns.header 488 or not(self.ns.pgo or self.ns.quiet or self.ns.single 489 or self.tests or self.ns.args)): 490 self.display_header() 491 492 if self.ns.huntrleaks: 493 warmup, repetitions, _ = self.ns.huntrleaks 494 if warmup < 3: 495 msg = ("WARNING: Running tests with --huntrleaks/-R and less than " 496 "3 warmup repetitions can give false positives!") 497 print(msg, file=sys.stdout, flush=True) 498 499 if self.ns.randomize: 500 print("Using random seed", self.ns.random_seed) 501 502 if self.ns.forever: 503 self.tests = self._test_forever(list(self.selected)) 504 self.test_count = '' 505 self.test_count_width = 3 506 else: 507 self.tests = iter(self.selected) 508 self.test_count = '/{}'.format(len(self.selected)) 509 self.test_count_width = len(self.test_count) - 1 510 511 if self.ns.use_mp: 512 from test.libregrtest.runtest_mp import run_tests_multiprocess 513 run_tests_multiprocess(self) 514 else: 515 self.run_tests_sequential() 516 517 def finalize(self): 518 if self.next_single_filename: 519 if self.next_single_test: 520 with open(self.next_single_filename, 'w') as fp: 521 fp.write(self.next_single_test + '\n') 522 else: 523 os.unlink(self.next_single_filename) 524 525 if self.tracer: 526 r = self.tracer.results() 527 r.write_results(show_missing=True, summary=True, 528 coverdir=self.ns.coverdir) 529 530 print() 531 duration = time.monotonic() - self.start_time 532 print("Total duration: %s" % format_duration(duration)) 533 print("Tests result: %s" % self.get_tests_result()) 534 535 if self.ns.runleaks: 536 os.system("leaks %d" % os.getpid()) 537 538 def save_xml_result(self): 539 if not self.ns.xmlpath and not self.testsuite_xml: 540 return 541 542 import xml.etree.ElementTree as ET 543 root = ET.Element("testsuites") 544 545 # Manually count the totals for the overall summary 546 totals = {'tests': 0, 'errors': 0, 'failures': 0} 547 for suite in self.testsuite_xml: 548 root.append(suite) 549 for k in totals: 550 try: 551 totals[k] += int(suite.get(k, 0)) 552 except ValueError: 553 pass 554 555 for k, v in totals.items(): 556 root.set(k, str(v)) 557 558 xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath) 559 with open(xmlpath, 'wb') as f: 560 for s in ET.tostringlist(root): 561 f.write(s) 562 563 def set_temp_dir(self): 564 if self.ns.tempdir: 565 self.tmp_dir = self.ns.tempdir 566 567 if not self.tmp_dir: 568 # When tests are run from the Python build directory, it is best practice 569 # to keep the test files in a subfolder. This eases the cleanup of leftover 570 # files using the "make distclean" command. 571 if sysconfig.is_python_build(): 572 self.tmp_dir = sysconfig.get_config_var('abs_builddir') 573 if self.tmp_dir is None: 574 # bpo-30284: On Windows, only srcdir is available. Using 575 # abs_builddir mostly matters on UNIX when building Python 576 # out of the source tree, especially when the source tree 577 # is read only. 578 self.tmp_dir = sysconfig.get_config_var('srcdir') 579 self.tmp_dir = os.path.join(self.tmp_dir, 'build') 580 else: 581 self.tmp_dir = tempfile.gettempdir() 582 583 self.tmp_dir = os.path.abspath(self.tmp_dir) 584 585 def create_temp_dir(self): 586 os.makedirs(self.tmp_dir, exist_ok=True) 587 588 # Define a writable temp dir that will be used as cwd while running 589 # the tests. The name of the dir includes the pid to allow parallel 590 # testing (see the -j option). 591 pid = os.getpid() 592 if self.worker_test_name is not None: 593 test_cwd = 'test_python_worker_{}'.format(pid) 594 else: 595 test_cwd = 'test_python_{}'.format(pid) 596 test_cwd = os.path.join(self.tmp_dir, test_cwd) 597 return test_cwd 598 599 def cleanup(self): 600 import glob 601 602 path = os.path.join(self.tmp_dir, 'test_python_*') 603 print("Cleanup %s directory" % self.tmp_dir) 604 for name in glob.glob(path): 605 if os.path.isdir(name): 606 print("Remove directory: %s" % name) 607 support.rmtree(name) 608 else: 609 print("Remove file: %s" % name) 610 support.unlink(name) 611 612 def main(self, tests=None, **kwargs): 613 self.parse_args(kwargs) 614 615 self.set_temp_dir() 616 617 if self.ns.cleanup: 618 self.cleanup() 619 sys.exit(0) 620 621 test_cwd = self.create_temp_dir() 622 623 try: 624 # Run the tests in a context manager that temporarily changes the CWD 625 # to a temporary and writable directory. If it's not possible to 626 # create or change the CWD, the original CWD will be used. 627 # The original CWD is available from support.SAVEDCWD. 628 with support.temp_cwd(test_cwd, quiet=True): 629 # When using multiprocessing, worker processes will use test_cwd 630 # as their parent temporary directory. So when the main process 631 # exit, it removes also subdirectories of worker processes. 632 self.ns.tempdir = test_cwd 633 634 self._main(tests, kwargs) 635 except SystemExit as exc: 636 # bpo-38203: Python can hang at exit in Py_Finalize(), especially 637 # on threading._shutdown() call: put a timeout 638 faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True) 639 640 sys.exit(exc.code) 641 642 def getloadavg(self): 643 if self.win_load_tracker is not None: 644 return self.win_load_tracker.getloadavg() 645 646 if hasattr(os, 'getloadavg'): 647 return os.getloadavg()[0] 648 649 return None 650 651 def _main(self, tests, kwargs): 652 if self.worker_test_name is not None: 653 from test.libregrtest.runtest_mp import run_tests_worker 654 run_tests_worker(self.ns, self.worker_test_name) 655 656 if self.ns.wait: 657 input("Press any key to continue...") 658 659 support.PGO = self.ns.pgo 660 support.PGO_EXTENDED = self.ns.pgo_extended 661 662 setup_tests(self.ns) 663 664 self.find_tests(tests) 665 666 if self.ns.list_tests: 667 self.list_tests() 668 sys.exit(0) 669 670 if self.ns.list_cases: 671 self.list_cases() 672 sys.exit(0) 673 674 # If we're on windows and this is the parent runner (not a worker), 675 # track the load average. 676 if sys.platform == 'win32' and self.worker_test_name is None: 677 from test.libregrtest.win_utils import WindowsLoadTracker 678 679 try: 680 self.win_load_tracker = WindowsLoadTracker() 681 except FileNotFoundError as error: 682 # Windows IoT Core and Windows Nano Server do not provide 683 # typeperf.exe for x64, x86 or ARM 684 print(f'Failed to create WindowsLoadTracker: {error}') 685 686 try: 687 self.run_tests() 688 self.display_result() 689 690 if self.ns.verbose2 and self.bad: 691 self.rerun_failed_tests() 692 finally: 693 if self.win_load_tracker is not None: 694 self.win_load_tracker.close() 695 self.win_load_tracker = None 696 697 self.finalize() 698 699 self.save_xml_result() 700 701 if self.bad: 702 sys.exit(2) 703 if self.interrupted: 704 sys.exit(130) 705 if self.ns.fail_env_changed and self.environment_changed: 706 sys.exit(3) 707 sys.exit(0) 708 709 710def main(tests=None, **kwargs): 711 """Run the Python suite.""" 712 Regrtest().main(tests=tests, **kwargs) 713