• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import datetime
2import faulthandler
3import locale
4import os
5import platform
6import random
7import re
8import sys
9import sysconfig
10import tempfile
11import textwrap
12import time
13from test.libregrtest.cmdline import _parse_args
14from test.libregrtest.runtest import (
15    findtests, runtest,
16    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
17    INTERRUPTED, CHILD_ERROR,
18    PROGRESS_MIN_TIME, format_test_result)
19from test.libregrtest.setup import setup_tests
20from test import support
21try:
22    import gc
23except ImportError:
24    gc = None
25
26
# Directory under which the per-run working directory is created.
# In a Python build tree the "build" subfolder of the source directory is
# used, so that leftover test files are removed by "make distclean";
# otherwise the system temporary directory is used.
TEMPDIR = os.path.abspath(
    os.path.join(sysconfig.get_config_var('srcdir'), 'build')
    if sysconfig.is_python_build()
    else tempfile.gettempdir()
)
35
36
def format_duration(seconds):
    """Return a short human-readable rendering of a duration.

    seconds -- the duration in seconds (int or finite float)

    Durations below one second are shown in milliseconds, below one minute
    in seconds, and otherwise as minutes and seconds.

    The value is rounded once to the displayed unit *before* being split
    into fields, so a field can never overflow its range (no '1000 ms',
    '60 sec' or '1 min 60 sec').
    """
    if seconds < 1.0:
        millis = round(seconds * 1e3)
        if millis < 1000:
            return '%d ms' % millis
        # Values just below one second round up to a full second.
        seconds = 1.0

    whole = round(seconds)
    if whole < 60:
        return '%d sec' % whole

    minutes, secs = divmod(whole, 60)
    return '%d min %d sec' % (minutes, secs)
