• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import datetime
2import faulthandler
3import json
4import locale
5import os
6import platform
7import random
8import re
9import sys
10import sysconfig
11import tempfile
12import time
13import unittest
14from test.libregrtest.cmdline import _parse_args
15from test.libregrtest.runtest import (
16    findtests, runtest, get_abs_module,
17    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
18    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN,
19    PROGRESS_MIN_TIME, format_test_result)
20from test.libregrtest.setup import setup_tests
21from test.libregrtest.utils import removepy, count, format_duration, printlist
22from test import support
23try:
24    import gc
25except ImportError:
26    gc = None
27
28
# Base directory for the temporary files created by the tests.  When running
# from the Python build directory, a "build" subfolder is used so that
# leftover files are easy to clean up with the "make distclean" command;
# otherwise the system temporary directory is used.
if not sysconfig.is_python_build():
    TEMPDIR = os.path.abspath(tempfile.gettempdir())
else:
    TEMPDIR = sysconfig.get_config_var('abs_builddir')
    if TEMPDIR is None:
        # bpo-30284: On Windows, only srcdir is available. Using abs_builddir
        # mostly matters on UNIX when building Python out of the source tree,
        # especially when the source tree is read only.
        TEMPDIR = sysconfig.get_config_var('srcdir')
    TEMPDIR = os.path.abspath(os.path.join(TEMPDIR, 'build'))
