import faulthandler
import locale
import os
import platform
import random
import re
import sys
import sysconfig
import tempfile
import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
    findtests, runtest, get_abs_module, is_failed,
    STDTESTS, NOTTESTS, PROGRESS_MIN_TIME,
    Passed, Failed, EnvChanged, Skipped, ResourceDenied,
    Interrupted, ChildError, DidNotRun)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support
from test.support import os_helper


# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0


class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """

    def __init__(self):
        """Initialize the runner state; options are parsed later by main()."""
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []

        # test results
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.run_no_tests = []
        self.need_rerun = []
        self.rerun = []
        self.first_result = None
        self.interrupted = False

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
        self.worker_test_name = None

    def get_executed(self):
        """Return the set of test names recorded in any result list."""
        return (set(self.good) | set(self.bad) | set(self.skipped)
                | set(self.resource_denieds) | set(self.environment_changed)
                | set(self.run_no_tests))

    def accumulate_result(self, result, rerun=False):
        """Record a test result in the matching result list.

        result -- a result object with name and xml_data attributes
        rerun -- true when the result comes from re-running a failed test;
                 a re-run that is neither Failed nor Interrupted removes
                 the test from self.bad

        Raises ValueError if the result type is not recognized.
        """
        test_name = result.name

        # Durations are only recorded for the first run of tests that
        # actually completed.
        if not isinstance(result, (ChildError, Interrupted)) and not rerun:
            self.test_times.append((result.duration_sec, test_name))

        if isinstance(result, Passed):
            self.good.append(test_name)
        elif isinstance(result, ResourceDenied):
            self.skipped.append(test_name)
            self.resource_denieds.append(test_name)
        elif isinstance(result, Skipped):
            self.skipped.append(test_name)
        elif isinstance(result, EnvChanged):
            self.environment_changed.append(test_name)
        elif isinstance(result, Failed):
            if not rerun:
                self.bad.append(test_name)
                self.need_rerun.append(result)
        elif isinstance(result, DidNotRun):
            self.run_no_tests.append(test_name)
        elif isinstance(result, Interrupted):
            self.interrupted = True
        else:
            raise ValueError("invalid test result: %r" % result)

        if rerun and not isinstance(result, (Failed, Interrupted)):
            self.bad.remove(test_name)

        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    # Dump the raw data to ease debugging, then propagate.
                    print(xml_data, file=sys.__stderr__)
                    raise

    def log(self, line=''):
        """Print line prefixed by the elapsed time and, if known, the load."""
        empty = not line

        # add the system load prefix: "load avg: 1.80 "
        load_avg = self.getloadavg()
        if load_avg is not None:
            line = f"load avg: {load_avg:.2f} {line}"

        # add the timestamp prefix:  "0:01:05 "
        test_time = time.monotonic() - self.start_time

        mins, secs = divmod(int(test_time), 60)
        hours, mins = divmod(mins, 60)
        test_time = "%d:%02d:%02d" % (hours, mins, secs)

        line = f"{test_time} {line}"
        if empty:
            # Drop the trailing space left by the empty message.
            line = line[:-1]

        print(line, flush=True)

    def display_progress(self, test_index, text):
        """Log a progress line such as "[ 51/405/1] test_tcl passed"."""
        if self.ns.quiet:
            return

        # "[ 51/405/1] test_tcl passed"
        line = f"{test_index:{self.test_count_width}}{self.test_count}"
        fails = len(self.bad) + len(self.environment_changed)
        if fails and not self.ns.pgo:
            line = f"{line}/{fails}"
        self.log(f"[{line}] {text}")

    def parse_args(self, kwargs):
        """Parse sys.argv (with kwargs as overrides) and store it in self.ns.

        Exits with status 2 if the --huntrleaks/-R values are invalid.
        """
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            support.junit_xml_list = self.testsuite_xml = []

        worker_args = ns.worker_args
        if worker_args is not None:
            from test.libregrtest.runtest_mp import parse_worker_args
            ns, test_name = parse_worker_args(ns.worker_args)
            ns.worker_args = worker_args
            self.worker_test_name = test_name

        # Strip .py extensions.
        removepy(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns

    def find_tests(self, tests):
        """Compute self.selected, the ordered list of tests to run.

        tests -- test names passed by the caller of main(), or None
        """
        self.tests = tests

        if self.ns.single:
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                pass

        if self.ns.fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
                for line in fp:
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        removepy(self.tests)

        if self.ns.pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(self.ns)

        stdtests = STDTESTS[:]
        nottests = NOTTESTS.copy()
        if self.ns.exclude:
            for arg in self.ns.args:
                if arg in stdtests:
                    stdtests.remove(arg)
                nottests.add(arg)
            self.ns.args = []

        # if testdir is set, then we are not running the python tests suite, so
        # don't add default tests to be executed or skipped (pass empty values)
        if self.ns.testdir:
            alltests = findtests(self.ns.testdir, list(), set())
        else:
            alltests = findtests(self.ns.testdir, stdtests, nottests)

        if not self.ns.fromfile:
            self.selected = self.tests or self.ns.args or alltests
        else:
            self.selected = self.tests
        if self.ns.single:
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                # No selected test, or the selected test is the last one.
                pass

        # Remove all the selected tests that precede start if it's set.
        if self.ns.start:
            try:
                del self.selected[:self.selected.index(self.ns.start)]
            except ValueError:
                print("Couldn't find starting test (%s), using all tests"
                      % self.ns.start, file=sys.stderr)

        if self.ns.randomize:
            if self.ns.random_seed is None:
                self.ns.random_seed = random.randrange(10000000)
            random.seed(self.ns.random_seed)
            random.shuffle(self.selected)

    def list_tests(self):
        """Print the name of each selected test, one per line."""
        for name in self.selected:
            print(name)

    def _list_cases(self, suite):
        """Recursively print the id of each matching test case in suite."""
        for test in suite:
            if isinstance(test, unittest.loader._FailedTest):
                continue
            if isinstance(test, unittest.TestSuite):
                self._list_cases(test)
            elif isinstance(test, unittest.TestCase):
                if support.match_test(test):
                    print(test.id())

    def list_cases(self):
        """Print the test cases of the selected tests.

        Tests raising unittest.SkipTest while loading are reported on stderr.
        """
        support.verbose = False
        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)

        for test_name in self.selected:
            abstest = get_abs_module(self.ns, test_name)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
                self._list_cases(suite)
            except unittest.SkipTest:
                self.skipped.append(test_name)

        if self.skipped:
            print(file=sys.stderr)
            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
            printlist(self.skipped, file=sys.stderr)

    def rerun_failed_tests(self):
        """Re-run in verbose mode the tests recorded in self.need_rerun.

        When the failed result lists errors or failures, the re-run is
        restricted to those test names (in addition to the match patterns
        already set in self.ns.match_tests).
        """
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False

        self.first_result = self.get_tests_result()

        self.log()
        self.log("Re-running failed tests in verbose mode")
        rerun_list = list(self.need_rerun)
        self.need_rerun.clear()
        for result in rerun_list:
            test_name = result.name
            self.rerun.append(test_name)

            errors = result.errors or []
            failures = result.failures or []
            error_names = [test_full_name.split(" ")[0]
                           for (test_full_name, *_) in errors]
            failure_names = [test_full_name.split(" ")[0]
                             for (test_full_name, *_) in failures]
            self.ns.verbose = True
            orig_match_tests = self.ns.match_tests
            if errors or failures:
                # Build a new list instead of extending the existing one in
                # place: extending would also modify orig_match_tests, so the
                # patterns of this test would leak into the next re-runs.
                self.ns.match_tests = [*(orig_match_tests or ()),
                                       *error_names, *failure_names]
                matching = "matching: " + ", ".join(self.ns.match_tests)
                self.log(f"Re-running {test_name} in verbose mode ({matching})")
            else:
                self.log(f"Re-running {test_name} in verbose mode")
            result = runtest(self.ns, test_name)
            self.ns.match_tests = orig_match_tests

            self.accumulate_result(result, rerun=True)

            if isinstance(result, Interrupted):
                break

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()

    def display_result(self):
        """Print the summary of the run (nothing is printed in PGO mode)."""
        # If running the test suite for PGO then no one cares about results.
        if self.ns.pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_result())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not self.ns.quiet:
            print()
            if (not self.bad
                    and not self.skipped
                    and not self.interrupted
                    and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)

    def run_tests_sequential(self):
        """Run the tests of self.tests one after the other in this process.

        Stops early on an Interrupted result, or on the first failure when
        --failfast is set.
        """
        if self.ns.trace:
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        # Take a snapshot of the module names. sys.modules.keys() is a live
        # view of the dict, so testing membership against it could never
        # detect a module imported later.
        save_modules = set(sys.modules)

        msg = "Run tests sequentially"
        if self.ns.timeout:
            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
        self.log(msg)

        previous_test = None
        for test_index, test_name in enumerate(self.tests, 1):
            start_time = time.monotonic()

            text = test_name
            if previous_test:
                text = '%s -- %s' % (text, previous_test)
            self.display_progress(test_index, text)

            if self.tracer:
                # When tracing code coverage, run the test through the tracer.
                # The command runs in a copy of the local namespace, from
                # which the result is read back afterwards.
                cmd = ('result = runtest(self.ns, test_name); '
                       'self.accumulate_result(result)')
                ns = dict(locals())
                self.tracer.runctx(cmd, globals=globals(), locals=ns)
                result = ns['result']
            else:
                result = runtest(self.ns, test_name)
                self.accumulate_result(result)

            if isinstance(result, Interrupted):
                break

            previous_test = str(result)
            test_time = time.monotonic() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test,
                                              format_duration(test_time))
            elif isinstance(result, Passed):
                # be quiet: say nothing if the test passed shortly
                previous_test = None

            # Unload the newly imported modules (best effort finalization).
            # Collect the names first: sys.modules must not change size
            # while it is being iterated.
            new_modules = [module for module in sys.modules
                           if module not in save_modules
                           and module.startswith("test.")]
            for module in new_modules:
                support.unload(module)

            if self.ns.failfast and is_failed(result, self.ns):
                break

        if previous_test:
            print(previous_test)

    def _test_forever(self, tests):
        """Yield the test names in a loop until a test fails.

        The generator also stops when the environment was changed and
        --fail-env-changed is set.
        """
        while True:
            for test_name in tests:
                yield test_name
                if self.bad:
                    return
                if self.ns.fail_env_changed and self.environment_changed:
                    return

    def display_header(self):
        """Print basic information about the interpreter and the platform."""
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
              "%s-endian" % sys.byteorder)
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getpreferredencoding(False),
                 sys.getfilesystemencoding()))

    def get_tests_result(self):
        """Return the overall result as a string, for example "SUCCESS".

        Several states are joined with ", ". After a re-run, the result of
        the first run is prepended: "<first result> then <result>".
        """
        result = []
        if self.bad:
            result.append("FAILURE")
        elif self.ns.fail_env_changed and self.environment_changed:
            result.append("ENV CHANGED")
        elif not any((self.good, self.bad, self.skipped, self.interrupted,
                      self.environment_changed)):
            result.append("NO TEST RUN")

        if self.interrupted:
            result.append("INTERRUPTED")

        if not result:
            result.append("SUCCESS")

        result = ', '.join(result)
        if self.first_result:
            result = '%s then %s' % (self.first_result, result)
        return result

    def run_tests(self):
        """Print the run preamble, then run the selected tests.

        Tests run in worker processes when self.ns.use_mp is set, otherwise
        sequentially in this process.
        """
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
                or not (self.ns.pgo or self.ns.quiet or self.ns.single
                        or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                       "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()

    def finalize(self):
        """Update the --single state file, write coverage, print the totals."""
        if self.next_single_filename:
            if self.next_single_test:
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                os.unlink(self.next_single_filename)

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))
        print("Tests result: %s" % self.get_tests_result())

        if self.ns.runleaks:
            os.system("leaks %d" % os.getpid())

    def save_xml_result(self):
        """Write the collected test suites to the file given by ns.xmlpath.

        The path is interpreted relative to os_helper.SAVEDCWD.
        """
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    # Ignore attributes which are not integers.
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)

    def set_temp_dir(self):
        """Set self.tmp_dir to the absolute path of the temporary directory.

        ns.tempdir is used when set. Otherwise the directory is the 'build'
        subdirectory of the build (or source) tree when running from a Python
        build, else tempfile.gettempdir().
        """
        if self.ns.tempdir:
            self.tmp_dir = self.ns.tempdir

        if not self.tmp_dir:
            # When tests are run from the Python build directory, it is best practice
            # to keep the test files in a subfolder.  This eases the cleanup of leftover
            # files using the "make distclean" command.
            if sysconfig.is_python_build():
                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
                if self.tmp_dir is None:
                    # bpo-30284: On Windows, only srcdir is available. Using
                    # abs_builddir mostly matters on UNIX when building Python
                    # out of the source tree, especially when the source tree
                    # is read only.
                    self.tmp_dir = sysconfig.get_config_var('srcdir')
                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
            else:
                self.tmp_dir = tempfile.gettempdir()

        self.tmp_dir = os.path.abspath(self.tmp_dir)

    def create_temp_dir(self):
        """Create self.tmp_dir if needed and return the path to use as cwd.

        Only self.tmp_dir is created here; the returned path is not.
        """
        os.makedirs(self.tmp_dir, exist_ok=True)

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        pid = os.getpid()
        if self.worker_test_name is not None:
            test_cwd = 'test_python_worker_{}'.format(pid)
        else:
            test_cwd = 'test_python_{}'.format(pid)
        test_cwd += os_helper.FS_NONASCII
        test_cwd = os.path.join(self.tmp_dir, test_cwd)
        return test_cwd

    def cleanup(self):
        """Remove the 'test_python_*' files and directories of self.tmp_dir."""
        import glob

        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                os_helper.rmtree(name)
            else:
                print("Remove file: %s" % name)
                os_helper.unlink(name)

    def main(self, tests=None, **kwargs):
        """Parse the command line and run the tests; exit through sys.exit().

        tests -- optional list of test names
        kwargs -- overrides passed to the command line parser
        """
        self.parse_args(kwargs)

        self.set_temp_dir()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from os_helper.SAVEDCWD.
            with os_helper.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exit, it removes also subdirectories of worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # on threading._shutdown() call: put a timeout
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)

    def getloadavg(self):
        """Return the system load average, or None if it is not available.

        The Windows load tracker is used when one was created; otherwise the
        first value of os.getloadavg() is returned where that function exists.
        """
        if self.win_load_tracker is not None:
            return self.win_load_tracker.getloadavg()

        if hasattr(os, 'getloadavg'):
            return os.getloadavg()[0]

        return None

    def _main(self, tests, kwargs):
        """Run the tests from the temporary cwd and exit with the run status.

        Exit status: 2 if a test failed, 130 if the run was interrupted,
        3 if the environment was changed and --fail-env-changed is set,
        0 otherwise.
        """
        if self.worker_test_name is not None:
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns, self.worker_test_name)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo
        support.PGO_EXTENDED = self.ns.pgo_extended

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        if self.ns.list_cases:
            self.list_cases()
            sys.exit(0)

        # If we're on windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32' and self.worker_test_name is None:
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except FileNotFoundError as error:
                # Windows IoT Core and Windows Nano Server do not provide
                # typeperf.exe for x64, x86 or ARM
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            self.run_tests()
            self.display_result()

            if self.ns.verbose2 and self.bad:
                self.rerun_failed_tests()
        finally:
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None

        self.finalize()

        self.save_xml_result()

        if self.bad:
            sys.exit(2)
        if self.interrupted:
            sys.exit(130)
        if self.ns.fail_env_changed and self.environment_changed:
            sys.exit(3)
        sys.exit(0)


def main(tests=None, **kwargs):
    """Run the Python suite."""
    Regrtest().main(tests=tests, **kwargs)