• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import contextlib
7import functools
8import os
9import re
10import stat
11import sys
12import sysconfig
13import time
14import types
15import unittest
16import warnings
17
18from .testresult import get_test_runner
19
20
21try:
22    from _testcapi import unicode_legacy_string
23except ImportError:
24    unicode_legacy_string = None
25
26__all__ = [
27    # globals
28    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
29    # exceptions
30    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
31    # io
32    "record_original_stdout", "get_original_stdout", "captured_stdout",
33    "captured_stdin", "captured_stderr",
34    # unittest
35    "is_resource_enabled", "requires", "requires_freebsd_version",
36    "requires_linux_version", "requires_mac_ver",
37    "check_syntax_error",
38    "BasicTestRunner", "run_unittest", "run_doctest",
39    "requires_gzip", "requires_bz2", "requires_lzma",
40    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
41    "requires_IEEE_754", "requires_zlib",
42    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
43    "check__all__", "skip_if_buggy_ucrt_strfptime",
44    "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
45    # sys
46    "is_jython", "is_android", "check_impl_detail", "unix_shell",
47    "setswitchinterval",
48    # network
49    "open_urlresource",
50    # processes
51    "reap_children",
52    # miscellaneous
53    "run_with_locale", "swap_item", "findfile",
54    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
55    "run_with_tz", "PGO", "missing_compiler_executable",
56    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
57    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
58    ]
59
60
61# Timeout in seconds for tests using a network server listening on the network
62# local loopback interface like 127.0.0.1.
63#
64# The timeout is long enough to prevent test failure: it takes into account
65# that the client and the server can run in different threads or even different
66# processes.
67#
68# The timeout should be long enough for connect(), recv() and send() methods
69# of socket.socket.
70LOOPBACK_TIMEOUT = 5.0
71if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
72    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
73    # seconds on Windows ARM32 buildbot
74    LOOPBACK_TIMEOUT = 10
75elif sys.platform == 'vxworks':
76    LOOPBACK_TIMEOUT = 10
77
78# Timeout in seconds for network requests going to the internet. The timeout is
79# short enough to prevent a test to wait for too long if the internet request
80# is blocked for whatever reason.
81#
82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
83# but skip the test instead: see transient_internet().
84INTERNET_TIMEOUT = 60.0
85
86# Timeout in seconds to mark a test as failed if the test takes "too long".
87#
88# The timeout value depends on the regrtest --timeout command line option.
89#
90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
91# LONG_TIMEOUT instead.
92SHORT_TIMEOUT = 30.0
93
94# Timeout in seconds to detect when a test hangs.
95#
96# It is long enough to reduce the risk of test failure on the slowest Python
97# buildbots. It should not be used to mark a test as failed if the test takes
98# "too long". The timeout value depends on the regrtest --timeout command line
99# option.
100LONG_TIMEOUT = 5 * 60.0
101
102
103class Error(Exception):
104    """Base class for regression test exceptions."""
105
106class TestFailed(Error):
107    """Test failed."""
108
109class TestFailedWithDetails(TestFailed):
110    """Test failed."""
111    def __init__(self, msg, errors, failures):
112        self.msg = msg
113        self.errors = errors
114        self.failures = failures
115        super().__init__(msg, errors, failures)
116
117    def __str__(self):
118        return self.msg
119
120class TestDidNotRun(Error):
121    """Test did not run any subtests."""
122
123class ResourceDenied(unittest.SkipTest):
124    """Test skipped because it requested a disallowed resource.
125
126    This is raised when a test calls requires() for a resource that
127    has not be enabled.  It is used to distinguish between expected
128    and unexpected skips.
129    """
130
131def anticipate_failure(condition):
132    """Decorator to mark a test that is known to be broken in some cases
133
134       Any use of this decorator should have a comment identifying the
135       associated tracker issue.
136    """
137    if condition:
138        return unittest.expectedFailure
139    return lambda f: f
140
141def load_package_tests(pkg_dir, loader, standard_tests, pattern):
142    """Generic load_tests implementation for simple test packages.
143
144    Most packages can implement load_tests using this function as follows:
145
146       def load_tests(*args):
147           return load_package_tests(os.path.dirname(__file__), *args)
148    """
149    if pattern is None:
150        pattern = "test*"
151    top_dir = os.path.dirname(              # Lib
152                  os.path.dirname(              # test
153                      os.path.dirname(__file__)))   # support
154    package_tests = loader.discover(start_dir=pkg_dir,
155                                    top_level_dir=top_dir,
156                                    pattern=pattern)
157    standard_tests.addTests(package_tests)
158    return standard_tests
159
160
161def get_attribute(obj, name):
162    """Get an attribute, raising SkipTest if AttributeError is raised."""
163    try:
164        attribute = getattr(obj, name)
165    except AttributeError:
166        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
167    else:
168        return attribute
169
170verbose = 1              # Flag set to 0 by regrtest.py
171use_resources = None     # Flag set to [] by regrtest.py
172max_memuse = 0           # Disable bigmem tests (they will still be run with
173                         # small sizes, to make sure they work.)
174real_max_memuse = 0
175junit_xml_list = None    # list of testsuite XML elements
176failfast = False
177
178# _original_stdout is meant to hold stdout at the time regrtest began.
179# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
180# The point is to have some flavor of stdout the user can actually see.
181_original_stdout = None
182def record_original_stdout(stdout):
183    global _original_stdout
184    _original_stdout = stdout
185
186def get_original_stdout():
187    return _original_stdout or sys.stdout
188
189
190def _force_run(path, func, *args):
191    try:
192        return func(*args)
193    except OSError as err:
194        if verbose >= 2:
195            print('%s: %s' % (err.__class__.__name__, err))
196            print('re-run %s%r' % (func.__name__, args))
197        os.chmod(path, stat.S_IRWXU)
198        return func(*args)
199
200
201# Check whether a gui is actually available
202def _is_gui_available():
203    if hasattr(_is_gui_available, 'result'):
204        return _is_gui_available.result
205    import platform
206    reason = None
207    if sys.platform.startswith('win') and platform.win32_is_iot():
208        reason = "gui is not available on Windows IoT Core"
209    elif sys.platform.startswith('win'):
210        # if Python is running as a service (such as the buildbot service),
211        # gui interaction may be disallowed
212        import ctypes
213        import ctypes.wintypes
214        UOI_FLAGS = 1
215        WSF_VISIBLE = 0x0001
216        class USEROBJECTFLAGS(ctypes.Structure):
217            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
218                        ("fReserved", ctypes.wintypes.BOOL),
219                        ("dwFlags", ctypes.wintypes.DWORD)]
220        dll = ctypes.windll.user32
221        h = dll.GetProcessWindowStation()
222        if not h:
223            raise ctypes.WinError()
224        uof = USEROBJECTFLAGS()
225        needed = ctypes.wintypes.DWORD()
226        res = dll.GetUserObjectInformationW(h,
227            UOI_FLAGS,
228            ctypes.byref(uof),
229            ctypes.sizeof(uof),
230            ctypes.byref(needed))
231        if not res:
232            raise ctypes.WinError()
233        if not bool(uof.dwFlags & WSF_VISIBLE):
234            reason = "gui not available (WSF_VISIBLE flag not set)"
235    elif sys.platform == 'darwin':
236        # The Aqua Tk implementations on OS X can abort the process if
237        # being called in an environment where a window server connection
238        # cannot be made, for instance when invoked by a buildbot or ssh
239        # process not running under the same user id as the current console
240        # user.  To avoid that, raise an exception if the window manager
241        # connection is not available.
242        from ctypes import cdll, c_int, pointer, Structure
243        from ctypes.util import find_library
244
245        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
246
247        if app_services.CGMainDisplayID() == 0:
248            reason = "gui tests cannot run without OS X window manager"
249        else:
250            class ProcessSerialNumber(Structure):
251                _fields_ = [("highLongOfPSN", c_int),
252                            ("lowLongOfPSN", c_int)]
253            psn = ProcessSerialNumber()
254            psn_p = pointer(psn)
255            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
256                  (app_services.SetFrontProcess(psn_p) < 0) ):
257                reason = "cannot run without OS X gui process"
258
259    # check on every platform whether tkinter can actually do anything
260    if not reason:
261        try:
262            from tkinter import Tk
263            root = Tk()
264            root.withdraw()
265            root.update()
266            root.destroy()
267        except Exception as e:
268            err_string = str(e)
269            if len(err_string) > 50:
270                err_string = err_string[:50] + ' [...]'
271            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
272                                                           err_string)
273
274    _is_gui_available.reason = reason
275    _is_gui_available.result = not reason
276
277    return _is_gui_available.result
278
279def is_resource_enabled(resource):
280    """Test whether a resource is enabled.
281
282    Known resources are set by regrtest.py.  If not running under regrtest.py,
283    all resources are assumed enabled unless use_resources has been set.
284    """
285    return use_resources is None or resource in use_resources
286
287def requires(resource, msg=None):
288    """Raise ResourceDenied if the specified resource is not available."""
289    if not is_resource_enabled(resource):
290        if msg is None:
291            msg = "Use of the %r resource not enabled" % resource
292        raise ResourceDenied(msg)
293    if resource == 'gui' and not _is_gui_available():
294        raise ResourceDenied(_is_gui_available.reason)
295
296def _requires_unix_version(sysname, min_version):
297    """Decorator raising SkipTest if the OS is `sysname` and the version is less
298    than `min_version`.
299
300    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
301    the FreeBSD version is less than 7.2.
302    """
303    import platform
304    min_version_txt = '.'.join(map(str, min_version))
305    version_txt = platform.release().split('-', 1)[0]
306    if platform.system() == sysname:
307        try:
308            version = tuple(map(int, version_txt.split('.')))
309        except ValueError:
310            skip = False
311        else:
312            skip = version < min_version
313    else:
314        skip = False
315
316    return unittest.skipIf(
317        skip,
318        f"{sysname} version {min_version_txt} or higher required, not "
319        f"{version_txt}"
320    )
321
322
323def requires_freebsd_version(*min_version):
324    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
325    less than `min_version`.
326
327    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
328    version is less than 7.2.
329    """
330    return _requires_unix_version('FreeBSD', min_version)
331
332def requires_linux_version(*min_version):
333    """Decorator raising SkipTest if the OS is Linux and the Linux version is
334    less than `min_version`.
335
336    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
337    version is less than 2.6.32.
338    """
339    return _requires_unix_version('Linux', min_version)
340
341def requires_mac_ver(*min_version):
342    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
343    version if less than min_version.
344
345    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
346    is lesser than 10.5.
347    """
348    def decorator(func):
349        @functools.wraps(func)
350        def wrapper(*args, **kw):
351            if sys.platform == 'darwin':
352                import platform
353                version_txt = platform.mac_ver()[0]
354                try:
355                    version = tuple(map(int, version_txt.split('.')))
356                except ValueError:
357                    pass
358                else:
359                    if version < min_version:
360                        min_version_txt = '.'.join(map(str, min_version))
361                        raise unittest.SkipTest(
362                            "Mac OS X %s or higher required, not %s"
363                            % (min_version_txt, version_txt))
364            return func(*args, **kw)
365        wrapper.min_version = min_version
366        return wrapper
367    return decorator
368
369
370def check_sanitizer(*, address=False, memory=False, ub=False):
371    """Returns True if Python is compiled with sanitizer support"""
372    if not (address or memory or ub):
373        raise ValueError('At least one of address, memory, or ub must be True')
374
375
376    _cflags = sysconfig.get_config_var('CFLAGS') or ''
377    _config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
378    memory_sanitizer = (
379        '-fsanitize=memory' in _cflags or
380        '--with-memory-sanitizer' in _config_args
381    )
382    address_sanitizer = (
383        '-fsanitize=address' in _cflags or
384        '--with-memory-sanitizer' in _config_args
385    )
386    ub_sanitizer = (
387        '-fsanitize=undefined' in _cflags or
388        '--with-undefined-behavior-sanitizer' in _config_args
389    )
390    return (
391        (memory and memory_sanitizer) or
392        (address and address_sanitizer) or
393        (ub and ub_sanitizer)
394    )
395
396
397def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False):
398    """Decorator raising SkipTest if running with a sanitizer active."""
399    if not reason:
400        reason = 'not working with sanitizers active'
401    skip = check_sanitizer(address=address, memory=memory, ub=ub)
402    return unittest.skipIf(skip, reason)
403
404
405def system_must_validate_cert(f):
406    """Skip the test on TLS certificate validation failures."""
407    @functools.wraps(f)
408    def dec(*args, **kwargs):
409        try:
410            f(*args, **kwargs)
411        except OSError as e:
412            if "CERTIFICATE_VERIFY_FAILED" in str(e):
413                raise unittest.SkipTest("system does not contain "
414                                        "necessary certificates")
415            raise
416    return dec
417
418# A constant likely larger than the underlying OS pipe buffer size, to
419# make writes blocking.
420# Windows limit seems to be around 512 B, and many Unix kernels have a
421# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
422# (see issue #17835 for a discussion of this number).
423PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
424
425# A constant likely larger than the underlying OS socket buffer size, to make
426# writes blocking.
427# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
428# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
429# for a discussion of this number.
430SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
431
432# decorator for skipping tests on non-IEEE 754 platforms
433requires_IEEE_754 = unittest.skipUnless(
434    float.__getformat__("double").startswith("IEEE"),
435    "test requires IEEE 754 doubles")
436
437def requires_zlib(reason='requires zlib'):
438    try:
439        import zlib
440    except ImportError:
441        zlib = None
442    return unittest.skipUnless(zlib, reason)
443
444def requires_gzip(reason='requires gzip'):
445    try:
446        import gzip
447    except ImportError:
448        gzip = None
449    return unittest.skipUnless(gzip, reason)
450
451def requires_bz2(reason='requires bz2'):
452    try:
453        import bz2
454    except ImportError:
455        bz2 = None
456    return unittest.skipUnless(bz2, reason)
457
458def requires_lzma(reason='requires lzma'):
459    try:
460        import lzma
461    except ImportError:
462        lzma = None
463    return unittest.skipUnless(lzma, reason)
464
465requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
466                        'requires legacy Unicode C API')
467
468is_jython = sys.platform.startswith('java')
469
470is_android = hasattr(sys, 'getandroidapilevel')
471
472if sys.platform not in ('win32', 'vxworks'):
473    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
474else:
475    unix_shell = None
476
477# Define the URL of a dedicated HTTP server for the network tests.
478# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
479TEST_HTTP_URL = "http://www.pythontest.net"
480
481# Set by libregrtest/main.py so we can skip tests that are not
482# useful for PGO
483PGO = False
484
485# Set by libregrtest/main.py if we are running the extended (time consuming)
486# PGO task.  If this is True, PGO is also True.
487PGO_EXTENDED = False
488
489# TEST_HOME_DIR refers to the top level directory of the "test" package
490# that contains Python's regression test suite
491TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
492TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
493
494# TEST_DATA_DIR is used as a target download location for remote resources
495TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
496
497
498def darwin_malloc_err_warning(test_name):
499    """Assure user that loud errors generated by macOS libc's malloc are
500    expected."""
501    if sys.platform != 'darwin':
502        return
503
504    import shutil
505    msg = ' NOTICE '
506    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
507              'warnings on macOS systems. This behavior is known. Do not\n'
508              'report a bug unless tests are also failing. See bpo-40928.')
509
510    padding, _ = shutil.get_terminal_size()
511    print(msg.center(padding, '-'))
512    print(detail)
513    print('-' * padding)
514
515
516def findfile(filename, subdir=None):
517    """Try to find a file on sys.path or in the test directory.  If it is not
518    found the argument passed to the function is returned (this does not
519    necessarily signal failure; could still be the legitimate path).
520
521    Setting *subdir* indicates a relative path to use to find the file
522    rather than looking directly in the path directories.
523    """
524    if os.path.isabs(filename):
525        return filename
526    if subdir is not None:
527        filename = os.path.join(subdir, filename)
528    path = [TEST_HOME_DIR] + sys.path
529    for dn in path:
530        fn = os.path.join(dn, filename)
531        if os.path.exists(fn): return fn
532    return filename
533
534
535def sortdict(dict):
536    "Like repr(dict), but in sorted order."
537    items = sorted(dict.items())
538    reprpairs = ["%r: %r" % pair for pair in items]
539    withcommas = ", ".join(reprpairs)
540    return "{%s}" % withcommas
541
542def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
543    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
544        compile(statement, '<test string>', 'exec')
545    err = cm.exception
546    testcase.assertIsNotNone(err.lineno)
547    if lineno is not None:
548        testcase.assertEqual(err.lineno, lineno)
549    testcase.assertIsNotNone(err.offset)
550    if offset is not None:
551        testcase.assertEqual(err.offset, offset)
552
553
554def open_urlresource(url, *args, **kw):
555    import urllib.request, urllib.parse
556    from .os_helper import unlink
557    try:
558        import gzip
559    except ImportError:
560        gzip = None
561
562    check = kw.pop('check', None)
563
564    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
565
566    fn = os.path.join(TEST_DATA_DIR, filename)
567
568    def check_valid_file(fn):
569        f = open(fn, *args, **kw)
570        if check is None:
571            return f
572        elif check(f):
573            f.seek(0)
574            return f
575        f.close()
576
577    if os.path.exists(fn):
578        f = check_valid_file(fn)
579        if f is not None:
580            return f
581        unlink(fn)
582
583    # Verify the requirement before downloading the file
584    requires('urlfetch')
585
586    if verbose:
587        print('\tfetching %s ...' % url, file=get_original_stdout())
588    opener = urllib.request.build_opener()
589    if gzip:
590        opener.addheaders.append(('Accept-Encoding', 'gzip'))
591    f = opener.open(url, timeout=INTERNET_TIMEOUT)
592    if gzip and f.headers.get('Content-Encoding') == 'gzip':
593        f = gzip.GzipFile(fileobj=f)
594    try:
595        with open(fn, "wb") as out:
596            s = f.read()
597            while s:
598                out.write(s)
599                s = f.read()
600    finally:
601        f.close()
602
603    f = check_valid_file(fn)
604    if f is not None:
605        return f
606    raise TestFailed('invalid resource %r' % fn)
607
608
609@contextlib.contextmanager
610def captured_output(stream_name):
611    """Return a context manager used by captured_stdout/stdin/stderr
612    that temporarily replaces the sys stream *stream_name* with a StringIO."""
613    import io
614    orig_stdout = getattr(sys, stream_name)
615    setattr(sys, stream_name, io.StringIO())
616    try:
617        yield getattr(sys, stream_name)
618    finally:
619        setattr(sys, stream_name, orig_stdout)
620
621def captured_stdout():
622    """Capture the output of sys.stdout:
623
624       with captured_stdout() as stdout:
625           print("hello")
626       self.assertEqual(stdout.getvalue(), "hello\\n")
627    """
628    return captured_output("stdout")
629
630def captured_stderr():
631    """Capture the output of sys.stderr:
632
633       with captured_stderr() as stderr:
634           print("hello", file=sys.stderr)
635       self.assertEqual(stderr.getvalue(), "hello\\n")
636    """
637    return captured_output("stderr")
638
639def captured_stdin():
640    """Capture the input to sys.stdin:
641
642       with captured_stdin() as stdin:
643           stdin.write('hello\\n')
644           stdin.seek(0)
645           # call test code that consumes from sys.stdin
646           captured = input()
647       self.assertEqual(captured, "hello")
648    """
649    return captured_output("stdin")
650
651
652def gc_collect():
653    """Force as many objects as possible to be collected.
654
655    In non-CPython implementations of Python, this is needed because timely
656    deallocation is not guaranteed by the garbage collector.  (Even in CPython
657    this can be the case in case of reference cycles.)  This means that __del__
658    methods may be called later than expected and weakrefs may remain alive for
659    longer than expected.  This function tries its best to force all garbage
660    objects to disappear.
661    """
662    import gc
663    gc.collect()
664    if is_jython:
665        time.sleep(0.1)
666    gc.collect()
667    gc.collect()
668
669@contextlib.contextmanager
670def disable_gc():
671    import gc
672    have_gc = gc.isenabled()
673    gc.disable()
674    try:
675        yield
676    finally:
677        if have_gc:
678            gc.enable()
679
680
681def python_is_optimized():
682    """Find if Python was built with optimizations."""
683    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
684    final_opt = ""
685    for opt in cflags.split():
686        if opt.startswith('-O'):
687            final_opt = opt
688    return final_opt not in ('', '-O0', '-Og')
689
690
691_header = 'nP'
692_align = '0n'
693if hasattr(sys, "getobjects"):
694    _header = '2P' + _header
695    _align = '0P'
696_vheader = _header + 'n'
697
698def calcobjsize(fmt):
699    import struct
700    return struct.calcsize(_header + fmt + _align)
701
702def calcvobjsize(fmt):
703    import struct
704    return struct.calcsize(_vheader + fmt + _align)
705
706
707_TPFLAGS_HAVE_GC = 1<<14
708_TPFLAGS_HEAPTYPE = 1<<9
709
710def check_sizeof(test, o, size):
711    import _testinternalcapi
712    result = sys.getsizeof(o)
713    # add GC header size
714    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
715        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
716        size += _testinternalcapi.SIZEOF_PYGC_HEAD
717    msg = 'wrong size for %s: got %d, expected %d' \
718            % (type(o), result, size)
719    test.assertEqual(result, size, msg)
720
721#=======================================================================
722# Decorator for running a function in a different locale, correctly resetting
723# it afterwards.
724
725@contextlib.contextmanager
726def run_with_locale(catstr, *locales):
727            try:
728                import locale
729                category = getattr(locale, catstr)
730                orig_locale = locale.setlocale(category)
731            except AttributeError:
732                # if the test author gives us an invalid category string
733                raise
734            except:
735                # cannot retrieve original locale, so do nothing
736                locale = orig_locale = None
737            else:
738                for loc in locales:
739                    try:
740                        locale.setlocale(category, loc)
741                        break
742                    except:
743                        pass
744
745            try:
746                yield
747            finally:
748                if locale and orig_locale:
749                    locale.setlocale(category, orig_locale)
750
751#=======================================================================
752# Decorator for running a function in a specific timezone, correctly
753# resetting it afterwards.
754
755def run_with_tz(tz):
756    def decorator(func):
757        def inner(*args, **kwds):
758            try:
759                tzset = time.tzset
760            except AttributeError:
761                raise unittest.SkipTest("tzset required")
762            if 'TZ' in os.environ:
763                orig_tz = os.environ['TZ']
764            else:
765                orig_tz = None
766            os.environ['TZ'] = tz
767            tzset()
768
769            # now run the function, resetting the tz on exceptions
770            try:
771                return func(*args, **kwds)
772            finally:
773                if orig_tz is None:
774                    del os.environ['TZ']
775                else:
776                    os.environ['TZ'] = orig_tz
777                time.tzset()
778
779        inner.__name__ = func.__name__
780        inner.__doc__ = func.__doc__
781        return inner
782    return decorator
783
784#=======================================================================
785# Big-memory-test support. Separate from 'resources' because memory use
786# should be configurable.
787
788# Some handy shorthands. Note that these are used for byte-limits as well
789# as size-limits, in the various bigmem tests
790_1M = 1024*1024
791_1G = 1024 * _1M
792_2G = 2 * _1G
793_4G = 4 * _1G
794
795MAX_Py_ssize_t = sys.maxsize
796
797def set_memlimit(limit):
798    global max_memuse
799    global real_max_memuse
800    sizes = {
801        'k': 1024,
802        'm': _1M,
803        'g': _1G,
804        't': 1024*_1G,
805    }
806    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
807                 re.IGNORECASE | re.VERBOSE)
808    if m is None:
809        raise ValueError('Invalid memory limit %r' % (limit,))
810    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
811    real_max_memuse = memlimit
812    if memlimit > MAX_Py_ssize_t:
813        memlimit = MAX_Py_ssize_t
814    if memlimit < _2G - 1:
815        raise ValueError('Memory limit %r too low to be useful' % (limit,))
816    max_memuse = memlimit
817
818class _MemoryWatchdog:
819    """An object which periodically watches the process' memory consumption
820    and prints it out.
821    """
822
823    def __init__(self):
824        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
825        self.started = False
826
827    def start(self):
828        import warnings
829        try:
830            f = open(self.procfile, 'r')
831        except OSError as e:
832            warnings.warn('/proc not available for stats: {}'.format(e),
833                          RuntimeWarning)
834            sys.stderr.flush()
835            return
836
837        import subprocess
838        with f:
839            watchdog_script = findfile("memory_watchdog.py")
840            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
841                                                 stdin=f,
842                                                 stderr=subprocess.DEVNULL)
843        self.started = True
844
845    def stop(self):
846        if self.started:
847            self.mem_watchdog.terminate()
848            self.mem_watchdog.wait()
849
850
851def bigmemtest(size, memuse, dry_run=True):
852    """Decorator for bigmem tests.
853
854    'size' is a requested size for the test (in arbitrary, test-interpreted
855    units.) 'memuse' is the number of bytes per unit for the test, or a good
856    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
857    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
858
859    The 'size' argument is normally passed to the decorated test method as an
860    extra argument. If 'dry_run' is true, the value passed to the test method
861    may be less than the requested value. If 'dry_run' is false, it means the
862    test doesn't support dummy runs when -M is not specified.
863    """
864    def decorator(f):
865        def wrapper(self):
866            size = wrapper.size
867            memuse = wrapper.memuse
868            if not real_max_memuse:
869                maxsize = 5147
870            else:
871                maxsize = size
872
873            if ((real_max_memuse or not dry_run)
874                and real_max_memuse < maxsize * memuse):
875                raise unittest.SkipTest(
876                    "not enough memory: %.1fG minimum needed"
877                    % (size * memuse / (1024 ** 3)))
878
879            if real_max_memuse and verbose:
880                print()
881                print(" ... expected peak memory use: {peak:.1f}G"
882                      .format(peak=size * memuse / (1024 ** 3)))
883                watchdog = _MemoryWatchdog()
884                watchdog.start()
885            else:
886                watchdog = None
887
888            try:
889                return f(self, maxsize)
890            finally:
891                if watchdog:
892                    watchdog.stop()
893
894        wrapper.size = size
895        wrapper.memuse = memuse
896        return wrapper
897    return decorator
898
899def bigaddrspacetest(f):
900    """Decorator for tests that fill the address space."""
901    def wrapper(self):
902        if max_memuse < MAX_Py_ssize_t:
903            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
904                raise unittest.SkipTest(
905                    "not enough memory: try a 32-bit build instead")
906            else:
907                raise unittest.SkipTest(
908                    "not enough memory: %.1fG minimum needed"
909                    % (MAX_Py_ssize_t / (1024 ** 3)))
910        else:
911            return f(self)
912    return wrapper
913
914#=======================================================================
915# unittest integration.
916
917class BasicTestRunner:
918    def run(self, test):
919        result = unittest.TestResult()
920        test(result)
921        return result
922
923def _id(obj):
924    return obj
925
926def requires_resource(resource):
927    if resource == 'gui' and not _is_gui_available():
928        return unittest.skip(_is_gui_available.reason)
929    if is_resource_enabled(resource):
930        return _id
931    else:
932        return unittest.skip("resource {0!r} is not enabled".format(resource))
933
934def cpython_only(test):
935    """
936    Decorator for tests only applicable on CPython.
937    """
938    return impl_detail(cpython=True)(test)
939
940def impl_detail(msg=None, **guards):
941    if check_impl_detail(**guards):
942        return _id
943    if msg is None:
944        guardnames, default = _parse_guards(guards)
945        if default:
946            msg = "implementation detail not available on {0}"
947        else:
948            msg = "implementation detail specific to {0}"
949        guardnames = sorted(guardnames.keys())
950        msg = msg.format(' or '.join(guardnames))
951    return unittest.skip(msg)
952
953def _parse_guards(guards):
954    # Returns a tuple ({platform_name: run_me}, default_value)
955    if not guards:
956        return ({'cpython': True}, False)
957    is_true = list(guards.values())[0]
958    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
959    return (guards, not is_true)
960
961# Use the following check to guard CPython's implementation-specific tests --
962# or to run them only on the implementation(s) guarded by the arguments.
963def check_impl_detail(**guards):
964    """This function returns True or False depending on the host platform.
965       Examples:
966          if check_impl_detail():               # only on CPython (default)
967          if check_impl_detail(jython=True):    # only on Jython
968          if check_impl_detail(cpython=False):  # everywhere except on CPython
969    """
970    guards, default = _parse_guards(guards)
971    return guards.get(sys.implementation.name, default)
972
973
974def no_tracing(func):
975    """Decorator to temporarily turn off tracing for the duration of a test."""
976    if not hasattr(sys, 'gettrace'):
977        return func
978    else:
979        @functools.wraps(func)
980        def wrapper(*args, **kwargs):
981            original_trace = sys.gettrace()
982            try:
983                sys.settrace(None)
984                return func(*args, **kwargs)
985            finally:
986                sys.settrace(original_trace)
987        return wrapper
988
989
990def refcount_test(test):
991    """Decorator for tests which involve reference counting.
992
993    To start, the decorator does not run the test if is not run by CPython.
994    After that, any trace function is unset during the test to prevent
995    unexpected refcounts caused by the trace function.
996
997    """
998    return no_tracing(cpython_only(test))
999
1000
1001def _filter_suite(suite, pred):
1002    """Recursively filter test cases in a suite based on a predicate."""
1003    newtests = []
1004    for test in suite._tests:
1005        if isinstance(test, unittest.TestSuite):
1006            _filter_suite(test, pred)
1007            newtests.append(test)
1008        else:
1009            if pred(test):
1010                newtests.append(test)
1011    suite._tests = newtests
1012
1013def _run_suite(suite):
1014    """Run tests from a unittest.TestSuite-derived class."""
1015    runner = get_test_runner(sys.stdout,
1016                             verbosity=verbose,
1017                             capture_output=(junit_xml_list is not None))
1018
1019    result = runner.run(suite)
1020
1021    if junit_xml_list is not None:
1022        junit_xml_list.append(result.get_xml_element())
1023
1024    if not result.testsRun and not result.skipped:
1025        raise TestDidNotRun
1026    if not result.wasSuccessful():
1027        if len(result.errors) == 1 and not result.failures:
1028            err = result.errors[0][1]
1029        elif len(result.failures) == 1 and not result.errors:
1030            err = result.failures[0][1]
1031        else:
1032            err = "multiple errors occurred"
1033            if not verbose: err += "; run in verbose mode for details"
1034        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
1035        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
1036        raise TestFailedWithDetails(err, errors, failures)
1037
1038
1039# By default, don't filter tests
1040_match_test_func = None
1041
1042_accept_test_patterns = None
1043_ignore_test_patterns = None
1044
1045
1046def match_test(test):
1047    # Function used by support.run_unittest() and regrtest --list-cases
1048    if _match_test_func is None:
1049        return True
1050    else:
1051        return _match_test_func(test.id())
1052
1053
1054def _is_full_match_test(pattern):
1055    # If a pattern contains at least one dot, it's considered
1056    # as a full test identifier.
1057    # Example: 'test.test_os.FileTests.test_access'.
1058    #
1059    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
1060    # or '[!...]'. For example, ignore 'test_access*'.
1061    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
1062
1063
1064def set_match_tests(accept_patterns=None, ignore_patterns=None):
1065    global _match_test_func, _accept_test_patterns, _ignore_test_patterns
1066
1067
1068    if accept_patterns is None:
1069        accept_patterns = ()
1070    if ignore_patterns is None:
1071        ignore_patterns = ()
1072
1073    accept_func = ignore_func = None
1074
1075    if accept_patterns != _accept_test_patterns:
1076        accept_patterns, accept_func = _compile_match_function(accept_patterns)
1077    if ignore_patterns != _ignore_test_patterns:
1078        ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
1079
1080    # Create a copy since patterns can be mutable and so modified later
1081    _accept_test_patterns = tuple(accept_patterns)
1082    _ignore_test_patterns = tuple(ignore_patterns)
1083
1084    if accept_func is not None or ignore_func is not None:
1085        def match_function(test_id):
1086            accept = True
1087            ignore = False
1088            if accept_func:
1089                accept = accept_func(test_id)
1090            if ignore_func:
1091                ignore = ignore_func(test_id)
1092            return accept and not ignore
1093
1094        _match_test_func = match_function
1095
1096
1097def _compile_match_function(patterns):
1098    if not patterns:
1099        func = None
1100        # set_match_tests(None) behaves as set_match_tests(())
1101        patterns = ()
1102    elif all(map(_is_full_match_test, patterns)):
1103        # Simple case: all patterns are full test identifier.
1104        # The test.bisect_cmd utility only uses such full test identifiers.
1105        func = set(patterns).__contains__
1106    else:
1107        import fnmatch
1108        regex = '|'.join(map(fnmatch.translate, patterns))
1109        # The search *is* case sensitive on purpose:
1110        # don't use flags=re.IGNORECASE
1111        regex_match = re.compile(regex).match
1112
1113        def match_test_regex(test_id):
1114            if regex_match(test_id):
1115                # The regex matches the whole identifier, for example
1116                # 'test.test_os.FileTests.test_access'.
1117                return True
1118            else:
1119                # Try to match parts of the test identifier.
1120                # For example, split 'test.test_os.FileTests.test_access'
1121                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
1122                return any(map(regex_match, test_id.split(".")))
1123
1124        func = match_test_regex
1125
1126    return patterns, func
1127
1128
1129def run_unittest(*classes):
1130    """Run tests from unittest.TestCase-derived classes."""
1131    valid_types = (unittest.TestSuite, unittest.TestCase)
1132    suite = unittest.TestSuite()
1133    for cls in classes:
1134        if isinstance(cls, str):
1135            if cls in sys.modules:
1136                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1137            else:
1138                raise ValueError("str arguments must be keys in sys.modules")
1139        elif isinstance(cls, valid_types):
1140            suite.addTest(cls)
1141        else:
1142            suite.addTest(unittest.makeSuite(cls))
1143    _filter_suite(suite, match_test)
1144    _run_suite(suite)
1145
1146#=======================================================================
1147# Check for the presence of docstrings.
1148
1149# Rather than trying to enumerate all the cases where docstrings may be
1150# disabled, we just check for that directly
1151
1152def _check_docstrings():
1153    """Just used to check if docstrings are enabled"""
1154
1155MISSING_C_DOCSTRINGS = (check_impl_detail() and
1156                        sys.platform != 'win32' and
1157                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
1158
1159HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
1160                   not MISSING_C_DOCSTRINGS)
1161
1162requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1163                                          "test requires docstrings")
1164
1165
1166#=======================================================================
1167# doctest driver.
1168
1169def run_doctest(module, verbosity=None, optionflags=0):
1170    """Run doctest on the given module.  Return (#failures, #tests).
1171
1172    If optional argument verbosity is not specified (or is None), pass
1173    support's belief about verbosity on to doctest.  Else doctest's
1174    usual behavior is used (it searches sys.argv for -v).
1175    """
1176
1177    import doctest
1178
1179    if verbosity is None:
1180        verbosity = verbose
1181    else:
1182        verbosity = None
1183
1184    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
1185    if f:
1186        raise TestFailed("%d of %d doctests failed" % (f, t))
1187    if verbose:
1188        print('doctest (%s) ... %d tests with zero failures' %
1189              (module.__name__, t))
1190    return f, t
1191
1192
1193#=======================================================================
1194# Support for saving and restoring the imported modules.
1195
1196def print_warning(msg):
1197    # bpo-39983: Print into sys.__stderr__ to display the warning even
1198    # when sys.stderr is captured temporarily by a test
1199    for line in msg.splitlines():
1200        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
1201
1202
1203# Flag used by saved_test_environment of test.libregrtest.save_env,
1204# to check if a test modified the environment. The flag should be set to False
1205# before running a new test.
1206#
1207# For example, threading_helper.threading_cleanup() sets the flag is the function fails
1208# to cleanup threads.
1209environment_altered = False
1210
1211def reap_children():
1212    """Use this function at the end of test_main() whenever sub-processes
1213    are started.  This will help ensure that no extra children (zombies)
1214    stick around to hog resources and create problems when looking
1215    for refleaks.
1216    """
1217    global environment_altered
1218
1219    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
1220    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
1221        return
1222
1223    # Reap all our dead child processes so we don't leave zombies around.
1224    # These hog resources and might be causing some of the buildbots to die.
1225    while True:
1226        try:
1227            # Read the exit status of any child process which already completed
1228            pid, status = os.waitpid(-1, os.WNOHANG)
1229        except OSError:
1230            break
1231
1232        if pid == 0:
1233            break
1234
1235        print_warning(f"reap_children() reaped child process {pid}")
1236        environment_altered = True
1237
1238
1239@contextlib.contextmanager
1240def swap_attr(obj, attr, new_val):
1241    """Temporary swap out an attribute with a new object.
1242
1243    Usage:
1244        with swap_attr(obj, "attr", 5):
1245            ...
1246
1247        This will set obj.attr to 5 for the duration of the with: block,
1248        restoring the old value at the end of the block. If `attr` doesn't
1249        exist on `obj`, it will be created and then deleted at the end of the
1250        block.
1251
1252        The old value (or None if it doesn't exist) will be assigned to the
1253        target of the "as" clause, if there is one.
1254    """
1255    if hasattr(obj, attr):
1256        real_val = getattr(obj, attr)
1257        setattr(obj, attr, new_val)
1258        try:
1259            yield real_val
1260        finally:
1261            setattr(obj, attr, real_val)
1262    else:
1263        setattr(obj, attr, new_val)
1264        try:
1265            yield
1266        finally:
1267            if hasattr(obj, attr):
1268                delattr(obj, attr)
1269
1270@contextlib.contextmanager
1271def swap_item(obj, item, new_val):
1272    """Temporary swap out an item with a new object.
1273
1274    Usage:
1275        with swap_item(obj, "item", 5):
1276            ...
1277
1278        This will set obj["item"] to 5 for the duration of the with: block,
1279        restoring the old value at the end of the block. If `item` doesn't
1280        exist on `obj`, it will be created and then deleted at the end of the
1281        block.
1282
1283        The old value (or None if it doesn't exist) will be assigned to the
1284        target of the "as" clause, if there is one.
1285    """
1286    if item in obj:
1287        real_val = obj[item]
1288        obj[item] = new_val
1289        try:
1290            yield real_val
1291        finally:
1292            obj[item] = real_val
1293    else:
1294        obj[item] = new_val
1295        try:
1296            yield
1297        finally:
1298            if item in obj:
1299                del obj[item]
1300
1301def args_from_interpreter_flags():
1302    """Return a list of command-line arguments reproducing the current
1303    settings in sys.flags and sys.warnoptions."""
1304    import subprocess
1305    return subprocess._args_from_interpreter_flags()
1306
1307def optim_args_from_interpreter_flags():
1308    """Return a list of command-line arguments reproducing the current
1309    optimization settings in sys.flags."""
1310    import subprocess
1311    return subprocess._optim_args_from_interpreter_flags()
1312
1313
1314class Matcher(object):
1315
1316    _partial_matches = ('msg', 'message')
1317
1318    def matches(self, d, **kwargs):
1319        """
1320        Try to match a single dict with the supplied arguments.
1321
1322        Keys whose values are strings and which are in self._partial_matches
1323        will be checked for partial (i.e. substring) matches. You can extend
1324        this scheme to (for example) do regular expression matching, etc.
1325        """
1326        result = True
1327        for k in kwargs:
1328            v = kwargs[k]
1329            dv = d.get(k)
1330            if not self.match_value(k, dv, v):
1331                result = False
1332                break
1333        return result
1334
1335    def match_value(self, k, dv, v):
1336        """
1337        Try to match a single stored value (dv) with a supplied value (v).
1338        """
1339        if type(v) != type(dv):
1340            result = False
1341        elif type(dv) is not str or k not in self._partial_matches:
1342            result = (v == dv)
1343        else:
1344            result = dv.find(v) >= 0
1345        return result
1346
1347
1348_buggy_ucrt = None
1349def skip_if_buggy_ucrt_strfptime(test):
1350    """
1351    Skip decorator for tests that use buggy strptime/strftime
1352
1353    If the UCRT bugs are present time.localtime().tm_zone will be
1354    an empty string, otherwise we assume the UCRT bugs are fixed
1355
1356    See bpo-37552 [Windows] strptime/strftime return invalid
1357    results with UCRT version 17763.615
1358    """
1359    import locale
1360    global _buggy_ucrt
1361    if _buggy_ucrt is None:
1362        if(sys.platform == 'win32' and
1363                locale.getdefaultlocale()[1]  == 'cp65001' and
1364                time.localtime().tm_zone == ''):
1365            _buggy_ucrt = True
1366        else:
1367            _buggy_ucrt = False
1368    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
1369
1370class PythonSymlink:
1371    """Creates a symlink for the current Python executable"""
1372    def __init__(self, link=None):
1373        from .os_helper import TESTFN
1374
1375        self.link = link or os.path.abspath(TESTFN)
1376        self._linked = []
1377        self.real = os.path.realpath(sys.executable)
1378        self._also_link = []
1379
1380        self._env = None
1381
1382        self._platform_specific()
1383
1384    if sys.platform == "win32":
1385        def _platform_specific(self):
1386            import glob
1387            import _winapi
1388
1389            if os.path.lexists(self.real) and not os.path.exists(self.real):
1390                # App symlink appears to not exist, but we want the
1391                # real executable here anyway
1392                self.real = _winapi.GetModuleFileName(0)
1393
1394            dll = _winapi.GetModuleFileName(sys.dllhandle)
1395            src_dir = os.path.dirname(dll)
1396            dest_dir = os.path.dirname(self.link)
1397            self._also_link.append((
1398                dll,
1399                os.path.join(dest_dir, os.path.basename(dll))
1400            ))
1401            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
1402                self._also_link.append((
1403                    runtime,
1404                    os.path.join(dest_dir, os.path.basename(runtime))
1405                ))
1406
1407            self._env = {k.upper(): os.getenv(k) for k in os.environ}
1408            self._env["PYTHONHOME"] = os.path.dirname(self.real)
1409            if sysconfig.is_python_build(True):
1410                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
1411    else:
1412        def _platform_specific(self):
1413            pass
1414
1415    def __enter__(self):
1416        os.symlink(self.real, self.link)
1417        self._linked.append(self.link)
1418        for real, link in self._also_link:
1419            os.symlink(real, link)
1420            self._linked.append(link)
1421        return self
1422
1423    def __exit__(self, exc_type, exc_value, exc_tb):
1424        for link in self._linked:
1425            try:
1426                os.remove(link)
1427            except IOError as ex:
1428                if verbose:
1429                    print("failed to clean up {}: {}".format(link, ex))
1430
1431    def _call(self, python, args, env, returncode):
1432        import subprocess
1433        cmd = [python, *args]
1434        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
1435                             stderr=subprocess.PIPE, env=env)
1436        r = p.communicate()
1437        if p.returncode != returncode:
1438            if verbose:
1439                print(repr(r[0]))
1440                print(repr(r[1]), file=sys.stderr)
1441            raise RuntimeError(
1442                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
1443        return r
1444
1445    def call_real(self, *args, returncode=0):
1446        return self._call(self.real, args, None, returncode)
1447
1448    def call_link(self, *args, returncode=0):
1449        return self._call(self.link, args, self._env, returncode)
1450
1451
1452def skip_if_pgo_task(test):
1453    """Skip decorator for tests not run in (non-extended) PGO task"""
1454    ok = not PGO or PGO_EXTENDED
1455    msg = "Not run for (non-extended) PGO task"
1456    return test if ok else unittest.skip(msg)(test)
1457
1458
1459def detect_api_mismatch(ref_api, other_api, *, ignore=()):
1460    """Returns the set of items in ref_api not in other_api, except for a
1461    defined list of items to be ignored in this check.
1462
1463    By default this skips private attributes beginning with '_' but
1464    includes all magic methods, i.e. those starting and ending in '__'.
1465    """
1466    missing_items = set(dir(ref_api)) - set(dir(other_api))
1467    if ignore:
1468        missing_items -= set(ignore)
1469    missing_items = set(m for m in missing_items
1470                        if not m.startswith('_') or m.endswith('__'))
1471    return missing_items
1472
1473
1474def check__all__(test_case, module, name_of_module=None, extra=(),
1475                 not_exported=()):
1476    """Assert that the __all__ variable of 'module' contains all public names.
1477
1478    The module's public names (its API) are detected automatically based on
1479    whether they match the public name convention and were defined in
1480    'module'.
1481
1482    The 'name_of_module' argument can specify (as a string or tuple thereof)
1483    what module(s) an API could be defined in in order to be detected as a
1484    public API. One case for this is when 'module' imports part of its public
1485    API from other modules, possibly a C backend (like 'csv' and its '_csv').
1486
1487    The 'extra' argument can be a set of names that wouldn't otherwise be
1488    automatically detected as "public", like objects without a proper
1489    '__module__' attribute. If provided, it will be added to the
1490    automatically detected ones.
1491
1492    The 'not_exported' argument can be a set of names that must not be treated
1493    as part of the public API even though their names indicate otherwise.
1494
1495    Usage:
1496        import bar
1497        import foo
1498        import unittest
1499        from test import support
1500
1501        class MiscTestCase(unittest.TestCase):
1502            def test__all__(self):
1503                support.check__all__(self, foo)
1504
1505        class OtherTestCase(unittest.TestCase):
1506            def test__all__(self):
1507                extra = {'BAR_CONST', 'FOO_CONST'}
1508                not_exported = {'baz'}  # Undocumented name.
1509                # bar imports part of its API from _bar.
1510                support.check__all__(self, bar, ('bar', '_bar'),
1511                                     extra=extra, not_exported=not_exported)
1512
1513    """
1514
1515    if name_of_module is None:
1516        name_of_module = (module.__name__, )
1517    elif isinstance(name_of_module, str):
1518        name_of_module = (name_of_module, )
1519
1520    expected = set(extra)
1521
1522    for name in dir(module):
1523        if name.startswith('_') or name in not_exported:
1524            continue
1525        obj = getattr(module, name)
1526        if (getattr(obj, '__module__', None) in name_of_module or
1527                (not hasattr(obj, '__module__') and
1528                 not isinstance(obj, types.ModuleType))):
1529            expected.add(name)
1530    test_case.assertCountEqual(module.__all__, expected)
1531
1532
1533def suppress_msvcrt_asserts(verbose=False):
1534    try:
1535        import msvcrt
1536    except ImportError:
1537        return
1538
1539    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
1540                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
1541                        | msvcrt.SEM_NOGPFAULTERRORBOX
1542                        | msvcrt.SEM_NOOPENFILEERRORBOX)
1543
1544    # CrtSetReportMode() is only available in debug build
1545    if hasattr(msvcrt, 'CrtSetReportMode'):
1546        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
1547            if verbose:
1548                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
1549                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
1550            else:
1551                msvcrt.CrtSetReportMode(m, 0)
1552
1553
1554class SuppressCrashReport:
1555    """Try to prevent a crash report from popping up.
1556
1557    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
1558    disable the creation of coredump file.
1559    """
1560    old_value = None
1561    old_modes = None
1562
1563    def __enter__(self):
1564        """On Windows, disable Windows Error Reporting dialogs using
1565        SetErrorMode() and CrtSetReportMode().
1566
1567        On UNIX, try to save the previous core file size limit, then set
1568        soft limit to 0.
1569        """
1570        if sys.platform.startswith('win'):
1571            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
1572            try:
1573                import msvcrt
1574            except ImportError:
1575                return
1576
1577            self.old_value = msvcrt.GetErrorMode()
1578
1579            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
1580
1581            # bpo-23314: Suppress assert dialogs in debug builds.
1582            # CrtSetReportMode() is only available in debug build.
1583            if hasattr(msvcrt, 'CrtSetReportMode'):
1584                self.old_modes = {}
1585                for report_type in [msvcrt.CRT_WARN,
1586                                    msvcrt.CRT_ERROR,
1587                                    msvcrt.CRT_ASSERT]:
1588                    old_mode = msvcrt.CrtSetReportMode(report_type,
1589                            msvcrt.CRTDBG_MODE_FILE)
1590                    old_file = msvcrt.CrtSetReportFile(report_type,
1591                            msvcrt.CRTDBG_FILE_STDERR)
1592                    self.old_modes[report_type] = old_mode, old_file
1593
1594        else:
1595            try:
1596                import resource
1597                self.resource = resource
1598            except ImportError:
1599                self.resource = None
1600            if self.resource is not None:
1601                try:
1602                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
1603                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
1604                                            (0, self.old_value[1]))
1605                except (ValueError, OSError):
1606                    pass
1607
1608            if sys.platform == 'darwin':
1609                import subprocess
1610                # Check if the 'Crash Reporter' on OSX was configured
1611                # in 'Developer' mode and warn that it will get triggered
1612                # when it is.
1613                #
1614                # This assumes that this context manager is used in tests
1615                # that might trigger the next manager.
1616                cmd = ['/usr/bin/defaults', 'read',
1617                       'com.apple.CrashReporter', 'DialogType']
1618                proc = subprocess.Popen(cmd,
1619                                        stdout=subprocess.PIPE,
1620                                        stderr=subprocess.PIPE)
1621                with proc:
1622                    stdout = proc.communicate()[0]
1623                if stdout.strip() == b'developer':
1624                    print("this test triggers the Crash Reporter, "
1625                          "that is intentional", end='', flush=True)
1626
1627        return self
1628
1629    def __exit__(self, *ignore_exc):
1630        """Restore Windows ErrorMode or core file behavior to initial value."""
1631        if self.old_value is None:
1632            return
1633
1634        if sys.platform.startswith('win'):
1635            import msvcrt
1636            msvcrt.SetErrorMode(self.old_value)
1637
1638            if self.old_modes:
1639                for report_type, (old_mode, old_file) in self.old_modes.items():
1640                    msvcrt.CrtSetReportMode(report_type, old_mode)
1641                    msvcrt.CrtSetReportFile(report_type, old_file)
1642        else:
1643            if self.resource is not None:
1644                try:
1645                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
1646                except (ValueError, OSError):
1647                    pass
1648
1649
1650def patch(test_instance, object_to_patch, attr_name, new_value):
1651    """Override 'object_to_patch'.'attr_name' with 'new_value'.
1652
1653    Also, add a cleanup procedure to 'test_instance' to restore
1654    'object_to_patch' value for 'attr_name'.
1655    The 'attr_name' should be a valid attribute for 'object_to_patch'.
1656
1657    """
1658    # check that 'attr_name' is a real attribute for 'object_to_patch'
1659    # will raise AttributeError if it does not exist
1660    getattr(object_to_patch, attr_name)
1661
1662    # keep a copy of the old value
1663    attr_is_local = False
1664    try:
1665        old_value = object_to_patch.__dict__[attr_name]
1666    except (AttributeError, KeyError):
1667        old_value = getattr(object_to_patch, attr_name, None)
1668    else:
1669        attr_is_local = True
1670
1671    # restore the value when the test is done
1672    def cleanup():
1673        if attr_is_local:
1674            setattr(object_to_patch, attr_name, old_value)
1675        else:
1676            delattr(object_to_patch, attr_name)
1677
1678    test_instance.addCleanup(cleanup)
1679
1680    # actually override the attribute
1681    setattr(object_to_patch, attr_name, new_value)
1682
1683
1684def run_in_subinterp(code):
1685    """
1686    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
1687    module is enabled.
1688    """
1689    # Issue #10915, #15751: PyGILState_*() functions don't work with
1690    # sub-interpreters, the tracemalloc module uses these functions internally
1691    try:
1692        import tracemalloc
1693    except ImportError:
1694        pass
1695    else:
1696        if tracemalloc.is_tracing():
1697            raise unittest.SkipTest("run_in_subinterp() cannot be used "
1698                                     "if tracemalloc module is tracing "
1699                                     "memory allocations")
1700    import _testcapi
1701    return _testcapi.run_in_subinterp(code)
1702
1703
1704def check_free_after_iterating(test, iter, cls, args=()):
1705    class A(cls):
1706        def __del__(self):
1707            nonlocal done
1708            done = True
1709            try:
1710                next(it)
1711            except StopIteration:
1712                pass
1713
1714    done = False
1715    it = iter(A(*args))
1716    # Issue 26494: Shouldn't crash
1717    test.assertRaises(StopIteration, next, it)
1718    # The sequence should be deallocated just after the end of iterating
1719    gc_collect()
1720    test.assertTrue(done)
1721
1722
1723def missing_compiler_executable(cmd_names=[]):
1724    """Check if the compiler components used to build the interpreter exist.
1725
1726    Check for the existence of the compiler executables whose names are listed
1727    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
1728    and return the first missing executable or None when none is found
1729    missing.
1730
1731    """
1732    # TODO (PEP 632): alternate check without using distutils
1733    from distutils import ccompiler, sysconfig, spawn, errors
1734    compiler = ccompiler.new_compiler()
1735    sysconfig.customize_compiler(compiler)
1736    if compiler.compiler_type == "msvc":
1737        # MSVC has no executables, so check whether initialization succeeds
1738        try:
1739            compiler.initialize()
1740        except errors.DistutilsPlatformError:
1741            return "msvc"
1742    for name in compiler.executables:
1743        if cmd_names and name not in cmd_names:
1744            continue
1745        cmd = getattr(compiler, name)
1746        if cmd_names:
1747            assert cmd is not None, \
1748                    "the '%s' executable is not configured" % name
1749        elif not cmd:
1750            continue
1751        if spawn.find_executable(cmd[0]) is None:
1752            return cmd[0]
1753
1754
1755_is_android_emulator = None
1756def setswitchinterval(interval):
1757    # Setting a very low gil interval on the Android emulator causes python
1758    # to hang (issue #26939).
1759    minimum_interval = 1e-5
1760    if is_android and interval < minimum_interval:
1761        global _is_android_emulator
1762        if _is_android_emulator is None:
1763            import subprocess
1764            _is_android_emulator = (subprocess.check_output(
1765                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
1766        if _is_android_emulator:
1767            interval = minimum_interval
1768    return sys.setswitchinterval(interval)
1769
1770
1771@contextlib.contextmanager
1772def disable_faulthandler():
1773    import faulthandler
1774
1775    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
1776    # sys.stderr with a StringIO which has no file descriptor when a test
1777    # is run with -W/--verbose3.
1778    fd = sys.__stderr__.fileno()
1779
1780    is_enabled = faulthandler.is_enabled()
1781    try:
1782        faulthandler.disable()
1783        yield
1784    finally:
1785        if is_enabled:
1786            faulthandler.enable(file=fd, all_threads=True)
1787
1788
1789class SaveSignals:
1790    """
1791    Save and restore signal handlers.
1792
1793    This class is only able to save/restore signal handlers registered
1794    by the Python signal module: see bpo-13285 for "external" signal
1795    handlers.
1796    """
1797
1798    def __init__(self):
1799        import signal
1800        self.signal = signal
1801        self.signals = signal.valid_signals()
1802        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
1803        for signame in ('SIGKILL', 'SIGSTOP'):
1804            try:
1805                signum = getattr(signal, signame)
1806            except AttributeError:
1807                continue
1808            self.signals.remove(signum)
1809        self.handlers = {}
1810
1811    def save(self):
1812        for signum in self.signals:
1813            handler = self.signal.getsignal(signum)
1814            if handler is None:
1815                # getsignal() returns None if a signal handler was not
1816                # registered by the Python signal module,
1817                # and the handler is not SIG_DFL nor SIG_IGN.
1818                #
1819                # Ignore the signal: we cannot restore the handler.
1820                continue
1821            self.handlers[signum] = handler
1822
1823    def restore(self):
1824        for signum, handler in self.handlers.items():
1825            self.signal.signal(signum, handler)
1826
1827
1828def with_pymalloc():
1829    import _testcapi
1830    return _testcapi.WITH_PYMALLOC
1831
1832
1833class _ALWAYS_EQ:
1834    """
1835    Object that is equal to anything.
1836    """
1837    def __eq__(self, other):
1838        return True
1839    def __ne__(self, other):
1840        return False
1841
1842ALWAYS_EQ = _ALWAYS_EQ()
1843
1844class _NEVER_EQ:
1845    """
1846    Object that is not equal to anything.
1847    """
1848    def __eq__(self, other):
1849        return False
1850    def __ne__(self, other):
1851        return True
1852    def __hash__(self):
1853        return 1
1854
1855NEVER_EQ = _NEVER_EQ()
1856
1857@functools.total_ordering
1858class _LARGEST:
1859    """
1860    Object that is greater than anything (except itself).
1861    """
1862    def __eq__(self, other):
1863        return isinstance(other, _LARGEST)
1864    def __lt__(self, other):
1865        return False
1866
1867LARGEST = _LARGEST()
1868
1869@functools.total_ordering
1870class _SMALLEST:
1871    """
1872    Object that is less than anything (except itself).
1873    """
1874    def __eq__(self, other):
1875        return isinstance(other, _SMALLEST)
1876    def __gt__(self, other):
1877        return False
1878
1879SMALLEST = _SMALLEST()
1880
1881def maybe_get_event_loop_policy():
1882    """Return the global event loop policy if one is set, else return None."""
1883    import asyncio.events
1884    return asyncio.events._event_loop_policy
1885
1886# Helpers for testing hashing.
1887NHASHBITS = sys.hash_info.width # number of bits in hash() result
1888assert NHASHBITS in (32, 64)
1889
1890# Return mean and sdev of number of collisions when tossing nballs balls
1891# uniformly at random into nbins bins.  By definition, the number of
1892# collisions is the number of balls minus the number of occupied bins at
1893# the end.
1894def collision_stats(nbins, nballs):
1895    n, k = nbins, nballs
1896    # prob a bin empty after k trials = (1 - 1/n)**k
1897    # mean # empty is then n * (1 - 1/n)**k
1898    # so mean # occupied is n - n * (1 - 1/n)**k
1899    # so collisions = k - (n - n*(1 - 1/n)**k)
1900    #
1901    # For the variance:
1902    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
1903    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
1904    #
1905    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
1906    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
1907    # error-prone) efforts to rework the naive formulas to avoid those,
1908    # we use the `decimal` module to get plenty of extra precision.
1909    #
1910    # Note:  the exact values are straightforward to compute with
1911    # rationals, but in context that's unbearably slow, requiring
1912    # multi-million bit arithmetic.
1913    import decimal
1914    with decimal.localcontext() as ctx:
1915        bits = n.bit_length() * 2  # bits in n**2
1916        # At least that many bits will likely cancel out.
1917        # Use that many decimal digits instead.
1918        ctx.prec = max(bits, 30)
1919        dn = decimal.Decimal(n)
1920        p1empty = ((dn - 1) / dn) ** k
1921        meanempty = n * p1empty
1922        occupied = n - meanempty
1923        collisions = k - occupied
1924        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
1925        return float(collisions), float(var.sqrt())
1926
1927
1928class catch_unraisable_exception:
1929    """
1930    Context manager catching unraisable exception using sys.unraisablehook.
1931
1932    Storing the exception value (cm.unraisable.exc_value) creates a reference
1933    cycle. The reference cycle is broken explicitly when the context manager
1934    exits.
1935
1936    Storing the object (cm.unraisable.object) can resurrect it if it is set to
1937    an object which is being finalized. Exiting the context manager clears the
1938    stored object.
1939
1940    Usage:
1941
1942        with support.catch_unraisable_exception() as cm:
1943            # code creating an "unraisable exception"
1944            ...
1945
1946            # check the unraisable exception: use cm.unraisable
1947            ...
1948
1949        # cm.unraisable attribute no longer exists at this point
1950        # (to break a reference cycle)
1951    """
1952
1953    def __init__(self):
1954        self.unraisable = None
1955        self._old_hook = None
1956
1957    def _hook(self, unraisable):
1958        # Storing unraisable.object can resurrect an object which is being
1959        # finalized. Storing unraisable.exc_value creates a reference cycle.
1960        self.unraisable = unraisable
1961
1962    def __enter__(self):
1963        self._old_hook = sys.unraisablehook
1964        sys.unraisablehook = self._hook
1965        return self
1966
1967    def __exit__(self, *exc_info):
1968        sys.unraisablehook = self._old_hook
1969        del self.unraisable
1970
1971
1972def wait_process(pid, *, exitcode, timeout=None):
1973    """
1974    Wait until process pid completes and check that the process exit code is
1975    exitcode.
1976
1977    Raise an AssertionError if the process exit code is not equal to exitcode.
1978
1979    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
1980    kill the process (if signal.SIGKILL is available) and raise an
1981    AssertionError. The timeout feature is not available on Windows.
1982    """
1983    if os.name != "nt":
1984        import signal
1985
1986        if timeout is None:
1987            timeout = SHORT_TIMEOUT
1988        t0 = time.monotonic()
1989        sleep = 0.001
1990        max_sleep = 0.1
1991        while True:
1992            pid2, status = os.waitpid(pid, os.WNOHANG)
1993            if pid2 != 0:
1994                break
1995            # process is still running
1996
1997            dt = time.monotonic() - t0
1998            if dt > SHORT_TIMEOUT:
1999                try:
2000                    os.kill(pid, signal.SIGKILL)
2001                    os.waitpid(pid, 0)
2002                except OSError:
2003                    # Ignore errors like ChildProcessError or PermissionError
2004                    pass
2005
2006                raise AssertionError(f"process {pid} is still running "
2007                                     f"after {dt:.1f} seconds")
2008
2009            sleep = min(sleep * 2, max_sleep)
2010            time.sleep(sleep)
2011    else:
2012        # Windows implementation
2013        pid2, status = os.waitpid(pid, 0)
2014
2015    exitcode2 = os.waitstatus_to_exitcode(status)
2016    if exitcode2 != exitcode:
2017        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
2018                             f"but exit code {exitcode} is expected")
2019
2020    # sanity check: it should not fail in practice
2021    if pid2 != pid:
2022        raise AssertionError(f"pid {pid2} != pid {pid}")
2023
2024def skip_if_broken_multiprocessing_synchronize():
2025    """
2026    Skip tests if the multiprocessing.synchronize module is missing, if there
2027    is no available semaphore implementation, or if creating a lock raises an
2028    OSError (on Linux only).
2029    """
2030    from .import_helper import import_module
2031
2032    # Skip tests if the _multiprocessing extension is missing.
2033    import_module('_multiprocessing')
2034
2035    # Skip tests if there is no available semaphore implementation:
2036    # multiprocessing.synchronize requires _multiprocessing.SemLock.
2037    synchronize = import_module('multiprocessing.synchronize')
2038
2039    if sys.platform == "linux":
2040        try:
2041            # bpo-38377: On Linux, creating a semaphore fails with OSError
2042            # if the current user does not have the permission to create
2043            # a file in /dev/shm/ directory.
2044            synchronize.Lock(ctx=None)
2045        except OSError as exc:
2046            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2047
2048
2049@contextlib.contextmanager
2050def infinite_recursion(max_depth=75):
2051    original_depth = sys.getrecursionlimit()
2052    try:
2053        sys.setrecursionlimit(max_depth)
2054        yield
2055    finally:
2056        sys.setrecursionlimit(original_depth)
2057
2058
2059def check_disallow_instantiation(testcase, tp, *args, **kwds):
2060    """
2061    Check that given type cannot be instantiated using *args and **kwds.
2062
2063    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
2064    """
2065    mod = tp.__module__
2066    name = tp.__name__
2067    if mod != 'builtins':
2068        qualname = f"{mod}.{name}"
2069    else:
2070        qualname = f"{name}"
2071    msg = f"cannot create '{re.escape(qualname)}' instances"
2072    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
2073
2074
2075def ignore_deprecations_from(module: str, *, like: str) -> object:
2076    token = object()
2077    warnings.filterwarnings(
2078        "ignore",
2079        category=DeprecationWarning,
2080        module=module,
2081        message=like + fr"(?#support{id(token)})",
2082    )
2083    return token
2084
2085
2086def clear_ignored_deprecations(*tokens: object) -> None:
2087    if not tokens:
2088        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
2089
2090    new_filters = []
2091    endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
2092    for action, message, category, module, lineno in warnings.filters:
2093        if action == "ignore" and category is DeprecationWarning:
2094            if isinstance(message, re.Pattern):
2095                msg = message.pattern
2096            else:
2097                msg = message or ""
2098            if msg.endswith(endswith):
2099                continue
2100        new_filters.append((action, message, category, module, lineno))
2101    if warnings.filters != new_filters:
2102        warnings.filters[:] = new_filters
2103        warnings._filters_mutated()
2104