1import collections
2import faulthandler
3import functools
4import gc
5import importlib
6import io
7import os
8import sys
9import time
10import traceback
11import unittest
12
13from test import support
14from test.libregrtest.refleak import dash_R, clear_caches
15from test.libregrtest.save_env import saved_test_environment
16from test.libregrtest.utils import format_duration, print_warning
17
18
# Test result constants.  PASSED/FAILED are truthy/falsy on purpose;
# every other outcome gets a distinct negative code.
PASSED = 1
FAILED = 0
ENV_CHANGED = -1
SKIPPED = -2
RESOURCE_DENIED = -3
INTERRUPTED = -4
CHILD_ERROR = -5   # error in a child process
TEST_DID_NOT_RUN = -6
TIMEOUT = -7

# Human-readable template for each result code; '%s' is the test name.
_FORMAT_TEST_RESULT = {
    PASSED: '%s passed',
    FAILED: '%s failed',
    ENV_CHANGED: '%s failed (env changed)',
    SKIPPED: '%s skipped',
    RESOURCE_DENIED: '%s skipped (resource denied)',
    INTERRUPTED: '%s interrupted',
    CHILD_ERROR: '%s crashed',
    TEST_DID_NOT_RUN: '%s run no tests',
    TIMEOUT: '%s timed out',
}

# Minimum duration of a test to display its duration or to mention that
# the test is running in background
PROGRESS_MIN_TIME = 30.0   # seconds

# small set of tests to determine if we have a basically functioning interpreter
# (i.e. if any of these fail, then anything else is likely to follow)
STDTESTS = [
    'test_grammar',
    'test_opcodes',
    'test_dict',
    'test_builtin',
    'test_exceptions',
    'test_types',
    'test_unittest',
    'test_doctest',
    'test_doctest2',
    'test_support'
]

# set of tests that we don't want to be executed when using regrtest
NOTTESTS = set()


# used by --findleaks, store for gc.garbage
FOUND_GARBAGE = []
67
68
def is_failed(result, ns):
    """Return True if *result* should be reported as a failure.

    ENV_CHANGED only counts as a failure when ns.fail_env_changed is set;
    passed/skipped/denied/empty results never do.
    """
    status = result.result
    if status == ENV_CHANGED:
        return ns.fail_env_changed
    return status not in (PASSED, RESOURCE_DENIED, SKIPPED, TEST_DID_NOT_RUN)
76
77
def format_test_result(result):
    """Return a one-line human-readable description of *result*."""
    template = _FORMAT_TEST_RESULT.get(result.result, "%s")
    text = template % result.test_name
    if result.result != TIMEOUT:
        return text
    # For timeouts, also mention how long the test ran before being killed.
    return '%s (%s)' % (text, format_duration(result.test_time))
84
85
def findtestdir(path=None):
    """Return *path* if truthy, else the directory of the test package.

    Falls back to the current directory when the computed dirname is empty.
    """
    if path:
        return path
    return os.path.dirname(os.path.dirname(__file__)) or os.curdir
88
89
def findtests(testdir=None, stdtests=STDTESTS, nottests=NOTTESTS):
    """Return a list of all applicable test modules.

    testdir -- directory to scan (default: the test package directory)
    stdtests -- tests always listed first, in this order
    nottests -- test names to exclude from the scan

    The result is *stdtests* followed by the remaining discovered
    "test_*" modules in sorted order.
    """
    testdir = findtestdir(testdir)
    tests = []
    excluded = set(stdtests) | nottests
    for name in os.listdir(testdir):
        mod, ext = os.path.splitext(name)
        # Accept "test_*" entries that are .py files or extension-less
        # (packages); skip standard tests and explicit exclusions.
        # str.startswith is clearer than the slice comparison mod[:5].
        if mod.startswith("test_") and ext in (".py", "") and mod not in excluded:
            tests.append(mod)
    return stdtests + sorted(tests)