43
44
45class Regrtest:
46    """Execute a test suite.
47
48    This also parses command-line options and modifies its behavior
49    accordingly.
50
51    tests -- a list of strings containing test names (optional)
52    testdir -- the directory in which to look for tests (optional)
53
54    Users other than the Python test suite will certainly want to
55    specify testdir; if it's omitted, the directory containing the
56    Python test suite is searched for.
57
58    If the tests argument is omitted, the tests listed on the
59    command-line will be used.  If that's empty, too, then all *.py
60    files beginning with test_ will be used.
61
62    The other default arguments (verbose, quiet, exclude,
63    single, randomize, findleaks, use_resources, trace, coverdir,
64    print_slow, and random_seed) allow programmers calling main()
65    directly to set the values that would normally be set by flags
66    on the command line.
67    """
68    def __init__(self):
69        # Namespace of command line options
70        self.ns = None
71
72        # tests
73        self.tests = []
74        self.selected = []
75
76        # test results
77        self.good = []
78        self.bad = []
79        self.skipped = []
80        self.resource_denieds = []
81        self.environment_changed = []
82        self.rerun = []
83        self.run_no_tests = []
84        self.first_result = None
85        self.interrupted = False
86
87        # used by --slow
88        self.test_times = []
89
90        # used by --coverage, trace.Trace instance
91        self.tracer = None
92
93        # used by --findleaks, store for gc.garbage
94        self.found_garbage = []
95
96        # used to display the progress bar "[ 3/100]"
97        self.start_time = time.monotonic()
98        self.test_count = ''
99        self.test_count_width = 1
100
101        # used by --single
102        self.next_single_test = None
103        self.next_single_filename = None
104
105        # used by --junit-xml
106        self.testsuite_xml = None
107
108    def accumulate_result(self, test, result):
109        ok, test_time, xml_data = result
110        if ok not in (CHILD_ERROR, INTERRUPTED):
111            self.test_times.append((test_time, test))
112        if ok == PASSED:
113            self.good.append(test)
114        elif ok in (FAILED, CHILD_ERROR):
115            self.bad.append(test)
116        elif ok == ENV_CHANGED:
117            self.environment_changed.append(test)
118        elif ok == SKIPPED:
119            self.skipped.append(test)
120        elif ok == RESOURCE_DENIED:
121            self.skipped.append(test)
122            self.resource_denieds.append(test)
123        elif ok == TEST_DID_NOT_RUN:
124            self.run_no_tests.append(test)
125        elif ok != INTERRUPTED:
126            raise ValueError("invalid test result: %r" % ok)
127
128        if xml_data:
129            import xml.etree.ElementTree as ET
130            for e in xml_data:
131                try:
132                    self.testsuite_xml.append(ET.fromstring(e))
133                except ET.ParseError:
134                    print(xml_data, file=sys.__stderr__)
135                    raise
136
137    def display_progress(self, test_index, test):
138        if self.ns.quiet:
139            return
140
141        # "[ 51/405/1] test_tcl passed"
142        line = f"{test_index:{self.test_count_width}}{self.test_count}"
143        fails = len(self.bad) + len(self.environment_changed)
144        if fails and not self.ns.pgo:
145            line = f"{line}/{fails}"
146        line = f"[{line}] {test}"
147
148        # add the system load prefix: "load avg: 1.80 "
149        if hasattr(os, 'getloadavg'):
150            load_avg_1min = os.getloadavg()[0]
151            line = f"load avg: {load_avg_1min:.2f} {line}"
152
153        # add the timestamp prefix:  "0:01:05 "
154        test_time = time.monotonic() - self.start_time
155        test_time = datetime.timedelta(seconds=int(test_time))
156        line = f"{test_time} {line}"
157        print(line, flush=True)
158
159    def parse_args(self, kwargs):
160        ns = _parse_args(sys.argv[1:], **kwargs)
161
162        if ns.timeout and not hasattr(faulthandler, 'dump_traceback_later'):
163            print("Warning: The timeout option requires "
164                  "faulthandler.dump_traceback_later", file=sys.stderr)
165            ns.timeout = None
166
167        if ns.threshold is not None and gc is None:
168            print('No GC available, ignore --threshold.', file=sys.stderr)
169            ns.threshold = None
170
171        if ns.findleaks:
172            if gc is not None:
173                # Uncomment the line below to report garbage that is not
174                # freeable by reference counting alone.  By default only
175                # garbage that is not collectable by the GC is reported.
176                pass
177                #gc.set_debug(gc.DEBUG_SAVEALL)
178            else:
179                print('No GC available, disabling --findleaks',
180                      file=sys.stderr)
181                ns.findleaks = False
182
183        if ns.xmlpath:
184            support.junit_xml_list = self.testsuite_xml = []
185
186        # Strip .py extensions.
187        removepy(ns.args)
188
189        return ns
190
191    def find_tests(self, tests):
192        self.tests = tests
193
194        if self.ns.single:
195            self.next_single_filename = os.path.join(TEMPDIR, 'pynexttest')
196            try:
197                with open(self.next_single_filename, 'r') as fp:
198                    next_test = fp.read().strip()
199                    self.tests = [next_test]
200            except OSError:
201                pass
202
203        if self.ns.fromfile:
204            self.tests = []
205            # regex to match 'test_builtin' in line:
206            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
207            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
208            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
209                for line in fp:
210                    line = line.split('#', 1)[0]
211                    line = line.strip()
212                    match = regex.search(line)
213                    if match is not None:
214                        self.tests.append(match.group())
215
216        removepy(self.tests)
217
218        stdtests = STDTESTS[:]
219        nottests = NOTTESTS.copy()
220        if self.ns.exclude:
221            for arg in self.ns.args:
222                if arg in stdtests:
223                    stdtests.remove(arg)
224                nottests.add(arg)
225            self.ns.args = []
226
227        # if testdir is set, then we are not running the python tests suite, so
228        # don't add default tests to be executed or skipped (pass empty values)
229        if self.ns.testdir:
230            alltests = findtests(self.ns.testdir, list(), set())
231        else:
232            alltests = findtests(self.ns.testdir, stdtests, nottests)
233
234        if not self.ns.fromfile:
235            self.selected = self.tests or self.ns.args or alltests
236        else:
237            self.selected = self.tests
238        if self.ns.single:
239            self.selected = self.selected[:1]
240            try:
241                pos = alltests.index(self.selected[0])
242                self.next_single_test = alltests[pos + 1]
243            except IndexError:
244                pass
245
246        # Remove all the selected tests that precede start if it's set.
247        if self.ns.start:
248            try:
249                del self.selected[:self.selected.index(self.ns.start)]
250            except ValueError:
251                print("Couldn't find starting test (%s), using all tests"
252                      % self.ns.start, file=sys.stderr)
253
254        if self.ns.randomize:
255            if self.ns.random_seed is None:
256                self.ns.random_seed = random.randrange(10000000)
257            random.seed(self.ns.random_seed)
258            random.shuffle(self.selected)
259
260    def list_tests(self):
261        for name in self.selected:
262            print(name)
263
264    def _list_cases(self, suite):
265        for test in suite:
266            if isinstance(test, unittest.loader._FailedTest):
267                continue
268            if isinstance(test, unittest.TestSuite):
269                self._list_cases(test)
270            elif isinstance(test, unittest.TestCase):
271                if support.match_test(test):
272                    print(test.id())
273
274    def list_cases(self):
275        support.verbose = False
276        support.set_match_tests(self.ns.match_tests)
277
278        for test in self.selected:
279            abstest = get_abs_module(self.ns, test)
280            try:
281                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
282                self._list_cases(suite)
283            except unittest.SkipTest:
284                self.skipped.append(test)
285
286        if self.skipped:
287            print(file=sys.stderr)
288            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
289            printlist(self.skipped, file=sys.stderr)
290
291    def rerun_failed_tests(self):
292        self.ns.verbose = True
293        self.ns.failfast = False
294        self.ns.verbose3 = False
295
296        self.first_result = self.get_tests_result()
297
298        print()
299        print("Re-running failed tests in verbose mode")
300        self.rerun = self.bad[:]
301        for test in self.rerun:
302            print("Re-running test %r in verbose mode" % test, flush=True)
303            try:
304                self.ns.verbose = True
305                ok = runtest(self.ns, test)
306            except KeyboardInterrupt:
307                self.interrupted = True
308                # print a newline separate from the ^C
309                print()
310                break
311            else:
312                if ok[0] in {PASSED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED}:
313                    self.bad.remove(test)
314        else:
315            if self.bad:
316                print(count(len(self.bad), 'test'), "failed again:")
317                printlist(self.bad)
318
319        self.display_result()
320
321    def display_result(self):
322        # If running the test suite for PGO then no one cares about results.
323        if self.ns.pgo:
324            return
325
326        print()
327        print("== Tests result: %s ==" % self.get_tests_result())
328
329        if self.interrupted:
330            print()
331            # print a newline after ^C
332            print("Test suite interrupted by signal SIGINT.")
333            executed = set(self.good) | set(self.bad) | set(self.skipped)
334            omitted = set(self.selected) - executed
335            print(count(len(omitted), "test"), "omitted:")
336            printlist(omitted)
337
338        if self.good and not self.ns.quiet:
339            print()
340            if (not self.bad
341                and not self.skipped
342                and not self.interrupted
343                and len(self.good) > 1):
344                print("All", end=' ')
345            print(count(len(self.good), "test"), "OK.")
346
347        if self.ns.print_slow:
348            self.test_times.sort(reverse=True)
349            print()
350            print("10 slowest tests:")
351            for time, test in self.test_times[:10]:
352                print("- %s: %s" % (test, format_duration(time)))
353
354        if self.bad:
355            print()
356            print(count(len(self.bad), "test"), "failed:")
357            printlist(self.bad)
358
359        if self.environment_changed:
360            print()
361            print("{} altered the execution environment:".format(
362                     count(len(self.environment_changed), "test")))
363            printlist(self.environment_changed)
364
365        if self.skipped and not self.ns.quiet:
366            print()
367            print(count(len(self.skipped), "test"), "skipped:")
368            printlist(self.skipped)
369
370        if self.rerun:
371            print()
372            print("%s:" % count(len(self.rerun), "re-run test"))
373            printlist(self.rerun)
374
375        if self.run_no_tests:
376            print()
377            print(count(len(self.run_no_tests), "test"), "run no tests:")
378            printlist(self.run_no_tests)
379
380    def run_tests_sequential(self):
381        if self.ns.trace:
382            import trace
383            self.tracer = trace.Trace(trace=False, count=True)
384
385        save_modules = sys.modules.keys()
386
387        print("Run tests sequentially")
388
389        previous_test = None
390        for test_index, test in enumerate(self.tests, 1):
391            start_time = time.monotonic()
392
393            text = test
394            if previous_test:
395                text = '%s -- %s' % (text, previous_test)
396            self.display_progress(test_index, text)
397
398            if self.tracer:
399                # If we're tracing code coverage, then we don't exit with status
400                # if on a false return value from main.
401                cmd = ('result = runtest(self.ns, test); '
402                       'self.accumulate_result(test, result)')
403                ns = dict(locals())
404                self.tracer.runctx(cmd, globals=globals(), locals=ns)
405                result = ns['result']
406            else:
407                try:
408                    result = runtest(self.ns, test)
409                except KeyboardInterrupt:
410                    self.interrupted = True
411                    self.accumulate_result(test, (INTERRUPTED, None, None))
412                    break
413                else:
414                    self.accumulate_result(test, result)
415
416            previous_test = format_test_result(test, result[0])
417            test_time = time.monotonic() - start_time
418            if test_time >= PROGRESS_MIN_TIME:
419                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
420            elif result[0] == PASSED:
421                # be quiet: say nothing if the test passed shortly
422                previous_test = None
423
424            if self.ns.findleaks:
425                gc.collect()
426                if gc.garbage:
427                    print("Warning: test created", len(gc.garbage), end=' ')
428                    print("uncollectable object(s).")
429                    # move the uncollectable objects somewhere so we don't see
430                    # them again
431                    self.found_garbage.extend(gc.garbage)
432                    del gc.garbage[:]
433
434            # Unload the newly imported modules (best effort finalization)
435            for module in sys.modules.keys():
436                if module not in save_modules and module.startswith("test."):
437                    support.unload(module)
438
439        if previous_test:
440            print(previous_test)
441
442    def _test_forever(self, tests):
443        while True:
444            for test in tests:
445                yield test
446                if self.bad:
447                    return
448                if self.ns.fail_env_changed and self.environment_changed:
449                    return
450
451    def display_header(self):
452        # Print basic platform information
453        print("==", platform.python_implementation(), *sys.version.split())
454        print("==", platform.platform(aliased=True),
455                      "%s-endian" % sys.byteorder)
456        print("== cwd:", os.getcwd())
457        cpu_count = os.cpu_count()
458        if cpu_count:
459            print("== CPU count:", cpu_count)
460        print("== encodings: locale=%s, FS=%s"
461              % (locale.getpreferredencoding(False),
462                 sys.getfilesystemencoding()))
463
464    def get_tests_result(self):
465        result = []
466        if self.bad:
467            result.append("FAILURE")
468        elif self.ns.fail_env_changed and self.environment_changed:
469            result.append("ENV CHANGED")
470        elif not any((self.good, self.bad, self.skipped, self.interrupted,
471            self.environment_changed)):
472            result.append("NO TEST RUN")
473
474        if self.interrupted:
475            result.append("INTERRUPTED")
476
477        if not result:
478            result.append("SUCCESS")
479
480        result = ', '.join(result)
481        if self.first_result:
482            result = '%s then %s' % (self.first_result, result)
483        return result
484
485    def run_tests(self):
486        # For a partial run, we do not need to clutter the output.
487        if (self.ns.header
488            or not(self.ns.pgo or self.ns.quiet or self.ns.single
489                   or self.tests or self.ns.args)):
490            self.display_header()
491
492        if self.ns.huntrleaks:
493            warmup, repetitions, _ = self.ns.huntrleaks
494            if warmup < 3:
495                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
496                        "3 warmup repetitions can give false positives!")
497                print(msg, file=sys.stdout, flush=True)
498
499        if self.ns.randomize:
500            print("Using random seed", self.ns.random_seed)
501
502        if self.ns.forever:
503            self.tests = self._test_forever(list(self.selected))
504            self.test_count = ''
505            self.test_count_width = 3
506        else:
507            self.tests = iter(self.selected)
508            self.test_count = '/{}'.format(len(self.selected))
509            self.test_count_width = len(self.test_count) - 1
510
511        if self.ns.use_mp:
512            from test.libregrtest.runtest_mp import run_tests_multiprocess
513            run_tests_multiprocess(self)
514        else:
515            self.run_tests_sequential()
516
517    def finalize(self):
518        if self.next_single_filename:
519            if self.next_single_test:
520                with open(self.next_single_filename, 'w') as fp:
521                    fp.write(self.next_single_test + '\n')
522            else:
523                os.unlink(self.next_single_filename)
524
525        if self.tracer:
526            r = self.tracer.results()
527            r.write_results(show_missing=True, summary=True,
528                            coverdir=self.ns.coverdir)
529
530        print()
531        duration = time.monotonic() - self.start_time
532        print("Total duration: %s" % format_duration(duration))
533        print("Tests result: %s" % self.get_tests_result())
534
535        if self.ns.runleaks:
536            os.system("leaks %d" % os.getpid())
537
538    def save_xml_result(self):
539        if not self.ns.xmlpath and not self.testsuite_xml:
540            return
541
542        import xml.etree.ElementTree as ET
543        root = ET.Element("testsuites")
544
545        # Manually count the totals for the overall summary
546        totals = {'tests': 0, 'errors': 0, 'failures': 0}
547        for suite in self.testsuite_xml:
548            root.append(suite)
549            for k in totals:
550                try:
551                    totals[k] += int(suite.get(k, 0))
552                except ValueError:
553                    pass
554
555        for k, v in totals.items():
556            root.set(k, str(v))
557
558        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
559        with open(xmlpath, 'wb') as f:
560            for s in ET.tostringlist(root):
561                f.write(s)
562
563    def main(self, tests=None, **kwargs):
564        global TEMPDIR
565        self.ns = self.parse_args(kwargs)
566
567        if self.ns.tempdir:
568            TEMPDIR = self.ns.tempdir
569        elif self.ns.worker_args:
570            ns_dict, _ = json.loads(self.ns.worker_args)
571            TEMPDIR = ns_dict.get("tempdir") or TEMPDIR
572
573        os.makedirs(TEMPDIR, exist_ok=True)
574
575        # Define a writable temp dir that will be used as cwd while running
576        # the tests. The name of the dir includes the pid to allow parallel
577        # testing (see the -j option).
578        test_cwd = 'test_python_{}'.format(os.getpid())
579        test_cwd = os.path.join(TEMPDIR, test_cwd)
580
581        # Run the tests in a context manager that temporarily changes the CWD to a
582        # temporary and writable directory.  If it's not possible to create or
583        # change the CWD, the original CWD will be used.  The original CWD is
584        # available from support.SAVEDCWD.
585        with support.temp_cwd(test_cwd, quiet=True):
586            self._main(tests, kwargs)
587
588    def _main(self, tests, kwargs):
589        if self.ns.huntrleaks:
590            warmup, repetitions, _ = self.ns.huntrleaks
591            if warmup < 1 or repetitions < 1:
592                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
593                       "number of warmups and repetitions must be at least 1 "
594                       "each (1:1).")
595                print(msg, file=sys.stderr, flush=True)
596                sys.exit(2)
597
598        if self.ns.worker_args is not None:
599            from test.libregrtest.runtest_mp import run_tests_worker
600            run_tests_worker(self.ns.worker_args)
601
602        if self.ns.wait:
603            input("Press any key to continue...")
604
605        support.PGO = self.ns.pgo
606
607        setup_tests(self.ns)
608
609        self.find_tests(tests)
610
611        if self.ns.list_tests:
612            self.list_tests()
613            sys.exit(0)
614
615        if self.ns.list_cases:
616            self.list_cases()
617            sys.exit(0)
618
619        self.run_tests()
620        self.display_result()
621
622        if self.ns.verbose2 and self.bad:
623            self.rerun_failed_tests()
624
625        self.finalize()
626
627        self.save_xml_result()
628
629        if self.bad:
630            sys.exit(2)
631        if self.interrupted:
632            sys.exit(130)
633        if self.ns.fail_env_changed and self.environment_changed:
634            sys.exit(3)
635        sys.exit(0)
636
637
def main(tests=None, **kwargs):
    """Run the Python suite with a new Regrtest instance.

    tests and the keyword arguments are passed unchanged to Regrtest.main().
    """
    runner = Regrtest()
    runner.main(tests=tests, **kwargs)
641