45
46
47class Regrtest:
48    """Execute a test suite.
49
50    This also parses command-line options and modifies its behavior
51    accordingly.
52
53    tests -- a list of strings containing test names (optional)
54    testdir -- the directory in which to look for tests (optional)
55
56    Users other than the Python test suite will certainly want to
57    specify testdir; if it's omitted, the directory containing the
58    Python test suite is searched for.
59
60    If the tests argument is omitted, the tests listed on the
61    command-line will be used.  If that's empty, too, then all *.py
62    files beginning with test_ will be used.
63
64    The other default arguments (verbose, quiet, exclude,
65    single, randomize, findleaks, use_resources, trace, coverdir,
66    print_slow, and random_seed) allow programmers calling main()
67    directly to set the values that would normally be set by flags
68    on the command line.
69    """
70    def __init__(self):
71        # Namespace of command line options
72        self.ns = None
73
74        # tests
75        self.tests = []
76        self.selected = []
77
78        # test results
79        self.good = []
80        self.bad = []
81        self.skipped = []
82        self.resource_denieds = []
83        self.environment_changed = []
84        self.interrupted = False
85
86        # used by --slow
87        self.test_times = []
88
89        # used by --coverage, trace.Trace instance
90        self.tracer = None
91
92        # used by --findleaks, store for gc.garbage
93        self.found_garbage = []
94
95        # used to display the progress bar "[ 3/100]"
96        self.start_time = time.monotonic()
97        self.test_count = ''
98        self.test_count_width = 1
99
100        # used by --single
101        self.next_single_test = None
102        self.next_single_filename = None
103
104    def accumulate_result(self, test, result):
105        ok, test_time = result
106        if ok not in (CHILD_ERROR, INTERRUPTED):
107            self.test_times.append((test_time, test))
108        if ok == PASSED:
109            self.good.append(test)
110        elif ok == FAILED:
111            self.bad.append(test)
112        elif ok == ENV_CHANGED:
113            self.environment_changed.append(test)
114        elif ok == SKIPPED:
115            self.skipped.append(test)
116        elif ok == RESOURCE_DENIED:
117            self.skipped.append(test)
118            self.resource_denieds.append(test)
119
120    def display_progress(self, test_index, test):
121        if self.ns.quiet:
122            return
123        if self.bad and not self.ns.pgo:
124            fmt = "{time} [{test_index:{count_width}}{test_count}/{nbad}] {test_name}"
125        else:
126            fmt = "{time} [{test_index:{count_width}}{test_count}] {test_name}"
127        test_time = time.monotonic() - self.start_time
128        test_time = datetime.timedelta(seconds=int(test_time))
129        line = fmt.format(count_width=self.test_count_width,
130                          test_index=test_index,
131                          test_count=self.test_count,
132                          nbad=len(self.bad),
133                          test_name=test,
134                          time=test_time)
135        print(line, flush=True)
136
137    def parse_args(self, kwargs):
138        ns = _parse_args(sys.argv[1:], **kwargs)
139
140        if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
141            print("Warning: The timeout option requires "
142                  "faulthandler.dump_traceback_later", file=sys.stderr)
143            ns.timeout = None
144
145        if ns.threshold is not None and gc is None:
146            print('No GC available, ignore --threshold.', file=sys.stderr)
147            ns.threshold = None
148
149        if ns.findleaks:
150            if gc is not None:
151                # Uncomment the line below to report garbage that is not
152                # freeable by reference counting alone.  By default only
153                # garbage that is not collectable by the GC is reported.
154                pass
155                #gc.set_debug(gc.DEBUG_SAVEALL)
156            else:
157                print('No GC available, disabling --findleaks',
158                      file=sys.stderr)
159                ns.findleaks = False
160
161        # Strip .py extensions.
162        removepy(ns.args)
163
164        return ns
165
166    def find_tests(self, tests):
167        self.tests = tests
168
169        if self.ns.single:
170            self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
171            try:
172                with open(self.next_single_filename, 'r') as fp:
173                    next_test = fp.read().strip()
174                    self.tests = [next_test]
175            except OSError:
176                pass
177
178        if self.ns.fromfile:
179            self.tests = []
180            # regex to match 'test_builtin' in line:
181            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
182            regex = (r'^(?:[0-9]+:[0-9]+:[0-9]+ *)?'
183                     r'(?:\[[0-9/ ]+\] *)?'
184                     r'(test_[a-zA-Z0-9_]+)')
185            regex = re.compile(regex)
186            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
187                for line in fp:
188                    line = line.strip()
189                    if line.startswith('#'):
190                        continue
191                    match = regex.match(line)
192                    if match is None:
193                        continue
194                    self.tests.append(match.group(1))
195
196        removepy(self.tests)
197
198        stdtests = STDTESTS[:]
199        nottests = NOTTESTS.copy()
200        if self.ns.exclude:
201            for arg in self.ns.args:
202                if arg in stdtests:
203                    stdtests.remove(arg)
204                nottests.add(arg)
205            self.ns.args = []
206
207        # if testdir is set, then we are not running the python tests suite, so
208        # don't add default tests to be executed or skipped (pass empty values)
209        if self.ns.testdir:
210            alltests = findtests(self.ns.testdir, list(), set())
211        else:
212            alltests = findtests(self.ns.testdir, stdtests, nottests)
213
214        if not self.ns.fromfile:
215            self.selected = self.tests or self.ns.args or alltests
216        else:
217            self.selected = self.tests
218        if self.ns.single:
219            self.selected = self.selected[:1]
220            try:
221                pos = alltests.index(self.selected[0])
222                self.next_single_test = alltests[pos + 1]
223            except IndexError:
224                pass
225
226        # Remove all the selected tests that precede start if it's set.
227        if self.ns.start:
228            try:
229                del self.selected[:self.selected.index(self.ns.start)]
230            except ValueError:
231                print("Couldn't find starting test (%s), using all tests"
232                      % self.ns.start, file=sys.stderr)
233
234        if self.ns.randomize:
235            if self.ns.random_seed is None:
236                self.ns.random_seed = random.randrange(10000000)
237            random.seed(self.ns.random_seed)
238            random.shuffle(self.selected)
239
240    def list_tests(self):
241        for name in self.selected:
242            print(name)
243
244    def rerun_failed_tests(self):
245        self.ns.verbose = True
246        self.ns.failfast = False
247        self.ns.verbose3 = False
248        self.ns.match_tests = None
249
250        print("Re-running failed tests in verbose mode")
251        for test in self.bad[:]:
252            print("Re-running test %r in verbose mode" % test, flush=True)
253            try:
254                self.ns.verbose = True
255                ok = runtest(self.ns, test)
256            except KeyboardInterrupt:
257                self.interrupted = True
258                # print a newline separate from the ^C
259                print()
260                break
261            else:
262                if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
263                    self.bad.remove(test)
264        else:
265            if self.bad:
266                print(count(len(self.bad), 'test'), "failed again:")
267                printlist(self.bad)
268
269    def display_result(self):
270        if self.interrupted:
271            # print a newline after ^C
272            print()
273            print("Test suite interrupted by signal SIGINT.")
274            executed = set(self.good) | set(self.bad) | set(self.skipped)
275            omitted = set(self.selected) - executed
276            print(count(len(omitted), "test"), "omitted:")
277            printlist(omitted)
278
279        # If running the test suite for PGO then no one cares about
280        # results.
281        if self.ns.pgo:
282            return
283
284        if self.good and not self.ns.quiet:
285            if (not self.bad
286                and not self.skipped
287                and not self.interrupted
288                and len(self.good) > 1):
289                print("All", end=' ')
290            print(count(len(self.good), "test"), "OK.")
291
292        if self.ns.print_slow:
293            self.test_times.sort(reverse=True)
294            print()
295            print("10 slowest tests:")
296            for time, test in self.test_times[:10]:
297                print("- %s: %s" % (test, format_duration(time)))
298
299        if self.bad:
300            print()
301            print(count(len(self.bad), "test"), "failed:")
302            printlist(self.bad)
303
304        if self.environment_changed:
305            print()
306            print("{} altered the execution environment:".format(
307                     count(len(self.environment_changed), "test")))
308            printlist(self.environment_changed)
309
310        if self.skipped and not self.ns.quiet:
311            print()
312            print(count(len(self.skipped), "test"), "skipped:")
313            printlist(self.skipped)
314
315    def run_tests_sequential(self):
316        if self.ns.trace:
317            import trace
318            self.tracer = trace.Trace(trace=False, count=True)
319
320        save_modules = sys.modules.keys()
321
322        print("Run tests sequentially")
323
324        previous_test = None
325        for test_index, test in enumerate(self.tests, 1):
326            start_time = time.monotonic()
327
328            text = test
329            if previous_test:
330                text = '%s -- %s' % (text, previous_test)
331            self.display_progress(test_index, text)
332
333            if self.tracer:
334                # If we're tracing code coverage, then we don't exit with status
335                # if on a false return value from main.
336                cmd = ('result = runtest(self.ns, test); '
337                       'self.accumulate_result(test, result)')
338                ns = dict(locals())
339                self.tracer.runctx(cmd, globals=globals(), locals=ns)
340                result = ns['result']
341            else:
342                try:
343                    result = runtest(self.ns, test)
344                except KeyboardInterrupt:
345                    self.interrupted = True
346                    self.accumulate_result(test, (INTERRUPTED, None))
347                    break
348                else:
349                    self.accumulate_result(test, result)
350
351            previous_test = format_test_result(test, result[0])
352            test_time = time.monotonic() - start_time
353            if test_time >= PROGRESS_MIN_TIME:
354                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
355            elif result[0] == PASSED:
356                # be quiet: say nothing if the test passed shortly
357                previous_test = None
358
359            if self.ns.findleaks:
360                gc.collect()
361                if gc.garbage:
362                    print("Warning: test created", len(gc.garbage), end=' ')
363                    print("uncollectable object(s).")
364                    # move the uncollectable objects somewhere so we don't see
365                    # them again
366                    self.found_garbage.extend(gc.garbage)
367                    del gc.garbage[:]
368
369            # Unload the newly imported modules (best effort finalization)
370            for module in sys.modules.keys():
371                if module not in save_modules and module.startswith("test."):
372                    support.unload(module)
373
374        if previous_test:
375            print(previous_test)
376
377    def _test_forever(self, tests):
378        while True:
379            for test in tests:
380                yield test
381                if self.bad:
382                    return
383
384    def run_tests(self):
385        # For a partial run, we do not need to clutter the output.
386        if (self.ns.verbose
387            or self.ns.header
388            or not (self.ns.pgo or self.ns.quiet or self.ns.single
389                    or self.tests or self.ns.args)):
390            # Print basic platform information
391            print("==", platform.python_implementation(), *sys.version.split())
392            print("==  ", platform.platform(aliased=True),
393                          "%s-endian" % sys.byteorder)
394            print("==  ", "hash algorithm:", sys.hash_info.algorithm,
395                  "64bit" if sys.maxsize > 2**32 else "32bit")
396            print("==  cwd:", os.getcwd())
397            print("==  encodings: locale=%s, FS=%s"
398                  % (locale.getpreferredencoding(False),
399                     sys.getfilesystemencoding()))
400            print("Testing with flags:", sys.flags)
401
402        if self.ns.randomize:
403            print("Using random seed", self.ns.random_seed)
404
405        if self.ns.forever:
406            self.tests = self._test_forever(list(self.selected))
407            self.test_count = ''
408            self.test_count_width = 3
409        else:
410            self.tests = iter(self.selected)
411            self.test_count = '/{}'.format(len(self.selected))
412            self.test_count_width = len(self.test_count) - 1
413
414        if self.ns.use_mp:
415            from test.libregrtest.runtest_mp import run_tests_multiprocess
416            run_tests_multiprocess(self)
417        else:
418            self.run_tests_sequential()
419
420    def finalize(self):
421        if self.next_single_filename:
422            if self.next_single_test:
423                with open(self.next_single_filename, 'w') as fp:
424                    fp.write(self.next_single_test + '\n')
425            else:
426                os.unlink(self.next_single_filename)
427
428        if self.tracer:
429            r = self.tracer.results()
430            r.write_results(show_missing=True, summary=True,
431                            coverdir=self.ns.coverdir)
432
433        print()
434        duration = time.monotonic() - self.start_time
435        print("Total duration: %s" % format_duration(duration))
436
437        if self.bad:
438            result = "FAILURE"
439        elif self.interrupted:
440            result = "INTERRUPTED"
441        else:
442            result = "SUCCESS"
443        print("Tests result: %s" % result)
444
445        if self.ns.runleaks:
446            os.system("leaks %d" % os.getpid())
447
448    def main(self, tests=None, **kwargs):
449        global TEMPDIR
450
451        if sysconfig.is_python_build():
452            try:
453                os.mkdir(TEMPDIR)
454            except FileExistsError:
455                pass
456
457        # Define a writable temp dir that will be used as cwd while running
458        # the tests. The name of the dir includes the pid to allow parallel
459        # testing (see the -j option).
460        test_cwd = 'test_python_{}'.format(os.getpid())
461        test_cwd = os.path.join(TEMPDIR, test_cwd)
462
463        # Run the tests in a context manager that temporarily changes the CWD to a
464        # temporary and writable directory.  If it's not possible to create or
465        # change the CWD, the original CWD will be used.  The original CWD is
466        # available from support.SAVEDCWD.
467        with support.temp_cwd(test_cwd, quiet=True):
468            self._main(tests, kwargs)
469
470    def _main(self, tests, kwargs):
471        self.ns = self.parse_args(kwargs)
472
473        if self.ns.slaveargs is not None:
474            from test.libregrtest.runtest_mp import run_tests_slave
475            run_tests_slave(self.ns.slaveargs)
476
477        if self.ns.wait:
478            input("Press any key to continue...")
479
480        support.PGO = self.ns.pgo
481
482        setup_tests(self.ns)
483
484        self.find_tests(tests)
485
486        if self.ns.list_tests:
487            self.list_tests()
488            sys.exit(0)
489
490        self.run_tests()
491        self.display_result()
492
493        if self.ns.verbose2 and self.bad:
494            self.rerun_failed_tests()
495
496        self.finalize()
497        sys.exit(len(self.bad) > 0 or self.interrupted)
498
499
def removepy(names):
    """Strip a trailing ".py" extension from each entry of 'names', in place.

    names -- a list of test names; None or an empty list is left untouched
    """
    if names:
        for position, entry in enumerate(names):
            stem, suffix = os.path.splitext(entry)
            if suffix == '.py':
                names[position] = stem
507
508
def count(n, word):
    """Return "<n> <word>", with an "s" appended unless n is exactly 1."""
    plural = "" if n == 1 else "s"
    return "%d %s%s" % (n, word, plural)
514
515
def printlist(x, width=70, indent=4):
    """Print the sorted elements of iterable x to stdout as wrapped text.

    Optional arg width (default 70) is the maximum line length.
    Optional arg indent (default 4) is the number of blanks with which to
    begin each line.
    """
    margin = ' ' * indent
    # 'x' may be a '--random' list or a set(): always print it sorted.
    ordered = sorted(x)
    text = ' '.join(map(str, ordered))
    wrapped = textwrap.fill(text, width,
                            initial_indent=margin,
                            subsequent_indent=margin)
    print(wrapped)
528
529
def main(tests=None, **kwargs):
    """Run the Python suite.

    tests -- optional list of test names
    kwargs -- option overrides forwarded to Regrtest.main()
    """
    runner = Regrtest()
    runner.main(tests=tests, **kwargs)
533