101
102
def get_abs_module(ns, test_name):
    """Return the importable module name for *test_name*.

    Names already qualified with 'test.' (or runs using a custom
    --testdir) are returned unchanged; otherwise the name is resolved
    inside the test package.
    """
    already_absolute = test_name.startswith('test.') or ns.testdir
    if already_absolute:
        return test_name
    # Import it from the test package
    return 'test.' + test_name
109
110
# Outcome of one test run: the test name, one of the result constants,
# the wall-clock duration in seconds, and the serialized JUnit XML
# (None when --junit-xml is not used).
TestResult = collections.namedtuple('TestResult',
    'test_name result test_time xml_data')

def _runtest(ns, test_name):
    # Handle faulthandler timeout, capture stdout+stderr, XML serialization
    # and measure time.

    output_on_failure = ns.verbose3

    use_timeout = (ns.timeout is not None)
    if use_timeout:
        # Dump all thread tracebacks and kill the process if the test
        # runs longer than ns.timeout seconds.
        faulthandler.dump_traceback_later(ns.timeout, exit=True)

    start_time = time.perf_counter()
    try:
        support.set_match_tests(ns.match_tests)
        support.junit_xml_list = xml_list = [] if ns.xmlpath else None
        if ns.failfast:
            support.failfast = True

        if output_on_failure:
            support.verbose = True

            # Capture stdout and stderr; replay them only if the test
            # does not pass (-W/--verbose3 behaviour).
            stream = io.StringIO()
            orig_stdout = sys.stdout
            orig_stderr = sys.stderr
            try:
                sys.stdout = stream
                sys.stderr = stream
                result = _runtest_inner(ns, test_name,
                                        display_failure=False)
                if result != PASSED:
                    output = stream.getvalue()
                    orig_stderr.write(output)
                    orig_stderr.flush()
            finally:
                # Always restore the real streams, even on exception.
                sys.stdout = orig_stdout
                sys.stderr = orig_stderr
        else:
            # Tell tests to be moderately quiet
            support.verbose = ns.verbose

            result = _runtest_inner(ns, test_name,
                                    display_failure=not ns.verbose)

        if xml_list:
            # Serialize collected JUnit elements so the result can cross
            # process boundaries (multiprocess mode).
            import xml.etree.ElementTree as ET
            xml_data = [ET.tostring(x).decode('us-ascii') for x in xml_list]
        else:
            xml_data = None

        test_time = time.perf_counter() - start_time

        return TestResult(test_name, result, test_time, xml_data)
    finally:
        if use_timeout:
            faulthandler.cancel_dump_traceback_later()
        support.junit_xml_list = None
169
170
def runtest(ns, test_name):
    """Run a single test.

    ns -- regrtest namespace of options
    test_name -- the name of the test

    Returns a TestResult(test_name, result, test_time, xml_data) named
    tuple, where result is one of the constants:

        INTERRUPTED      KeyboardInterrupt
        RESOURCE_DENIED  test skipped because resource denied
        SKIPPED          test skipped for some other reason
        ENV_CHANGED      test failed because it changed the execution environment
        FAILED           test failed
        PASSED           test passed
        TEST_DID_NOT_RUN test ran no subtests
        TIMEOUT          test timed out

    If ns.xmlpath is not None, xml_data is a list containing each
    generated testsuite element.
    """
    try:
        return _runtest(ns, test_name)
    except:
        # Catch absolutely everything so a broken test can never take
        # down the regrtest driver itself; report it as a crash instead.
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return TestResult(test_name, FAILED, 0.0, None)
200
201
def _test_module(the_module):
    """Discover and run every unittest test in *the_module*.

    Raises Exception if the loader reported errors while collecting tests.
    """
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromModule(the_module)
    if loader.errors:
        for message in loader.errors:
            print(message, file=sys.stderr)
        raise Exception("errors while loading tests")
    support.run_unittest(suite)
