import datetime
import faulthandler
import locale
import os
import platform
import random
import re
import sys
import sysconfig
import tempfile
import textwrap
import time
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
    findtests, runtest,
    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
    INTERRUPTED, CHILD_ERROR,
    PROGRESS_MIN_TIME, format_test_result)
from test.libregrtest.setup import setup_tests
from test import support
try:
    import gc
except ImportError:
    gc = None


# When tests are run from the Python build directory, it is best practice
# to keep the test files in a subfolder.  This eases the cleanup of leftover
# files using the "make distclean" command.
if sysconfig.is_python_build():
    TEMPDIR = os.path.join(sysconfig.get_config_var('srcdir'), 'build')
else:
    TEMPDIR = tempfile.gettempdir()
TEMPDIR = os.path.abspath(TEMPDIR)


def format_duration(seconds):
    """Return a short human-readable string for a duration in seconds.

    Durations that round to less than one second are shown in
    milliseconds, durations below one minute in seconds, and longer
    durations as minutes plus seconds.
    """
    # Round *before* splitting into units, otherwise values such as 119.7
    # would be rendered as "1 min 60 sec" and 0.9996 as "1000 ms".
    millis = round(seconds * 1e3)
    if millis < 1000:
        return '%d ms' % millis

    total = round(seconds)
    if total < 60:
        return '%d sec' % total

    minutes, secs = divmod(total, 60)
    return '%d min %d sec' % (minutes, secs)


