• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import faulthandler
2import locale
3import os
4import platform
5import random
6import re
7import sys
8import sysconfig
9import tempfile
10import time
11import unittest
12from test.libregrtest.cmdline import _parse_args
13from test.libregrtest.runtest import (
14    findtests, runtest, get_abs_module, is_failed,
15    STDTESTS, NOTTESTS, PROGRESS_MIN_TIME,
16    Passed, Failed, EnvChanged, Skipped, ResourceDenied, Interrupted,
17    ChildError, DidNotRun)
18from test.libregrtest.setup import setup_tests
19from test.libregrtest.pgo import setup_pgo_tests
20from test.libregrtest.utils import removepy, count, format_duration, printlist
21from test import support
22from test.support import os_helper
23
24
# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
# Regrtest.main() passes this value to faulthandler.dump_traceback_later()
# (with exit=True) while handling SystemExit.
EXIT_TIMEOUT = 120.0
29
30
31class Regrtest:
32    """Execute a test suite.
33
34    This also parses command-line options and modifies its behavior
35    accordingly.
36
37    tests -- a list of strings containing test names (optional)
38    testdir -- the directory in which to look for tests (optional)
39
40    Users other than the Python test suite will certainly want to
41    specify testdir; if it's omitted, the directory containing the
42    Python test suite is searched for.
43
44    If the tests argument is omitted, the tests listed on the
45    command-line will be used.  If that's empty, too, then all *.py
46    files beginning with test_ will be used.
47
48    The other default arguments (verbose, quiet, exclude,
49    single, randomize, findleaks, use_resources, trace, coverdir,
50    print_slow, and random_seed) allow programmers calling main()
51    directly to set the values that would normally be set by flags
52    on the command line.
53    """
54    def __init__(self):
55        # Namespace of command line options
56        self.ns = None
57
58        # tests
59        self.tests = []
60        self.selected = []
61
62        # test results
63        self.good = []
64        self.bad = []
65        self.skipped = []
66        self.resource_denieds = []
67        self.environment_changed = []
68        self.run_no_tests = []
69        self.need_rerun = []
70        self.rerun = []
71        self.first_result = None
72        self.interrupted = False
73
74        # used by --slow
75        self.test_times = []
76
77        # used by --coverage, trace.Trace instance
78        self.tracer = None
79
80        # used to display the progress bar "[ 3/100]"
81        self.start_time = time.monotonic()
82        self.test_count = ''
83        self.test_count_width = 1
84
85        # used by --single
86        self.next_single_test = None
87        self.next_single_filename = None
88
89        # used by --junit-xml
90        self.testsuite_xml = None
91
92        # misc
93        self.win_load_tracker = None
94        self.tmp_dir = None
95        self.worker_test_name = None
96
97    def get_executed(self):
98        return (set(self.good) | set(self.bad) | set(self.skipped)
99                | set(self.resource_denieds) | set(self.environment_changed)
100                | set(self.run_no_tests))
101
102    def accumulate_result(self, result, rerun=False):
103        test_name = result.name
104
105        if not isinstance(result, (ChildError, Interrupted)) and not rerun:
106            self.test_times.append((result.duration_sec, test_name))
107
108        if isinstance(result, Passed):
109            self.good.append(test_name)
110        elif isinstance(result, ResourceDenied):
111            self.skipped.append(test_name)
112            self.resource_denieds.append(test_name)
113        elif isinstance(result, Skipped):
114            self.skipped.append(test_name)
115        elif isinstance(result, EnvChanged):
116            self.environment_changed.append(test_name)
117        elif isinstance(result, Failed):
118            if not rerun:
119                self.bad.append(test_name)
120                self.need_rerun.append(result)
121        elif isinstance(result, DidNotRun):
122            self.run_no_tests.append(test_name)
123        elif isinstance(result, Interrupted):
124            self.interrupted = True
125        else:
126            raise ValueError("invalid test result: %r" % result)
127
128        if rerun and not isinstance(result, (Failed, Interrupted)):
129            self.bad.remove(test_name)
130
131        xml_data = result.xml_data
132        if xml_data:
133            import xml.etree.ElementTree as ET
134            for e in xml_data:
135                try:
136                    self.testsuite_xml.append(ET.fromstring(e))
137                except ET.ParseError:
138                    print(xml_data, file=sys.__stderr__)
139                    raise
140
141    def log(self, line=''):
142        empty = not line
143
144        # add the system load prefix: "load avg: 1.80 "
145        load_avg = self.getloadavg()
146        if load_avg is not None:
147            line = f"load avg: {load_avg:.2f} {line}"
148
149        # add the timestamp prefix:  "0:01:05 "
150        test_time = time.monotonic() - self.start_time
151
152        mins, secs = divmod(int(test_time), 60)
153        hours, mins = divmod(mins, 60)
154        test_time = "%d:%02d:%02d" % (hours, mins, secs)
155
156        line = f"{test_time} {line}"
157        if empty:
158            line = line[:-1]
159
160        print(line, flush=True)
161
162    def display_progress(self, test_index, text):
163        if self.ns.quiet:
164            return
165
166        # "[ 51/405/1] test_tcl passed"
167        line = f"{test_index:{self.test_count_width}}{self.test_count}"
168        fails = len(self.bad) + len(self.environment_changed)
169        if fails and not self.ns.pgo:
170            line = f"{line}/{fails}"
171        self.log(f"[{line}] {text}")
172
173    def parse_args(self, kwargs):
174        ns = _parse_args(sys.argv[1:], **kwargs)
175
176        if ns.xmlpath:
177            support.junit_xml_list = self.testsuite_xml = []
178
179        worker_args = ns.worker_args
180        if worker_args is not None:
181            from test.libregrtest.runtest_mp import parse_worker_args
182            ns, test_name = parse_worker_args(ns.worker_args)
183            ns.worker_args = worker_args
184            self.worker_test_name = test_name
185
186        # Strip .py extensions.
187        removepy(ns.args)
188
189        if ns.huntrleaks:
190            warmup, repetitions, _ = ns.huntrleaks
191            if warmup < 1 or repetitions < 1:
192                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
193                       "number of warmups and repetitions must be at least 1 "
194                       "each (1:1).")
195                print(msg, file=sys.stderr, flush=True)
196                sys.exit(2)
197
198        if ns.tempdir:
199            ns.tempdir = os.path.expanduser(ns.tempdir)
200
201        self.ns = ns
202
203    def find_tests(self, tests):
204        self.tests = tests
205
206        if self.ns.single:
207            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
208            try:
209                with open(self.next_single_filename, 'r') as fp:
210                    next_test = fp.read().strip()
211                    self.tests = [next_test]
212            except OSError:
213                pass
214
215        if self.ns.fromfile:
216            self.tests = []
217            # regex to match 'test_builtin' in line:
218            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
219            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
220            with open(os.path.join(os_helper.SAVEDCWD, self.ns.fromfile)) as fp:
221                for line in fp:
222                    line = line.split('#', 1)[0]
223                    line = line.strip()
224                    match = regex.search(line)
225                    if match is not None:
226                        self.tests.append(match.group())
227
228        removepy(self.tests)
229
230        if self.ns.pgo:
231            # add default PGO tests if no tests are specified
232            setup_pgo_tests(self.ns)
233
234        stdtests = STDTESTS[:]
235        nottests = NOTTESTS.copy()
236        if self.ns.exclude:
237            for arg in self.ns.args:
238                if arg in stdtests:
239                    stdtests.remove(arg)
240                nottests.add(arg)
241            self.ns.args = []
242
243        # if testdir is set, then we are not running the python tests suite, so
244        # don't add default tests to be executed or skipped (pass empty values)
245        if self.ns.testdir:
246            alltests = findtests(self.ns.testdir, list(), set())
247        else:
248            alltests = findtests(self.ns.testdir, stdtests, nottests)
249
250        if not self.ns.fromfile:
251            self.selected = self.tests or self.ns.args or alltests
252        else:
253            self.selected = self.tests
254        if self.ns.single:
255            self.selected = self.selected[:1]
256            try:
257                pos = alltests.index(self.selected[0])
258                self.next_single_test = alltests[pos + 1]
259            except IndexError:
260                pass
261
262        # Remove all the selected tests that precede start if it's set.
263        if self.ns.start:
264            try:
265                del self.selected[:self.selected.index(self.ns.start)]
266            except ValueError:
267                print("Couldn't find starting test (%s), using all tests"
268                      % self.ns.start, file=sys.stderr)
269
270        if self.ns.randomize:
271            if self.ns.random_seed is None:
272                self.ns.random_seed = random.randrange(10000000)
273            random.seed(self.ns.random_seed)
274            random.shuffle(self.selected)
275
276    def list_tests(self):
277        for name in self.selected:
278            print(name)
279
280    def _list_cases(self, suite):
281        for test in suite:
282            if isinstance(test, unittest.loader._FailedTest):
283                continue
284            if isinstance(test, unittest.TestSuite):
285                self._list_cases(test)
286            elif isinstance(test, unittest.TestCase):
287                if support.match_test(test):
288                    print(test.id())
289
290    def list_cases(self):
291        support.verbose = False
292        support.set_match_tests(self.ns.match_tests, self.ns.ignore_tests)
293
294        for test_name in self.selected:
295            abstest = get_abs_module(self.ns, test_name)
296            try:
297                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
298                self._list_cases(suite)
299            except unittest.SkipTest:
300                self.skipped.append(test_name)
301
302        if self.skipped:
303            print(file=sys.stderr)
304            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
305            printlist(self.skipped, file=sys.stderr)
306
307    def rerun_failed_tests(self):
308        self.ns.verbose = True
309        self.ns.failfast = False
310        self.ns.verbose3 = False
311
312        self.first_result = self.get_tests_result()
313
314        self.log()
315        self.log("Re-running failed tests in verbose mode")
316        rerun_list = list(self.need_rerun)
317        self.need_rerun.clear()
318        for result in rerun_list:
319            test_name = result.name
320            self.rerun.append(test_name)
321
322            errors = result.errors or []
323            failures = result.failures or []
324            error_names = [test_full_name.split(" ")[0] for (test_full_name, *_) in errors]
325            failure_names = [test_full_name.split(" ")[0] for (test_full_name, *_) in failures]
326            self.ns.verbose = True
327            orig_match_tests = self.ns.match_tests
328            if errors or failures:
329                if self.ns.match_tests is None:
330                    self.ns.match_tests = []
331                self.ns.match_tests.extend(error_names)
332                self.ns.match_tests.extend(failure_names)
333                matching = "matching: " + ", ".join(self.ns.match_tests)
334                self.log(f"Re-running {test_name} in verbose mode ({matching})")
335            else:
336                self.log(f"Re-running {test_name} in verbose mode")
337            result = runtest(self.ns, test_name)
338            self.ns.match_tests = orig_match_tests
339
340            self.accumulate_result(result, rerun=True)
341
342            if isinstance(result, Interrupted):
343                break
344
345        if self.bad:
346            print(count(len(self.bad), 'test'), "failed again:")
347            printlist(self.bad)
348
349        self.display_result()
350
351    def display_result(self):
352        # If running the test suite for PGO then no one cares about results.
353        if self.ns.pgo:
354            return
355
356        print()
357        print("== Tests result: %s ==" % self.get_tests_result())
358
359        if self.interrupted:
360            print("Test suite interrupted by signal SIGINT.")
361
362        omitted = set(self.selected) - self.get_executed()
363        if omitted:
364            print()
365            print(count(len(omitted), "test"), "omitted:")
366            printlist(omitted)
367
368        if self.good and not self.ns.quiet:
369            print()
370            if (not self.bad
371                and not self.skipped
372                and not self.interrupted
373                and len(self.good) > 1):
374                print("All", end=' ')
375            print(count(len(self.good), "test"), "OK.")
376
377        if self.ns.print_slow:
378            self.test_times.sort(reverse=True)
379            print()
380            print("10 slowest tests:")
381            for test_time, test in self.test_times[:10]:
382                print("- %s: %s" % (test, format_duration(test_time)))
383
384        if self.bad:
385            print()
386            print(count(len(self.bad), "test"), "failed:")
387            printlist(self.bad)
388
389        if self.environment_changed:
390            print()
391            print("{} altered the execution environment:".format(
392                     count(len(self.environment_changed), "test")))
393            printlist(self.environment_changed)
394
395        if self.skipped and not self.ns.quiet:
396            print()
397            print(count(len(self.skipped), "test"), "skipped:")
398            printlist(self.skipped)
399
400        if self.rerun:
401            print()
402            print("%s:" % count(len(self.rerun), "re-run test"))
403            printlist(self.rerun)
404
405        if self.run_no_tests:
406            print()
407            print(count(len(self.run_no_tests), "test"), "run no tests:")
408            printlist(self.run_no_tests)
409
410    def run_tests_sequential(self):
411        if self.ns.trace:
412            import trace
413            self.tracer = trace.Trace(trace=False, count=True)
414
415        save_modules = sys.modules.keys()
416
417        msg = "Run tests sequentially"
418        if self.ns.timeout:
419            msg += " (timeout: %s)" % format_duration(self.ns.timeout)
420        self.log(msg)
421
422        previous_test = None
423        for test_index, test_name in enumerate(self.tests, 1):
424            start_time = time.monotonic()
425
426            text = test_name
427            if previous_test:
428                text = '%s -- %s' % (text, previous_test)
429            self.display_progress(test_index, text)
430
431            if self.tracer:
432                # If we're tracing code coverage, then we don't exit with status
433                # if on a false return value from main.
434                cmd = ('result = runtest(self.ns, test_name); '
435                       'self.accumulate_result(result)')
436                ns = dict(locals())
437                self.tracer.runctx(cmd, globals=globals(), locals=ns)
438                result = ns['result']
439            else:
440                result = runtest(self.ns, test_name)
441                self.accumulate_result(result)
442
443            if isinstance(result, Interrupted):
444                break
445
446            previous_test = str(result)
447            test_time = time.monotonic() - start_time
448            if test_time >= PROGRESS_MIN_TIME:
449                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
450            elif isinstance(result, Passed):
451                # be quiet: say nothing if the test passed shortly
452                previous_test = None
453
454            # Unload the newly imported modules (best effort finalization)
455            for module in sys.modules.keys():
456                if module not in save_modules and module.startswith("test."):
457                    support.unload(module)
458
459            if self.ns.failfast and is_failed(result, self.ns):
460                break
461
462        if previous_test:
463            print(previous_test)
464
465    def _test_forever(self, tests):
466        while True:
467            for test_name in tests:
468                yield test_name
469                if self.bad:
470                    return
471                if self.ns.fail_env_changed and self.environment_changed:
472                    return
473
474    def display_header(self):
475        # Print basic platform information
476        print("==", platform.python_implementation(), *sys.version.split())
477        print("==", platform.platform(aliased=True),
478                      "%s-endian" % sys.byteorder)
479        print("== cwd:", os.getcwd())
480        cpu_count = os.cpu_count()
481        if cpu_count:
482            print("== CPU count:", cpu_count)
483        print("== encodings: locale=%s, FS=%s"
484              % (locale.getpreferredencoding(False),
485                 sys.getfilesystemencoding()))
486
487    def get_tests_result(self):
488        result = []
489        if self.bad:
490            result.append("FAILURE")
491        elif self.ns.fail_env_changed and self.environment_changed:
492            result.append("ENV CHANGED")
493        elif not any((self.good, self.bad, self.skipped, self.interrupted,
494            self.environment_changed)):
495            result.append("NO TEST RUN")
496
497        if self.interrupted:
498            result.append("INTERRUPTED")
499
500        if not result:
501            result.append("SUCCESS")
502
503        result = ', '.join(result)
504        if self.first_result:
505            result = '%s then %s' % (self.first_result, result)
506        return result
507
508    def run_tests(self):
509        # For a partial run, we do not need to clutter the output.
510        if (self.ns.header
511            or not(self.ns.pgo or self.ns.quiet or self.ns.single
512                   or self.tests or self.ns.args)):
513            self.display_header()
514
515        if self.ns.huntrleaks:
516            warmup, repetitions, _ = self.ns.huntrleaks
517            if warmup < 3:
518                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
519                        "3 warmup repetitions can give false positives!")
520                print(msg, file=sys.stdout, flush=True)
521
522        if self.ns.randomize:
523            print("Using random seed", self.ns.random_seed)
524
525        if self.ns.forever:
526            self.tests = self._test_forever(list(self.selected))
527            self.test_count = ''
528            self.test_count_width = 3
529        else:
530            self.tests = iter(self.selected)
531            self.test_count = '/{}'.format(len(self.selected))
532            self.test_count_width = len(self.test_count) - 1
533
534        if self.ns.use_mp:
535            from test.libregrtest.runtest_mp import run_tests_multiprocess
536            run_tests_multiprocess(self)
537        else:
538            self.run_tests_sequential()
539
540    def finalize(self):
541        if self.next_single_filename:
542            if self.next_single_test:
543                with open(self.next_single_filename, 'w') as fp:
544                    fp.write(self.next_single_test + '\n')
545            else:
546                os.unlink(self.next_single_filename)
547
548        if self.tracer:
549            r = self.tracer.results()
550            r.write_results(show_missing=True, summary=True,
551                            coverdir=self.ns.coverdir)
552
553        print()
554        duration = time.monotonic() - self.start_time
555        print("Total duration: %s" % format_duration(duration))
556        print("Tests result: %s" % self.get_tests_result())
557
558        if self.ns.runleaks:
559            os.system("leaks %d" % os.getpid())
560
561    def save_xml_result(self):
562        if not self.ns.xmlpath and not self.testsuite_xml:
563            return
564
565        import xml.etree.ElementTree as ET
566        root = ET.Element("testsuites")
567
568        # Manually count the totals for the overall summary
569        totals = {'tests': 0, 'errors': 0, 'failures': 0}
570        for suite in self.testsuite_xml:
571            root.append(suite)
572            for k in totals:
573                try:
574                    totals[k] += int(suite.get(k, 0))
575                except ValueError:
576                    pass
577
578        for k, v in totals.items():
579            root.set(k, str(v))
580
581        xmlpath = os.path.join(os_helper.SAVEDCWD, self.ns.xmlpath)
582        with open(xmlpath, 'wb') as f:
583            for s in ET.tostringlist(root):
584                f.write(s)
585
586    def set_temp_dir(self):
587        if self.ns.tempdir:
588            self.tmp_dir = self.ns.tempdir
589
590        if not self.tmp_dir:
591            # When tests are run from the Python build directory, it is best practice
592            # to keep the test files in a subfolder.  This eases the cleanup of leftover
593            # files using the "make distclean" command.
594            if sysconfig.is_python_build():
595                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
596                if self.tmp_dir is None:
597                    # bpo-30284: On Windows, only srcdir is available. Using
598                    # abs_builddir mostly matters on UNIX when building Python
599                    # out of the source tree, especially when the source tree
600                    # is read only.
601                    self.tmp_dir = sysconfig.get_config_var('srcdir')
602                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
603            else:
604                self.tmp_dir = tempfile.gettempdir()
605
606        self.tmp_dir = os.path.abspath(self.tmp_dir)
607
608    def create_temp_dir(self):
609        os.makedirs(self.tmp_dir, exist_ok=True)
610
611        # Define a writable temp dir that will be used as cwd while running
612        # the tests. The name of the dir includes the pid to allow parallel
613        # testing (see the -j option).
614        pid = os.getpid()
615        if self.worker_test_name is not None:
616            test_cwd = 'test_python_worker_{}'.format(pid)
617        else:
618            test_cwd = 'test_python_{}'.format(pid)
619        test_cwd += os_helper.FS_NONASCII
620        test_cwd = os.path.join(self.tmp_dir, test_cwd)
621        return test_cwd
622
623    def cleanup(self):
624        import glob
625
626        path = os.path.join(glob.escape(self.tmp_dir), 'test_python_*')
627        print("Cleanup %s directory" % self.tmp_dir)
628        for name in glob.glob(path):
629            if os.path.isdir(name):
630                print("Remove directory: %s" % name)
631                os_helper.rmtree(name)
632            else:
633                print("Remove file: %s" % name)
634                os_helper.unlink(name)
635
636    def main(self, tests=None, **kwargs):
637        self.parse_args(kwargs)
638
639        self.set_temp_dir()
640
641        if self.ns.cleanup:
642            self.cleanup()
643            sys.exit(0)
644
645        test_cwd = self.create_temp_dir()
646
647        try:
648            # Run the tests in a context manager that temporarily changes the CWD
649            # to a temporary and writable directory. If it's not possible to
650            # create or change the CWD, the original CWD will be used.
651            # The original CWD is available from os_helper.SAVEDCWD.
652            with os_helper.temp_cwd(test_cwd, quiet=True):
653                # When using multiprocessing, worker processes will use test_cwd
654                # as their parent temporary directory. So when the main process
655                # exit, it removes also subdirectories of worker processes.
656                self.ns.tempdir = test_cwd
657
658                self._main(tests, kwargs)
659        except SystemExit as exc:
660            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
661            # on threading._shutdown() call: put a timeout
662            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
663
664            sys.exit(exc.code)
665
666    def getloadavg(self):
667        if self.win_load_tracker is not None:
668            return self.win_load_tracker.getloadavg()
669
670        if hasattr(os, 'getloadavg'):
671            return os.getloadavg()[0]
672
673        return None
674
675    def _main(self, tests, kwargs):
676        if self.worker_test_name is not None:
677            from test.libregrtest.runtest_mp import run_tests_worker
678            run_tests_worker(self.ns, self.worker_test_name)
679
680        if self.ns.wait:
681            input("Press any key to continue...")
682
683        support.PGO = self.ns.pgo
684        support.PGO_EXTENDED = self.ns.pgo_extended
685
686        setup_tests(self.ns)
687
688        self.find_tests(tests)
689
690        if self.ns.list_tests:
691            self.list_tests()
692            sys.exit(0)
693
694        if self.ns.list_cases:
695            self.list_cases()
696            sys.exit(0)
697
698        # If we're on windows and this is the parent runner (not a worker),
699        # track the load average.
700        if sys.platform == 'win32' and self.worker_test_name is None:
701            from test.libregrtest.win_utils import WindowsLoadTracker
702
703            try:
704                self.win_load_tracker = WindowsLoadTracker()
705            except FileNotFoundError as error:
706                # Windows IoT Core and Windows Nano Server do not provide
707                # typeperf.exe for x64, x86 or ARM
708                print(f'Failed to create WindowsLoadTracker: {error}')
709
710        try:
711            self.run_tests()
712            self.display_result()
713
714            if self.ns.verbose2 and self.bad:
715                self.rerun_failed_tests()
716        finally:
717            if self.win_load_tracker is not None:
718                self.win_load_tracker.close()
719                self.win_load_tracker = None
720
721        self.finalize()
722
723        self.save_xml_result()
724
725        if self.bad:
726            sys.exit(2)
727        if self.interrupted:
728            sys.exit(130)
729        if self.ns.fail_env_changed and self.environment_changed:
730            sys.exit(3)
731        sys.exit(0)
732
733
def main(tests=None, **kwargs):
    """Run the Python suite."""
    runner = Regrtest()
    runner.main(tests=tests, **kwargs)
737