210
211
def _runtest_inner2(ns, test_name):
    # Load the test function, run the test function, handle huntrleaks
    # and findleaks to detect leaks.
    #
    # Returns True if the test leaked references (only possible when
    # ns.huntrleaks is set), False otherwise.

    abstest = get_abs_module(ns, test_name)

    # remove the module from sys.module to reload it if it was already imported
    support.unload(abstest)

    the_module = importlib.import_module(abstest)

    # If the test has a test_main, that will run the appropriate
    # tests.  If not, use normal unittest test loading.
    test_runner = getattr(the_module, "test_main", None)
    if test_runner is None:
        test_runner = functools.partial(_test_module, the_module)

    try:
        if ns.huntrleaks:
            # Return True if the test leaked references
            refleak = dash_R(ns, test_name, test_runner)
        else:
            test_runner()
            refleak = False
    finally:
        # Clean up leftover files even when the test raised.
        cleanup_test_droppings(test_name, ns.verbose)

    support.gc_collect()

    if gc.garbage:
        # Uncollectable objects mean the test polluted the environment.
        support.environment_altered = True
        print_warning(f"{test_name} created {len(gc.garbage)} "
                      f"uncollectable object(s).")

        # move the uncollectable objects somewhere,
        # so we don't see them again
        FOUND_GARBAGE.extend(gc.garbage)
        gc.garbage.clear()

    # Reap any zombie child processes the test left behind.
    support.reap_children()

    return refleak
254
255
def _runtest_inner(ns, test_name, display_failure=True):
    # Detect environment changes, handle exceptions, and map every
    # outcome onto one of the result constants.
    #
    # display_failure -- when True, include the exception text in the
    # "failed" message printed to stderr.

    # Reset the environment_altered flag to detect if a test altered
    # the environment
    support.environment_altered = False

    if ns.pgo:
        # PGO runs want minimal output.
        display_failure = False

    try:
        clear_caches()

        # saved_test_environment records and restores global state so we
        # can tell whether the test changed its execution environment.
        with saved_test_environment(test_name, ns.verbose, ns.quiet, pgo=ns.pgo) as environment:
            refleak = _runtest_inner2(ns, test_name)
    except support.ResourceDenied as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return RESOURCE_DENIED
    except unittest.SkipTest as msg:
        if not ns.quiet and not ns.pgo:
            print(f"{test_name} skipped -- {msg}", flush=True)
        return SKIPPED
    except support.TestFailed as exc:
        msg = f"test {test_name} failed"
        if display_failure:
            msg = f"{msg} -- {exc}"
        print(msg, file=sys.stderr, flush=True)
        return FAILED
    except support.TestDidNotRun:
        return TEST_DID_NOT_RUN
    except KeyboardInterrupt:
        print()
        return INTERRUPTED
    except:
        # Any other exception is an unexpected crash of the test.
        if not ns.pgo:
            msg = traceback.format_exc()
            print(f"test {test_name} crashed -- {msg}",
                  file=sys.stderr, flush=True)
        return FAILED

    # Reference leaks take precedence over environment changes.
    if refleak:
        return FAILED
    if environment.changed:
        return ENV_CHANGED
    return PASSED
302
303
def cleanup_test_droppings(test_name, verbose):
    """Delete files/directories a test left behind (currently support.TESTFN).

    Runs a garbage collection first so dangling open-file references are
    released before deletion is attempted.
    """
    # Killing dangling references may also trigger ResourceWarnings now
    # instead of during the next test run, where they could cause failures.
    support.gc_collect()

    # Tests shouldn't leave files or directories behind, but a failing test
    # may not get the chance to clean up.  On Windows this is especially
    # nasty: an open leftover file cannot be deleted by name.  We can at
    # least report which test left it behind.
    for name in (support.TESTFN,):
        if not os.path.exists(name):
            continue

        if os.path.isdir(name):
            import shutil
            kind = "directory"
            remove = shutil.rmtree
        elif os.path.isfile(name):
            kind = "file"
            remove = os.unlink
        else:
            raise RuntimeError(f"os.path says {name!r} exists but is neither "
                               f"directory nor file")

        if verbose:
            print_warning("%r left behind %s %r" % (test_name, kind, name))
            support.environment_altered = True

        try:
            import stat
            # fix possible permissions problems that might prevent cleanup
            os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            remove(name)
        except Exception as exc:
            print_warning(f"{test_name} left behind {kind} {name!r} "
                          f"and it couldn't be removed: {exc}")
341