import datetime
import faulthandler
import locale
import os
import platform
import random
import re
import sys
import sysconfig
import tempfile
import time
import unittest
from test.libregrtest.cmdline import _parse_args
from test.libregrtest.runtest import (
    findtests, runtest, get_abs_module,
    STDTESTS, NOTTESTS, PASSED, FAILED, ENV_CHANGED, SKIPPED, RESOURCE_DENIED,
    INTERRUPTED, CHILD_ERROR, TEST_DID_NOT_RUN, TIMEOUT,
    PROGRESS_MIN_TIME, format_test_result, is_failed)
from test.libregrtest.setup import setup_tests
from test.libregrtest.pgo import setup_pgo_tests
from test.libregrtest.utils import removepy, count, format_duration, printlist
from test import support


# bpo-38203: Maximum delay in seconds to exit Python (call Py_Finalize()).
# Used to protect against threading._shutdown() hang.
# Must be smaller than buildbot "1200 seconds without output" limit.
EXIT_TIMEOUT = 120.0


class Regrtest:
    """Execute a test suite.

    This also parses command-line options and modifies its behavior
    accordingly.

    tests -- a list of strings containing test names (optional)
    testdir -- the directory in which to look for tests (optional)

    Users other than the Python test suite will certainly want to
    specify testdir; if it's omitted, the directory containing the
    Python test suite is searched for.

    If the tests argument is omitted, the tests listed on the
    command-line will be used.  If that's empty, too, then all *.py
    files beginning with test_ will be used.

    The other default arguments (verbose, quiet, exclude,
    single, randomize, findleaks, use_resources, trace, coverdir,
    print_slow, and random_seed) allow programmers calling main()
    directly to set the values that would normally be set by flags
    on the command line.
    """
    def __init__(self):
        # Namespace of command line options
        self.ns = None

        # tests
        self.tests = []
        self.selected = []

        # test results
        self.good = []
        self.bad = []
        self.skipped = []
        self.resource_denieds = []
        self.environment_changed = []
        self.run_no_tests = []
        self.rerun = []
        self.first_result = None
        self.interrupted = False

        # used by --slow
        self.test_times = []

        # used by --coverage, trace.Trace instance
        self.tracer = None

        # used to display the progress bar "[ 3/100]"
        self.start_time = time.monotonic()
        self.test_count = ''
        self.test_count_width = 1

        # used by --single
        self.next_single_test = None
        self.next_single_filename = None

        # used by --junit-xml
        self.testsuite_xml = None

        # misc
        self.win_load_tracker = None
        self.tmp_dir = None
        self.worker_test_name = None

    def get_executed(self):
        return (set(self.good) | set(self.bad) | set(self.skipped)
                | set(self.resource_denieds) | set(self.environment_changed)
                | set(self.run_no_tests))

    def accumulate_result(self, result, rerun=False):
        test_name = result.test_name
        ok = result.result

        if ok not in (CHILD_ERROR, INTERRUPTED) and not rerun:
            self.test_times.append((result.test_time, test_name))

        if ok == PASSED:
            self.good.append(test_name)
        elif ok in (FAILED, CHILD_ERROR):
            if not rerun:
                self.bad.append(test_name)
        elif ok == ENV_CHANGED:
            self.environment_changed.append(test_name)
        elif ok == SKIPPED:
            self.skipped.append(test_name)
        elif ok == RESOURCE_DENIED:
            self.skipped.append(test_name)
            self.resource_denieds.append(test_name)
        elif ok == TEST_DID_NOT_RUN:
            self.run_no_tests.append(test_name)
        elif ok == INTERRUPTED:
            self.interrupted = True
        elif ok == TIMEOUT:
            self.bad.append(test_name)
        else:
            raise ValueError("invalid test result: %r" % ok)

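        # A test which failed in the first run but passes (or is skipped)
        # when re-run is removed from the list of failures.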
        if rerun and ok not in {FAILED, CHILD_ERROR, INTERRUPTED}:
            self.bad.remove(test_name)

        xml_data = result.xml_data
        if xml_data:
            import xml.etree.ElementTree as ET
            for e in xml_data:
                try:
                    self.testsuite_xml.append(ET.fromstring(e))
                except ET.ParseError:
                    print(xml_data, file=sys.__stderr__)
                    raise

    def log(self, line=''):
        empty = not line

        # add the system load prefix: "load avg: 1.80 "
        load_avg = self.getloadavg()
        if load_avg is not None:
            line = f"load avg: {load_avg:.2f} {line}"

        # add the timestamp prefix:  "0:01:05 "
        test_time = time.monotonic() - self.start_time
        test_time = datetime.timedelta(seconds=int(test_time))
        line = f"{test_time} {line}"

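        # both prefixes end with a space: strip the trailing space when
        # no message was passed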
        if empty:
            line = line[:-1]

        print(line, flush=True)

    def display_progress(self, test_index, text):
        if self.ns.quiet:
            return

        # "[ 51/405/1] test_tcl passed"
        line = f"{test_index:{self.test_count_width}}{self.test_count}"
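        # the third field is the number of failed or env-changed tests so
        # far, e.g. "/1"; it is omitted in PGO mode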
        fails = len(self.bad) + len(self.environment_changed)
        if fails and not self.ns.pgo:
            line = f"{line}/{fails}"
        self.log(f"[{line}] {text}")

    def parse_args(self, kwargs):
        ns = _parse_args(sys.argv[1:], **kwargs)

        if ns.xmlpath:
            support.junit_xml_list = self.testsuite_xml = []

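        # When running as a multiprocessing worker, ns.worker_args holds the
        # serialized options and the name of the single test to run.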
        worker_args = ns.worker_args
        if worker_args is not None:
            from test.libregrtest.runtest_mp import parse_worker_args
            ns, test_name = parse_worker_args(ns.worker_args)
            ns.worker_args = worker_args
            self.worker_test_name = test_name

        # Strip .py extensions.
        removepy(ns.args)

        if ns.huntrleaks:
            warmup, repetitions, _ = ns.huntrleaks
            if warmup < 1 or repetitions < 1:
                msg = ("Invalid values for the --huntrleaks/-R parameters. The "
                       "number of warmups and repetitions must be at least 1 "
                       "each (1:1).")
                print(msg, file=sys.stderr, flush=True)
                sys.exit(2)

        if ns.tempdir:
            ns.tempdir = os.path.expanduser(ns.tempdir)

        self.ns = ns

    def find_tests(self, tests):
        self.tests = tests

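        # --single runs one test per invocation; the name of the test to run
        # next time is stored in the 'pynexttest' file (written by finalize()).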
        if self.ns.single:
            self.next_single_filename = os.path.join(self.tmp_dir, 'pynexttest')
            try:
                with open(self.next_single_filename, 'r') as fp:
                    next_test = fp.read().strip()
                    self.tests = [next_test]
            except OSError:
                pass

        if self.ns.fromfile:
            self.tests = []
            # regex to match 'test_builtin' in line:
            # '0:00:00 [  4/400] test_builtin -- test_dict took 1 sec'
            regex = re.compile(r'\btest_[a-zA-Z0-9_]+\b')
            with open(os.path.join(support.SAVEDCWD, self.ns.fromfile)) as fp:
                for line in fp:
                    line = line.split('#', 1)[0]
                    line = line.strip()
                    match = regex.search(line)
                    if match is not None:
                        self.tests.append(match.group())

        removepy(self.tests)

        if self.ns.pgo:
            # add default PGO tests if no tests are specified
            setup_pgo_tests(self.ns)

        stdtests = STDTESTS[:]
        nottests = NOTTESTS.copy()
        if self.ns.exclude:
            for arg in self.ns.args:
                if arg in stdtests:
                    stdtests.remove(arg)
                nottests.add(arg)
            self.ns.args = []

        # if testdir is set, then we are not running the Python test suite, so
        # don't add default tests to be executed or skipped (pass empty values)
        if self.ns.testdir:
            alltests = findtests(self.ns.testdir, list(), set())
        else:
            alltests = findtests(self.ns.testdir, stdtests, nottests)

        if not self.ns.fromfile:
            self.selected = self.tests or self.ns.args or alltests
        else:
            self.selected = self.tests
        if self.ns.single:
            self.selected = self.selected[:1]
            try:
                pos = alltests.index(self.selected[0])
                self.next_single_test = alltests[pos + 1]
            except IndexError:
                pass

        # Remove all the selected tests that precede start if it's set.
        if self.ns.start:
            try:
                del self.selected[:self.selected.index(self.ns.start)]
            except ValueError:
                print("Couldn't find starting test (%s), using all tests"
                      % self.ns.start, file=sys.stderr)

        if self.ns.randomize:
            if self.ns.random_seed is None:
                self.ns.random_seed = random.randrange(10000000)
            random.seed(self.ns.random_seed)
            random.shuffle(self.selected)

    def list_tests(self):
        for name in self.selected:
            print(name)

    def _list_cases(self, suite):
        for test in suite:
            if isinstance(test, unittest.loader._FailedTest):
                continue
            if isinstance(test, unittest.TestSuite):
                self._list_cases(test)
            elif isinstance(test, unittest.TestCase):
                if support.match_test(test):
                    print(test.id())

    def list_cases(self):
        support.verbose = False
        support.set_match_tests(self.ns.match_tests)

        for test_name in self.selected:
            abstest = get_abs_module(self.ns, test_name)
            try:
                suite = unittest.defaultTestLoader.loadTestsFromName(abstest)
                self._list_cases(suite)
            except unittest.SkipTest:
                self.skipped.append(test_name)

        if self.skipped:
            print(file=sys.stderr)
            print(count(len(self.skipped), "test"), "skipped:", file=sys.stderr)
            printlist(self.skipped, file=sys.stderr)

    def rerun_failed_tests(self):
        self.ns.verbose = True
        self.ns.failfast = False
        self.ns.verbose3 = False

        self.first_result = self.get_tests_result()

        self.log()
        self.log("Re-running failed tests in verbose mode")
        self.rerun = self.bad[:]
        for test_name in self.rerun:
            self.log(f"Re-running {test_name} in verbose mode")
            self.ns.verbose = True
            result = runtest(self.ns, test_name)

            self.accumulate_result(result, rerun=True)

            if result.result == INTERRUPTED:
                break

        if self.bad:
            print(count(len(self.bad), 'test'), "failed again:")
            printlist(self.bad)

        self.display_result()

    def display_result(self):
        # If running the test suite for PGO then no one cares about results.
        if self.ns.pgo:
            return

        print()
        print("== Tests result: %s ==" % self.get_tests_result())

        if self.interrupted:
            print("Test suite interrupted by signal SIGINT.")

        omitted = set(self.selected) - self.get_executed()
        if omitted:
            print()
            print(count(len(omitted), "test"), "omitted:")
            printlist(omitted)

        if self.good and not self.ns.quiet:
            print()
            if (not self.bad
                and not self.skipped
                and not self.interrupted
                and len(self.good) > 1):
                print("All", end=' ')
            print(count(len(self.good), "test"), "OK.")

        if self.ns.print_slow:
            self.test_times.sort(reverse=True)
            print()
            print("10 slowest tests:")
            for test_time, test in self.test_times[:10]:
                print("- %s: %s" % (test, format_duration(test_time)))

        if self.bad:
            print()
            print(count(len(self.bad), "test"), "failed:")
            printlist(self.bad)

        if self.environment_changed:
            print()
            print("{} altered the execution environment:".format(
                     count(len(self.environment_changed), "test")))
            printlist(self.environment_changed)

        if self.skipped and not self.ns.quiet:
            print()
            print(count(len(self.skipped), "test"), "skipped:")
            printlist(self.skipped)

        if self.rerun:
            print()
            print("%s:" % count(len(self.rerun), "re-run test"))
            printlist(self.rerun)

        if self.run_no_tests:
            print()
            print(count(len(self.run_no_tests), "test"), "run no tests:")
            printlist(self.run_no_tests)

    def run_tests_sequential(self):
        if self.ns.trace:
            import trace
            self.tracer = trace.Trace(trace=False, count=True)

        save_modules = set(sys.modules)

        self.log("Run tests sequentially")

        previous_test = None
        for test_index, test_name in enumerate(self.tests, 1):
            start_time = time.monotonic()

            text = test_name
            if previous_test:
                text = '%s -- %s' % (text, previous_test)
            self.display_progress(test_index, text)

            if self.tracer:
                # If tracing code coverage, run the test under the tracer and
                # pull the result back out of the traced namespace.
                cmd = ('result = runtest(self.ns, test_name); '
                       'self.accumulate_result(result)')
                ns = dict(locals())
                self.tracer.runctx(cmd, globals=globals(), locals=ns)
                result = ns['result']
            else:
                result = runtest(self.ns, test_name)
                self.accumulate_result(result)

            if result.result == INTERRUPTED:
                break

            previous_test = format_test_result(result)
            test_time = time.monotonic() - start_time
            if test_time >= PROGRESS_MIN_TIME:
                previous_test = "%s in %s" % (previous_test, format_duration(test_time))
            elif result.result == PASSED:
                # be quiet: say nothing if the test passed quickly
                previous_test = None

            # Unload the newly imported modules (best effort finalization)
            for module in list(sys.modules):
                if module not in save_modules and module.startswith("test."):
                    support.unload(module)

            if self.ns.failfast and is_failed(result, self.ns):
                break

        if previous_test:
            print(previous_test)

    def _test_forever(self, tests):
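        # Generator used by --forever: iterate over the selected tests
        # endlessly, stopping as soon as a test fails (or, with
        # --fail-env-changed, as soon as a test alters the environment).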
        while True:
            for test_name in tests:
                yield test_name
                if self.bad:
                    return
                if self.ns.fail_env_changed and self.environment_changed:
                    return

    def display_header(self):
        # Print basic platform information
        print("==", platform.python_implementation(), *sys.version.split())
        print("==", platform.platform(aliased=True),
              "%s-endian" % sys.byteorder)
        print("== cwd:", os.getcwd())
        cpu_count = os.cpu_count()
        if cpu_count:
            print("== CPU count:", cpu_count)
        print("== encodings: locale=%s, FS=%s"
              % (locale.getpreferredencoding(False),
                 sys.getfilesystemencoding()))

    def get_tests_result(self):
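        # Build a one-line summary such as "FAILURE", "ENV CHANGED" or
        # "SUCCESS"; after a re-run it becomes "<first result> then <result>".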
        result = []
        if self.bad:
            result.append("FAILURE")
        elif self.ns.fail_env_changed and self.environment_changed:
            result.append("ENV CHANGED")
        elif not any((self.good, self.bad, self.skipped, self.interrupted,
            self.environment_changed)):
            result.append("NO TEST RUN")

        if self.interrupted:
            result.append("INTERRUPTED")

        if not result:
            result.append("SUCCESS")

        result = ', '.join(result)
        if self.first_result:
            result = '%s then %s' % (self.first_result, result)
        return result

    def run_tests(self):
        # For a partial run, we do not need to clutter the output.
        if (self.ns.header
            or not(self.ns.pgo or self.ns.quiet or self.ns.single
                   or self.tests or self.ns.args)):
            self.display_header()

        if self.ns.huntrleaks:
            warmup, repetitions, _ = self.ns.huntrleaks
            if warmup < 3:
                msg = ("WARNING: Running tests with --huntrleaks/-R and less than "
                        "3 warmup repetitions can give false positives!")
                print(msg, file=sys.stdout, flush=True)

        if self.ns.randomize:
            print("Using random seed", self.ns.random_seed)

        if self.ns.forever:
            self.tests = self._test_forever(list(self.selected))
            self.test_count = ''
            self.test_count_width = 3
        else:
            self.tests = iter(self.selected)
            self.test_count = '/{}'.format(len(self.selected))
            self.test_count_width = len(self.test_count) - 1

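        # ns.use_mp (the -j option) dispatches the tests to worker processes;
        # otherwise run them one by one in this process.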
        if self.ns.use_mp:
            from test.libregrtest.runtest_mp import run_tests_multiprocess
            run_tests_multiprocess(self)
        else:
            self.run_tests_sequential()

    def finalize(self):
        if self.next_single_filename:
            if self.next_single_test:
                with open(self.next_single_filename, 'w') as fp:
                    fp.write(self.next_single_test + '\n')
            else:
                os.unlink(self.next_single_filename)

        if self.tracer:
            r = self.tracer.results()
            r.write_results(show_missing=True, summary=True,
                            coverdir=self.ns.coverdir)

        print()
        duration = time.monotonic() - self.start_time
        print("Total duration: %s" % format_duration(duration))
        print("Tests result: %s" % self.get_tests_result())

        if self.ns.runleaks:
            os.system("leaks %d" % os.getpid())

    def save_xml_result(self):
        if not self.ns.xmlpath and not self.testsuite_xml:
            return

        import xml.etree.ElementTree as ET
        root = ET.Element("testsuites")

        # Manually count the totals for the overall summary
        totals = {'tests': 0, 'errors': 0, 'failures': 0}
        for suite in self.testsuite_xml:
            root.append(suite)
            for k in totals:
                try:
                    totals[k] += int(suite.get(k, 0))
                except ValueError:
                    pass

        for k, v in totals.items():
            root.set(k, str(v))

        xmlpath = os.path.join(support.SAVEDCWD, self.ns.xmlpath)
        with open(xmlpath, 'wb') as f:
            for s in ET.tostringlist(root):
                f.write(s)

    def set_temp_dir(self):
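        # Temporary directory precedence: the --tempdir option; otherwise,
        # when running from a Python build directory, a 'build' subdirectory
        # of it; otherwise the system default temporary directory.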
        if self.ns.tempdir:
            self.tmp_dir = self.ns.tempdir

        if not self.tmp_dir:
            # When tests are run from the Python build directory, it is best practice
            # to keep the test files in a subfolder.  This eases the cleanup of leftover
            # files using the "make distclean" command.
            if sysconfig.is_python_build():
                self.tmp_dir = sysconfig.get_config_var('abs_builddir')
                if self.tmp_dir is None:
                    # bpo-30284: On Windows, only srcdir is available. Using
                    # abs_builddir mostly matters on UNIX when building Python
                    # out of the source tree, especially when the source tree
                    # is read only.
                    self.tmp_dir = sysconfig.get_config_var('srcdir')
                self.tmp_dir = os.path.join(self.tmp_dir, 'build')
            else:
                self.tmp_dir = tempfile.gettempdir()

        self.tmp_dir = os.path.abspath(self.tmp_dir)

    def create_temp_dir(self):
        os.makedirs(self.tmp_dir, exist_ok=True)

        # Define a writable temp dir that will be used as cwd while running
        # the tests. The name of the dir includes the pid to allow parallel
        # testing (see the -j option).
        pid = os.getpid()
        if self.worker_test_name is not None:
            test_cwd = 'test_python_worker_{}'.format(pid)
        else:
            test_cwd = 'test_python_{}'.format(pid)
        test_cwd = os.path.join(self.tmp_dir, test_cwd)
        return test_cwd

    def cleanup(self):
        import glob

        path = os.path.join(self.tmp_dir, 'test_python_*')
        print("Cleanup %s directory" % self.tmp_dir)
        for name in glob.glob(path):
            if os.path.isdir(name):
                print("Remove directory: %s" % name)
                support.rmtree(name)
            else:
                print("Remove file: %s" % name)
                support.unlink(name)

    def main(self, tests=None, **kwargs):
        self.parse_args(kwargs)

        self.set_temp_dir()

        if self.ns.cleanup:
            self.cleanup()
            sys.exit(0)

        test_cwd = self.create_temp_dir()

        try:
            # Run the tests in a context manager that temporarily changes the CWD
            # to a temporary and writable directory. If it's not possible to
            # create or change the CWD, the original CWD will be used.
            # The original CWD is available from support.SAVEDCWD.
            with support.temp_cwd(test_cwd, quiet=True):
                # When using multiprocessing, worker processes will use test_cwd
                # as their parent temporary directory. So when the main process
                # exits, it also removes the subdirectories of the worker processes.
                self.ns.tempdir = test_cwd

                self._main(tests, kwargs)
        except SystemExit as exc:
            # bpo-38203: Python can hang at exit in Py_Finalize(), especially
            # in the threading._shutdown() call: set a timeout.
            faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)

            sys.exit(exc.code)

    def getloadavg(self):
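        # Return the system load average: from the WindowsLoadTracker on
        # Windows, from os.getloadavg() where available, otherwise None.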
        if self.win_load_tracker is not None:
            return self.win_load_tracker.getloadavg()

        if hasattr(os, 'getloadavg'):
            return os.getloadavg()[0]

        return None

    def _main(self, tests, kwargs):
        if self.worker_test_name is not None:
            from test.libregrtest.runtest_mp import run_tests_worker
            run_tests_worker(self.ns, self.worker_test_name)

        if self.ns.wait:
            input("Press any key to continue...")

        support.PGO = self.ns.pgo
        support.PGO_EXTENDED = self.ns.pgo_extended

        setup_tests(self.ns)

        self.find_tests(tests)

        if self.ns.list_tests:
            self.list_tests()
            sys.exit(0)

        if self.ns.list_cases:
            self.list_cases()
            sys.exit(0)

        # If we're on Windows and this is the parent runner (not a worker),
        # track the load average.
        if sys.platform == 'win32' and self.worker_test_name is None:
            from test.libregrtest.win_utils import WindowsLoadTracker

            try:
                self.win_load_tracker = WindowsLoadTracker()
            except FileNotFoundError as error:
                # Windows IoT Core and Windows Nano Server do not provide
                # typeperf.exe for x64, x86 or ARM
                print(f'Failed to create WindowsLoadTracker: {error}')

        try:
            self.run_tests()
            self.display_result()

            if self.ns.verbose2 and self.bad:
                self.rerun_failed_tests()
        finally:
            if self.win_load_tracker is not None:
                self.win_load_tracker.close()
                self.win_load_tracker = None

        self.finalize()

        self.save_xml_result()

        if self.bad:
            sys.exit(2)
        if self.interrupted:
            sys.exit(130)
        if self.ns.fail_env_changed and self.environment_changed:
            sys.exit(3)
        sys.exit(0)


def main(tests=None, **kwargs):
    """Run the Python test suite."""
    Regrtest().main(tests=tests, **kwargs)