class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """
    def __init__(self):
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []

        # test results
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.interrupted = False

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used by --findleaks, store for gc.garbage
        self.found_garbage = []

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

    def accumulate_result(self, test, result):
        """Record the outcome of one test.

        result is an (ok, test_time) pair.  The test is appended to the
        result list matching 'ok'; its duration is recorded unless the
        outcome is CHILD_ERROR or INTERRUPTED.
        """
        ok, test_time = result
        if ok not in (CHILD_ERROR, INTERRUPTED):
            self.test_times.append((test_time, test))
        if ok == PASSED:
            self.good.append(test)
        elif ok == FAILED:
            self.bad.append(test)
        elif ok == ENV_CHANGED:
            self.environment_changed.append(test)
        elif ok == SKIPPED:
            self.skipped.append(test)
        elif ok == RESOURCE_DENIED:
            # A resource-denied test is also counted as skipped.
            self.skipped.append(test)
            self.resource_denieds.append(test)

    def display_progress(self, test_index, test):
        """Print one progress line: elapsed time, "[index/count]", test name.

        Nothing is printed in quiet mode.  The number of failed tests is
        appended to the counter once there is a failure, except in PGO mode.
        """
        if self.ns.quiet:
            return
        if self.bad and not self.ns.pgo:
            fmt = "{time} [{test_index:{count_width}}{test_count}/{nbad}] {test_name}"
        else:
            fmt = "{time} [{test_index:{count_width}}{test_count}] {test_name}"
        test_time = time.monotonic() - self.start_time
        test_time = datetime.timedelta(seconds=int(test_time))
        line = fmt.format(count_width=self.test_count_width,
                          test_index=test_index,
                          test_count=self.test_count,
                          nbad=len(self.bad),
                          test_name=test,
                          time=test_time)
        print(line, flush=True)

    def parse_args(self, kwargs):
        """Parse sys.argv (overridden by kwargs) and return the namespace.

        Options that cannot be honoured in this interpreter (timeout
        without faulthandler.dump_traceback_later, threshold or findleaks
        without the gc module) are disabled with a warning on stderr.
        """
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
            print("Warning: The timeout option requires "
                  "faulthandler.dump_traceback_later", file=sys.stderr)
            ns.timeout = None

        if ns.threshold is not None and gc is None:
            print('No GC available, ignore --threshold.', file=sys.stderr)
            ns.threshold = None

        if ns.findleaks and gc is None:
            print('No GC available, disabling --findleaks',
                  file=sys.stderr)
            ns.findleaks = False
        # NOTE: by default only garbage that is not collectable by the GC is
        # reported.  To also report garbage that is not freeable by reference
        # counting alone, call gc.set_debug(gc.DEBUG_SAVEALL) here when
        # ns.findleaks is enabled.

        # Strip .py extensions.
        removepy(ns.args)

        return ns

    def find_tests(self, tests):
        """Compute self.selected, the list of test names to run.

        The selection comes from, in order of precedence: the --fromfile
        file, the --single state file, the 'tests' argument, the
        command-line arguments, and finally all tests found by
        findtests().  --start and --randomize are then applied.
        """
        self.tests = tests

        if self.ns.single:
            self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                pass

        if self.ns.fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
            regex = (r'^(?:[0-9]+:[0-9]+:[0-9]+ *)?'
                     r'(?:\[[0-9/ ]+\] *)?'
                     r'(test_[a-zA-Z0-9_]+)')
            regex = re.compile(regex)
            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
                for line in fp:
                    line = line.strip()
                    if line.startswith('#'):
                        continue
                    match = regex.match(line)
                    if match is None:
                        continue
                    self.tests.append(match.group(1))

        removepy(self.tests)

        stdtests = STDTESTS[:]
        nottests = NOTTESTS.copy()
        if self.ns.exclude:
            for arg in self.ns.args:
                if arg in stdtests:
                    stdtests.remove(arg)
                nottests.add(arg)
            self.ns.args = []

        # if testdir is set, then we are not running the python tests suite, so
        # don't add default tests to be executed or skipped (pass empty values)
        if self.ns.testdir:
            alltests = findtests(self.ns.testdir, list(), set())
        else:
            alltests = findtests(self.ns.testdir, stdtests, nottests)

        if not self.ns.fromfile:
            self.selected = self.tests or self.ns.args or alltests
        else:
            self.selected = self.tests
        if self.ns.single:
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except (IndexError, ValueError):
                # IndexError: nothing selected, or the selected test is the
                # last one.  ValueError: the selected test is not part of
                # alltests.  Either way there is no "next" test to remember.
                pass

        # Remove all the selected tests that precede start if it's set.
        if self.ns.start:
            try:
                del self.selected[:self.selected.index(self.ns.start)]
            except ValueError:
                print("Couldn't find starting test (%s), using all tests"
                      % self.ns.start, file=sys.stderr)

        if self.ns.randomize:
            if self.ns.random_seed is None:
                self.ns.random_seed = random.randrange(10000000)
            random.seed(self.ns.random_seed)
            random.shuffle(self.selected)

    def list_tests(self):
        """Print the name of every selected test, one per line."""
        for name in self.selected:
            print(name)

    def rerun_failed_tests(self):
        """Run every failed test again in verbose mode.

        A test that no longer fails is removed from self.bad.  The tests
        still failing are listed unless the re-run is interrupted.
        """
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False
        self.ns.match_tests = None

        print("Re-running failed tests in verbose mode")
        # Iterate over a copy: self.bad is modified inside the loop.
        for test in self.bad[:]:
            print("Re-running test %r in verbose mode" % test, flush=True)
            try:
                self.ns.verbose = True
                ok = runtest(self.ns, test)
            except KeyboardInterrupt:
                self.interrupted = True
                # print a newline separate from the ^C
                print()
                break
            else:
                if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
                    self.bad.remove(test)
        else:
            if self.bad:
                print(count(len(self.bad), 'test'), "failed again:")
                printlist(self.bad)

    def display_result(self):
        """Print the summary of the run.

        Lists omitted tests if the run was interrupted, then (unless in
        PGO mode) the passed count, the slowest tests if requested, and
        the failed, environment-altering and skipped tests.
        """
        if self.interrupted:
            # print a newline after ^C
            print()
            print("Test suite interrupted by signal SIGINT.")
            # Tests which altered the environment were executed as well, so
            # they must not be reported as omitted.
            executed = (set(self.good) | set(self.bad) | set(self.skipped)
                        | set(self.environment_changed))
            omitted = set(self.selected) - executed
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        # If running the test suite for PGO then no one cares about
        # results.
        if self.ns.pgo:
            return

        if self.good and not self.ns.quiet:
            if (not self.bad
                    and not self.skipped
                    and not self.interrupted
                    and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            # Do not name the loop variable 'time': that would shadow the
            # time module inside this method.
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

    def run_tests_sequential(self):
        """Run the tests of self.tests one after the other in this process.

        Each result is passed to accumulate_result().  A KeyboardInterrupt
        raised by runtest() (when not tracing) stops the loop and marks
        the run as interrupted.
        """
        if self.ns.trace:
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        # Take a snapshot: sys.modules.keys() is a live view which would
        # always contain the modules imported later by the tests.
        save_modules = set(sys.modules)

        print("Run tests sequentially")

        previous_test = None
        for test_index, test in enumerate(self.tests, 1):
            start_time = time.monotonic()

            text = test
            if previous_test:
                text = '%s -- %s' % (text, previous_test)
            self.display_progress(test_index, text)

            if self.tracer:
                # When tracing code coverage, the test is run under the
                # tracer; the result is read back from the namespace which
                # was passed to runctx().
                cmd = ('result = runtest(self.ns, test); '
                       'self.accumulate_result(test, result)')
                ns = dict(locals())
                self.tracer.runctx(cmd, globals=globals(), locals=ns)
                result = ns['result']
            else:
                try:
                    result = runtest(self.ns, test)
                except KeyboardInterrupt:
                    self.interrupted = True
                    self.accumulate_result(test, (INTERRUPTED, None))
                    break
                else:
                    self.accumulate_result(test, result)

            previous_test = format_test_result(test, result[0])
            test_time = time.monotonic() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
            elif result[0] == PASSED:
                # be quiet: say nothing if the test passed shortly
                previous_test = None

            if self.ns.findleaks:
                gc.collect()
                if gc.garbage:
                    print("Warning: test created", len(gc.garbage), end=' ')
                    print("uncollectable object(s).")
                    # move the uncollectable objects somewhere so we don't see
                    # them again
                    self.found_garbage.extend(gc.garbage)
                    del gc.garbage[:]

            # Unload the newly imported modules (best effort finalization).
            # Iterate over a copy of the names: unloading modifies
            # sys.modules.
            for module in list(sys.modules):
                if module not in save_modules and module.startswith("test."):
                    support.unload(module)

        if previous_test:
            print(previous_test)

    def _test_forever(self, tests):
        """Yield the given tests in a loop until a test has failed."""
        while True:
            for test in tests:
                yield test
                if self.bad:
                    return

    def run_tests(self):
        """Print the run header if appropriate, then run the selected tests.

        The tests are run by run_tests_multiprocess() when ns.use_mp is
        set, and by run_tests_sequential() otherwise.
        """
        # For a partial run, we do not need to clutter the output.
        if (self.ns.verbose
                or self.ns.header
                or not (self.ns.pgo or self.ns.quiet or self.ns.single
                        or self.tests or self.ns.args)):
            # Print basic platform information
            print("==", platform.python_implementation(), *sys.version.split())
            print("==  ", platform.platform(aliased=True),
                          "%s-endian" % sys.byteorder)
            print("==  ", "hash algorithm:", sys.hash_info.algorithm,
                  "64bit" if sys.maxsize > 2**32 else "32bit")
            print("==  cwd:", os.getcwd())
            print("==  encodings: locale=%s, FS=%s"
                  % (locale.getpreferredencoding(False),
                     sys.getfilesystemencoding()))
            print("Testing with flags:", sys.flags)

        if self.ns.randomize:
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            # The total number of tests is unknown: no "/count" suffix.
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()

    def finalize(self):
        """Finish the run: update --single state, write coverage, print totals.

        Also prints the total duration and the overall result (FAILURE,
        INTERRUPTED or SUCCESS), and runs the "leaks" command on this
        process when ns.runleaks is set.
        """
        if self.next_single_filename:
            if self.next_single_test:
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                # There is no next test: drop the state file.  It may never
                # have been created, which is not an error.
                try:
                    os.unlink(self.next_single_filename)
                except FileNotFoundError:
                    pass

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))

        if self.bad:
            result = "FAILURE"
        elif self.interrupted:
            result = "INTERRUPTED"
        else:
            result = "SUCCESS"
        print("Tests result: %s" % result)

        if self.ns.runleaks:
            os.system("leaks %d" % os.getpid())

    def main(self, tests=None, **kwargs):
        """Run the test suite from a temporary working directory.

        tests and kwargs are forwarded to _main(), which ends by calling
        sys.exit().
        """
        if sysconfig.is_python_build():
            try:
                os.mkdir(TEMPDIR)
            except FileExistsError:
                pass

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        test_cwd = 'test_python_{}'.format(os.getpid())
        test_cwd = os.path.join(TEMPDIR, test_cwd)

        # Run the tests in a context manager that temporarily changes the CWD to a
        # temporary and writable directory.  If it's not possible to create or
        # change the CWD, the original CWD will be used.  The original CWD is
        # available from support.SAVEDCWD.
        with support.temp_cwd(test_cwd, quiet=True):
            self._main(tests, kwargs)

    def _main(self, tests, kwargs):
        """Parse options, select and run the tests, report, then exit.

        Exits with status 0 right after listing the tests when
        ns.list_tests is set.  Otherwise the exit status is true if a test
        failed or if the run was interrupted.
        """
        self.ns = self.parse_args(kwargs)

        if self.ns.slaveargs is not None:
            from test.libregrtest.runtest_mp import run_tests_slave
            run_tests_slave(self.ns.slaveargs)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        self.run_tests()
        self.display_result()

        if self.ns.verbose2 and self.bad:
            self.rerun_failed_tests()

        self.finalize()
        sys.exit(len(self.bad) > 0 or self.interrupted)


def removepy(names):
    """Strip a trailing '.py' extension from each name, in place.

    names is a list of strings; a false value (None or empty) is ignored.
    """
    if not names:
        return
    for idx, name in enumerate(names):
        basename, ext = os.path.splitext(name)
        if ext == '.py':
            names[idx] = basename


def count(n, word):
    """Return "<n> <word>", with an 's' appended to word unless n is 1."""
    if n == 1:
        return "%d %s" % (n, word)
    else:
        return "%d %ss" % (n, word)


def printlist(x, width=70, indent=4):
    """Print the elements of iterable x to stdout.

    Optional arg width (default 70) is the maximum line length.
    Optional arg indent (default 4) is the number of blanks with which to
    begin each line.
    """

    blanks = ' ' * indent
    # Print the sorted list: 'x' may be a '--random' list or a set()
    print(textwrap.fill(' '.join(str(elt) for elt in sorted(x)), width,
                        initial_indent=blanks, subsequent_indent=blanks))


def main(tests=None, **kwargs):
    """Run the Python suite."""
    Regrtest().main(tests=tests, **kwargs)