• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import contextlib
7import dataclasses
8import functools
9import _opcode
10import os
11import re
12import stat
13import sys
14import sysconfig
15import textwrap
16import time
17import types
18import unittest
19import warnings
20
21
22__all__ = [
23    # globals
24    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
25    # exceptions
26    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
27    # io
28    "record_original_stdout", "get_original_stdout", "captured_stdout",
29    "captured_stdin", "captured_stderr", "captured_output",
30    # unittest
31    "is_resource_enabled", "requires", "requires_freebsd_version",
32    "requires_gil_enabled", "requires_linux_version", "requires_mac_ver",
33    "check_syntax_error",
34    "requires_gzip", "requires_bz2", "requires_lzma",
35    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
36    "requires_IEEE_754", "requires_zlib",
37    "has_fork_support", "requires_fork",
38    "has_subprocess_support", "requires_subprocess",
39    "has_socket_support", "requires_working_socket",
40    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
41    "check__all__", "skip_if_buggy_ucrt_strfptime",
42    "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
43    "requires_limited_api", "requires_specialization",
44    # sys
45    "MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi",
46    "is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval",
47    # os
48    "get_pagesize",
49    # network
50    "open_urlresource",
51    # processes
52    "reap_children",
53    # miscellaneous
54    "run_with_locale", "swap_item", "findfile", "infinite_recursion",
55    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
56    "run_with_tz", "PGO", "missing_compiler_executable",
57    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
58    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
59    "Py_DEBUG", "exceeds_recursion_limit", "get_c_recursion_limit",
60    "skip_on_s390x",
61    "without_optimizer",
62    "force_not_colorized",
63    "BrokenIter",
64    ]
65
66
67# Timeout in seconds for tests using a network server listening on the network
68# local loopback interface like 127.0.0.1.
69#
70# The timeout is long enough to prevent test failure: it takes into account
71# that the client and the server can run in different threads or even different
72# processes.
73#
74# The timeout should be long enough for connect(), recv() and send() methods
75# of socket.socket.
76LOOPBACK_TIMEOUT = 10.0
77
# Timeout in seconds for network requests going to the internet. The timeout is
# short enough to prevent a test from waiting too long if the internet request
# is blocked for whatever reason.
81#
82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
83# but skip the test instead: see transient_internet().
84INTERNET_TIMEOUT = 60.0
85
86# Timeout in seconds to mark a test as failed if the test takes "too long".
87#
88# The timeout value depends on the regrtest --timeout command line option.
89#
90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
91# LONG_TIMEOUT instead.
92SHORT_TIMEOUT = 30.0
93
94# Timeout in seconds to detect when a test hangs.
95#
96# It is long enough to reduce the risk of test failure on the slowest Python
97# buildbots. It should not be used to mark a test as failed if the test takes
98# "too long". The timeout value depends on the regrtest --timeout command line
99# option.
100LONG_TIMEOUT = 5 * 60.0
101
102# TEST_HOME_DIR refers to the top level directory of the "test" package
103# that contains Python's regression test suite
104TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
105TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
106STDLIB_DIR = os.path.dirname(TEST_HOME_DIR)
107REPO_ROOT = os.path.dirname(STDLIB_DIR)
108
109
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""
112
class TestFailed(Error):
    """Raised when a test fails.

    *msg* is the failure message and is what str() returns; *stats* is an
    optional statistics object that is simply stored on the instance.
    """

    def __init__(self, msg, *args, stats=None):
        super().__init__(msg, *args)
        self.stats = stats
        self.msg = msg

    def __str__(self):
        # Show the message only, not the extra positional arguments.
        return self.msg
122
class TestFailedWithDetails(TestFailed):
    """Test failure that also records the individual errors and failures."""

    def __init__(self, msg, errors, failures, stats):
        super().__init__(msg, errors, failures, stats=stats)
        self.failures = failures
        self.errors = errors
129
class TestDidNotRun(Error):
    """Raised when a test ran without executing any subtests."""
132
class ResourceDenied(unittest.SkipTest):
    """Skip raised because the test asked for a disallowed resource.

    requires() raises this when the requested resource has not been
    enabled.  Having a dedicated class makes it possible to tell expected
    skips apart from unexpected ones.
    """
140
def anticipate_failure(condition):
    """Return a decorator for a test known to be broken when *condition* holds.

    If *condition* is true the decorator is unittest.expectedFailure;
    otherwise the decorated test is returned unchanged.

    Any use of this decorator should have a comment identifying the
    associated tracker issue.
    """
    def unchanged(test):
        return test

    return unittest.expectedFailure if condition else unchanged
150
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers the tests below *pkg_dir* (using "test*" when *pattern* is
    None), adds them to *standard_tests* and returns *standard_tests*.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=STDLIB_DIR,
        pattern="test*" if pattern is None else pattern,
    )
    standard_tests.addTests(discovered)
    return standard_tests
167
168
def get_attribute(obj, name):
    """Return getattr(obj, name), turning AttributeError into SkipTest."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
177
178verbose = 1              # Flag set to 0 by regrtest.py
179use_resources = None     # Flag set to [] by regrtest.py
180max_memuse = 0           # Disable bigmem tests (they will still be run with
181                         # small sizes, to make sure they work.)
182real_max_memuse = 0
183junit_xml_list = None    # list of testsuite XML elements
184failfast = False
185
186# _original_stdout is meant to hold stdout at the time regrtest began.
187# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
188# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stdout, or the current sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
196
197
def _force_run(path, func, *args):
    """Call func(*args), retrying once after a chmod if it fails with OSError.

    A FileNotFoundError is re-raised immediately.  For any other OSError,
    *path* is given owner read/write/execute permission (stat.S_IRWXU) and
    func(*args) is called a second time; that second result (or exception)
    is what the caller gets.  Diagnostics are printed when verbose >= 2.
    """
    try:
        return func(*args)
    except OSError as err:
        chatty = verbose >= 2
        if chatty:
            print('%s: %s' % (err.__class__.__name__, err))
        if isinstance(err, FileNotFoundError):
            # chmod() won't fix a missing file.
            raise
        if chatty:
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)
212
213
# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI appears usable, caching the answer.

    The outcome is stored on the function object itself: ``result`` holds
    the boolean answer and ``reason`` holds the explanation string (None
    when a GUI is available).  Later calls return the cached ``result``
    without probing again.

    May raise ctypes.WinError on Windows if the window station queries fail.
    """
    # Reuse the answer computed by a previous call.
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        # Query the flags of the process window station into `uof`.
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        import subprocess
        try:
            rc = subprocess.run(["launchctl", "managername"],
                                capture_output=True, check=True)
            managername = rc.stdout.decode("utf-8").strip()
        except subprocess.CalledProcessError:
            reason = "unable to detect macOS launchd job manager"
        else:
            if managername != "Aqua":
                reason = f"{managername=} -- can only run in a macOS GUI session"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            # Keep the reason short: truncate long error messages.
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    # Cache both the explanation and the boolean answer for later calls.
    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
285
def is_resource_enabled(resource):
    """Return True if *resource* is enabled.

    Known resources are set by regrtest.py.  While use_resources is still
    None (not running under regrtest.py and never set explicitly), every
    resource counts as enabled.
    """
    if use_resources is None:
        return True
    return resource in use_resources
293
def requires(resource, msg=None):
    """Raise ResourceDenied if *resource* cannot be used.

    The resource must be enabled; in addition "network" and "urlfetch"
    need socket support and "gui" needs a usable GUI.  *msg* replaces the
    default message of the "not enabled" error only.
    """
    if not is_resource_enabled(resource):
        raise ResourceDenied(
            "Use of the %r resource not enabled" % resource
            if msg is None else msg)
    needs_sockets = resource in {"network", "urlfetch"}
    if needs_sockets and not has_socket_support:
        raise ResourceDenied("No socket support")
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)
304
def _requires_unix_version(sysname, min_version):
    """Return a skip decorator for tests needing at least *min_version* of *sysname*.

    The test is skipped only when the running OS is `sysname` and its
    release is older than `min_version`.  A release string that cannot be
    parsed as dotted integers never causes a skip.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    # Drop any suffix after the first '-' in the release string.
    version_txt = platform.release().split('-', 1)[0]
    skip = False
    if platform.system() == sysname:
        try:
            version = tuple(int(part) for part in version_txt.split('.'))
        except ValueError:
            pass
        else:
            skip = version < min_version
    min_version_txt = '.'.join(str(part) for part in min_version)
    reason = (f"{sysname} version {min_version_txt} or higher required, not "
              f"{version_txt}")
    return unittest.skipIf(skip, reason)
330
331
def requires_freebsd_version(*min_version):
    """Return a decorator skipping the test on a FreeBSD older than `min_version`.

    The version components are given as separate integers; for example,
    @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD version
    is less than 7.2.
    """
    sysname = 'FreeBSD'
    return _requires_unix_version(sysname, min_version)
340
def requires_linux_version(*min_version):
    """Return a decorator skipping the test on a Linux older than `min_version`.

    The version components are given as separate integers; for example,
    @requires_linux_version(2, 6, 32) raises SkipTest if the Linux version
    is less than 2.6.32.
    """
    sysname = 'Linux'
    return _requires_unix_version(sysname, min_version)
349
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    The check runs each time the decorated function is called.  The
    wrapper exposes the requested version as its `min_version` attribute.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def skip_message():
        # Return the skip message on a too-old macOS, otherwise None.
        if sys.platform != 'darwin':
            return None
        import platform
        version_txt = platform.mac_ver()[0]
        try:
            version = tuple(int(part) for part in version_txt.split('.'))
        except ValueError:
            # Unparsable version string: never skip.
            return None
        if version < min_version:
            min_version_txt = '.'.join(str(part) for part in min_version)
            return ("Mac OS X %s or higher required, not %s"
                    % (min_version_txt, version_txt))
        return None

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            message = skip_message()
            if message is not None:
                raise unittest.SkipTest(message)
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
377
378
def skip_if_buildbot(reason=None):
    """Return a decorator skipping the test when the current user is 'buildbot'.

    If the user name cannot be determined, a RuntimeWarning is emitted and
    the test is not skipped.
    """
    import getpass
    try:
        username = getpass.getuser()
    except (KeyError, OSError) as err:
        warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning)
        on_buildbot = False
    else:
        on_buildbot = username.lower() == 'buildbot'
    return unittest.skipIf(on_buildbot, reason or 'not suitable for buildbots')
390
def check_sanitizer(*, address=False, memory=False, ub=False, thread=False):
    """Return True if Python was built with one of the requested sanitizers.

    A sanitizer counts as present when its -fsanitize flag appears in
    CFLAGS or its --with-...-sanitizer option appears in CONFIG_ARGS.

    Raises ValueError if none of the keyword flags is true.
    """
    if not (address or memory or ub or thread):
        raise ValueError('At least one of address, memory, ub or thread must be True')

    cflags = sysconfig.get_config_var('CFLAGS') or ''
    config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''

    def built_with(compiler_flag, configure_option):
        return compiler_flag in cflags or configure_option in config_args

    # (requested?, compiler flag, configure option) for each sanitizer.
    candidates = (
        (memory, '-fsanitize=memory', '--with-memory-sanitizer'),
        (address, '-fsanitize=address', '--with-address-sanitizer'),
        (ub, '-fsanitize=undefined', '--with-undefined-behavior-sanitizer'),
        (thread, '-fsanitize=thread', '--with-thread-sanitizer'),
    )
    return any(
        wanted and built_with(compiler_flag, configure_option)
        for wanted, compiler_flag, configure_option in candidates
    )
421
422
def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False, thread=False):
    """Return a decorator skipping the test if a selected sanitizer is active.

    The keyword flags are forwarded to check_sanitizer(); a default reason
    is used when *reason* is empty.
    """
    active = check_sanitizer(address=address, memory=memory, ub=ub, thread=thread)
    return unittest.skipIf(active, reason or 'not working with sanitizers active')
429
430# gh-89363: True if fork() can hang if Python is built with Address Sanitizer
431# (ASAN): libasan race condition, dead lock in pthread_create().
432HAVE_ASAN_FORK_BUG = check_sanitizer(address=True)
433
434
def set_sanitizer_env_var(env, option):
    """Add *option* to every sanitizer options variable in the mapping *env*.

    An existing value gets ':' + option appended; a missing variable is
    created with *option* as its value.  *env* is modified in place.
    """
    sanitizer_vars = ('ASAN_OPTIONS', 'MSAN_OPTIONS', 'UBSAN_OPTIONS', 'TSAN_OPTIONS')
    for name in sanitizer_vars:
        env[name] = f'{env[name]}:{option}' if name in env else option
441
442
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The returned wrapper calls *f* and passes its return value through.
    An OSError whose text contains "CERTIFICATE_VERIFY_FAILED" is turned
    into unittest.SkipTest; any other exception propagates unchanged.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Return the result so the decorator is transparent to callers.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec
455
456# A constant likely larger than the underlying OS pipe buffer size, to
457# make writes blocking.
458# Windows limit seems to be around 512 B, and many Unix kernels have a
459# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
460# (see issue #17835 for a discussion of this number).
461PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
462
463# A constant likely larger than the underlying OS socket buffer size, to make
464# writes blocking.
465# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
466# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
467# for a discussion of this number.
468SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
469
470# decorator for skipping tests on non-IEEE 754 platforms
471requires_IEEE_754 = unittest.skipUnless(
472    float.__getformat__("double").startswith("IEEE"),
473    "test requires IEEE 754 doubles")
474
def requires_zlib(reason='requires zlib'):
    """Return a decorator skipping the test unless zlib can be imported."""
    try:
        import zlib  # noqa: F401
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
481
def requires_gzip(reason='requires gzip'):
    """Return a decorator skipping the test unless gzip can be imported."""
    try:
        import gzip  # noqa: F401
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
488
def requires_bz2(reason='requires bz2'):
    """Return a decorator skipping the test unless bz2 can be imported."""
    try:
        import bz2  # noqa: F401
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
495
def requires_lzma(reason='requires lzma'):
    """Return a decorator skipping the test unless lzma can be imported."""
    try:
        import lzma  # noqa: F401
    except ImportError:
        available = False
    else:
        available = True
    return unittest.skipUnless(available, reason)
502
503def has_no_debug_ranges():
504    try:
505        import _testinternalcapi
506    except ImportError:
507        raise unittest.SkipTest("_testinternalcapi required")
508    config = _testinternalcapi.get_config()
509    return not bool(config['code_debug_ranges'])
510
def requires_debug_ranges(reason='requires co_positions / debug_ranges'):
    """Return a decorator skipping the test when debug ranges are disabled.

    The check is made when this function is called, so the SkipTest raised
    by has_no_debug_ranges() can propagate from here.
    """
    disabled = has_no_debug_ranges()
    return unittest.skipIf(disabled, reason)
513
@contextlib.contextmanager
def suppress_immortalization(suppress=True):
    """Suppress immortalization of deferred objects.

    Does nothing when _testinternalcapi cannot be imported or when
    *suppress* is false.  Otherwise suppression is switched on for the
    body of the with block and switched off again afterwards.
    """
    try:
        import _testinternalcapi
    except ImportError:
        _testinternalcapi = None

    if _testinternalcapi is None or not suppress:
        yield
        return

    _testinternalcapi.suppress_immortalization(True)
    try:
        yield
    finally:
        _testinternalcapi.suppress_immortalization(False)
532
def skip_if_suppress_immortalization():
    """Return a decorator skipping the test unless deferred objects are immortalized.

    When _testinternalcapi cannot be imported the state cannot be queried,
    so a decorator that leaves the test unchanged is returned.  (Returning
    None here would make ``@skip_if_suppress_immortalization()`` fail with
    a TypeError.)
    """
    try:
        import _testinternalcapi
    except ImportError:
        # Nothing to check against: leave the decorated test as it is.
        return lambda test: test
    return unittest.skipUnless(_testinternalcapi.get_immortalize_deferred(),
                                "requires immortalization of deferred objects")
540
541
542MS_WINDOWS = (sys.platform == 'win32')
543
544# Is not actually used in tests, but is kept for compatibility.
545is_jython = sys.platform.startswith('java')
546
547is_android = sys.platform == "android"
548
549if sys.platform not in {"win32", "vxworks", "ios", "tvos", "watchos"}:
550    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
551else:
552    unix_shell = None
553
554# wasm32-emscripten and -wasi are POSIX-like but do not
555# have subprocess or fork support.
556is_emscripten = sys.platform == "emscripten"
557is_wasi = sys.platform == "wasi"
558
559is_apple_mobile = sys.platform in {"ios", "tvos", "watchos"}
560is_apple = is_apple_mobile or sys.platform == "darwin"
561
562has_fork_support = hasattr(os, "fork") and not (
563    # WASM and Apple mobile platforms do not support subprocesses.
564    is_emscripten
565    or is_wasi
566    or is_apple_mobile
567
568    # Although Android supports fork, it's unsafe to call it from Python because
569    # all Android apps are multi-threaded.
570    or is_android
571)
572
def requires_fork():
    """Return a decorator skipping the test unless has_fork_support is true."""
    reason = "requires working os.fork()"
    return unittest.skipUnless(has_fork_support, reason)
575
576has_subprocess_support = not (
577    # WASM and Apple mobile platforms do not support subprocesses.
578    is_emscripten
579    or is_wasi
580    or is_apple_mobile
581
    # Although Android supports subprocesses, they're almost never useful in
583    # practice (see PEP 738). And most of the tests that use them are calling
584    # sys.executable, which won't work when Python is embedded in an Android app.
585    or is_android
586)
587
def requires_subprocess():
    """Return a skip decorator for tests that need subprocess support.

    Used for subprocess, os.spawn calls, fd inheritance.
    """
    reason = "requires subprocess support"
    return unittest.skipUnless(has_subprocess_support, reason)
591
592# Emscripten's socket emulation and WASI sockets have limitations.
593has_socket_support = not (
594    is_emscripten
595    or is_wasi
596)
597
def requires_working_socket(*, module=False):
    """Skip tests or modules that require working sockets.

    With module=False (the default) a function/class decorator is
    returned.  With module=True, unittest.SkipTest is raised right away
    when sockets are unsupported, which skips the calling module;
    otherwise None is returned.
    """
    msg = "requires socket support"
    if not module:
        return unittest.skipUnless(has_socket_support, msg)
    if not has_socket_support:
        raise unittest.SkipTest(msg)
609
# Does strftime() support glibc extension like '%4Y'?
# The probe is not attempted on Windows (bpo-47037: Windows debug builds
# crash with "Debug Assertion Failed"); a ValueError from strftime() leaves
# the flag False.
has_strftime_extensions = False
if sys.platform != "win32":
    with contextlib.suppress(ValueError):
        has_strftime_extensions = time.strftime("%4Y") != "%4Y"
618
619# Define the URL of a dedicated HTTP server for the network tests.
620# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
621TEST_HTTP_URL = "http://www.pythontest.net"
622
623# Set by libregrtest/main.py so we can skip tests that are not
624# useful for PGO
625PGO = False
626
627# Set by libregrtest/main.py if we are running the extended (time consuming)
628# PGO task.  If this is True, PGO is also True.
629PGO_EXTENDED = False
630
631# TEST_DATA_DIR is used as a target download location for remote resources
632TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
633
634
def darwin_malloc_err_warning(test_name):
    """Print a notice that loud malloc errors from macOS libc are expected.

    Does nothing on platforms other than macOS.  The notice is framed by
    dashed lines as wide as the terminal.
    """
    if sys.platform == 'darwin':
        import shutil
        width = shutil.get_terminal_size().columns
        detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
                  'warnings on macOS systems. This behavior is known. Do not\n'
                  'report a bug unless tests are also failing.\n'
                  'See https://github.com/python/cpython/issues/85100')
        print(' NOTICE '.center(width, '-'))
        print(detail)
        print('-' * width)
652
653
def findfile(filename, subdir=None):
    """Locate *filename* in the test directory or on sys.path.

    An absolute *filename* is returned unchanged.  Otherwise the test
    directory and then each sys.path entry are searched, and the first
    existing candidate is returned.  If nothing is found, the (possibly
    subdir-prefixed) relative name is returned; this does not necessarily
    signal failure, as it could still be the legitimate path.

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    candidates = (os.path.join(base, filename)
                  for base in [TEST_HOME_DIR, *sys.path])
    return next((fn for fn in candidates if os.path.exists(fn)), filename)
671
672
def sortdict(dict):
    """Like repr(dict), but with the items in sorted order."""
    body = ", ".join(f"{key!r}: {value!r}" for key, value in sorted(dict.items()))
    return "{" + body + "}"
679
680
def run_code(code: str) -> dict[str, object]:
    """Dedent *code*, execute it in a fresh namespace and return that namespace."""
    source = textwrap.dedent(code)
    namespace: dict[str, object] = {}
    exec(source, namespace)
    return namespace
686
687
def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises a SyntaxError matching *errtext*.

    The exception must carry a line number and an offset; when *lineno* or
    *offset* is given, the corresponding value must also be equal to it.
    All checks are made through the assert methods of *testcase*.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    for attr, expected in (('lineno', lineno), ('offset', offset)):
        actual = getattr(err, attr)
        testcase.assertIsNotNone(actual)
        if expected is not None:
            testcase.assertEqual(actual, expected)
698
699
def open_urlresource(url, *args, **kw):
    """Return an open file for the resource at *url*, downloading it if needed.

    The file is cached in TEST_DATA_DIR under the last path component of
    the URL.  *args* and the remaining *kw* are passed to open() when the
    cached file is opened.  The optional keyword-only ``check`` callable
    receives the opened file and must return true for the file to be
    accepted; an accepted file is rewound to position 0.

    A cached file that fails the check is deleted and fetched again, which
    requires the 'urlfetch' resource.  Raises TestFailed if the freshly
    downloaded file is rejected too.
    """
    import urllib.request, urllib.parse
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return the opened file if it is acceptable, else None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            valid = check(f)
            if valid:
                f.seek(0)
        except BaseException:
            # Do not leak the handle when the check itself fails.
            f.close()
            raise
        if valid:
            return f
        f.close()
        return None

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    response = opener.open(url, timeout=INTERNET_TIMEOUT)
    try:
        if gzip and response.headers.get('Content-Encoding') == 'gzip':
            stream = gzip.GzipFile(fileobj=response)
        else:
            stream = response
        try:
            with open(fn, "wb") as out:
                while chunk := stream.read():
                    out.write(chunk)
        finally:
            stream.close()
    finally:
        # GzipFile.close() does not close the wrapped response, so close it
        # explicitly (closing it a second time is harmless).
        response.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
753
754
@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily replace the sys stream named *stream_name* with a StringIO.

    The StringIO is what the with statement binds; the previous stream is
    put back on exit.  Used by captured_stdout/stdin/stderr.
    """
    import io
    saved = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, saved)
766
def captured_stdout():
    """Return a context manager capturing what is written to sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    stream_name = "stdout"
    return captured_output(stream_name)
775
def captured_stderr():
    """Return a context manager capturing what is written to sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    stream_name = "stderr"
    return captured_output(stream_name)
784
def captured_stdin():
    """Return a context manager replacing sys.stdin with a StringIO:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    stream_name = "stdin"
    return captured_output(stream_name)
796
797
def gc_collect():
    """Force as many objects as possible to be collected.

    Timely deallocation is not guaranteed by the garbage collector in
    non-CPython implementations (and not even in CPython when reference
    cycles are involved), so __del__ methods may run later and weakrefs may
    stay alive longer than expected.  This function does its best to make
    all garbage objects disappear by running three collections in a row.
    """
    import gc
    for _ in range(3):
        gc.collect()
812
@contextlib.contextmanager
def disable_gc():
    """Disable the garbage collector for the duration of the with block.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    was_enabled = gc.isenabled()
    gc.disable()
    with contextlib.ExitStack() as cleanup:
        if was_enabled:
            cleanup.callback(gc.enable)
        yield
823
@contextlib.contextmanager
def gc_threshold(*args):
    """Run the with block under gc.set_threshold(*args).

    The thresholds that were in effect on entry are restored on exit.
    """
    import gc
    previous = gc.get_threshold()
    gc.set_threshold(*args)
    with contextlib.ExitStack() as cleanup:
        cleanup.callback(gc.set_threshold, *previous)
        yield
833
834
def python_is_optimized():
    """Find if Python was built with optimizations.

    Looks at the last -O... flag in PY_CFLAGS; a missing flag, -O0 and -Og
    all count as not optimized.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [flag for flag in cflags.split() if flag.startswith('-O')]
    final_opt = opt_flags[-1] if opt_flags else ""
    return final_opt not in ('', '-O0', '-Og')
843
844
def check_cflags_pgo():
    """Return True if PY_CFLAGS_NODIST contains a profile-use option.

    This detects a Python built with ./configure --enable-optimizations,
    i.e. with Profile Guided Optimization (PGO).
    """
    cflags_nodist = sysconfig.get_config_var('PY_CFLAGS_NODIST') or ''
    pgo_options = [
        '-fprofile-use',        # GCC
        '-fprofile-instr-use',  # clang: -fprofile-instr-use=code.profclangd
        "-prof-use",            # ICC
    ]
    configured_flag = sysconfig.get_config_var('PGO_PROF_USE_FLAG')
    if configured_flag:
        pgo_options.append(configured_flag)
    for option in pgo_options:
        if option in cflags_nodist:
            return True
    return False
861
862
def check_bolt_optimized():
    """Return True if CONFIG_ARGS contains --enable-bolt.

    Always False on WASI, because BOLT optimization does not support WASM
    binaries.
    """
    if is_wasi:
        return False
    return '--enable-bolt' in (sysconfig.get_config_var('CONFIG_ARGS') or '')
870
871
872Py_GIL_DISABLED = bool(sysconfig.get_config_var('Py_GIL_DISABLED'))
873
def requires_gil_enabled(msg="needs the GIL enabled"):
    """Return a decorator skipping the test on the free-threaded build.

    The test is skipped with *msg* when Py_GIL_DISABLED is true.
    """
    free_threaded = Py_GIL_DISABLED
    return unittest.skipIf(free_threaded, msg)
877
def expected_failure_if_gil_disabled():
    """Return a decorator expecting the test to fail if the GIL is disabled.

    When Py_GIL_DISABLED is false the decorated test is returned unchanged.
    """
    def unchanged(test_case):
        return test_case

    return unittest.expectedFailure if Py_GIL_DISABLED else unchanged
883
# struct format fragments used by calcobjsize() and calcvobjsize() below.
# The object-header prefix differs between the free-threaded build and the
# default build.
# NOTE(review): presumably these mirror the C object header layout -- confirm
# against the C headers before changing them.
if Py_GIL_DISABLED:
    _header = 'PHBBInP'
else:
    _header = 'nP'
# Suffix appended after the caller's format by both size helpers.
_align = '0n'
# Header prefix used by calcvobjsize(): the object header plus one extra 'n'.
_vheader = _header + 'n'
890
def calcobjsize(fmt):
    """Return struct.calcsize() of 'fmt' wrapped in _header and _align."""
    from struct import calcsize
    full_format = _header + fmt + _align
    return calcsize(full_format)
894
def calcvobjsize(fmt):
    """Return struct.calcsize() of 'fmt' wrapped in _vheader and _align."""
    from struct import calcsize
    full_format = _vheader + fmt + _align
    return calcsize(full_format)
898
899
# Bit masks tested against a type's __flags__ in check_sizeof().
_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    """Assert through 'test' that sys.getsizeof(o) equals 'size'.

    'size' is the expected size without the GC header.  The header size
    (_testinternalcapi.SIZEOF_PYGC_HEAD) is added when 'o' is a type with
    the heap-type flag set, or an instance of a type with the GC flag set.

    Raises unittest.SkipTest if _testinternalcapi cannot be imported.
    """
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("_testinternalcapi required")
    actual = sys.getsizeof(o)
    kind = type(o)
    # Type objects are judged by their own flags, everything else by the
    # flags of its type.
    if kind == type:
        has_gc_header = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_header = kind.__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_header:
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    message = 'wrong size for %s: got %d, expected %d' % (kind, actual, size)
    test.assertEqual(actual, size, message)
916
917#=======================================================================
918# Decorator/context manager for running a code in a different locale,
919# correctly resetting it afterwards.
920
@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Context manager/decorator running code under the first usable locale.

    'catstr' names a locale category attribute (for example 'LC_ALL').  The
    given 'locales' are tried in order until one can be set.  If none can be
    set, or the current locale cannot be read, unittest.SkipTest is raised
    unless '' is among 'locales'.  The original locale is restored on exit.

    An invalid 'catstr' raises AttributeError.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        saved_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        locale = saved_locale = None
        if '' not in locales:
            raise unittest.SkipTest('no locales')
    else:
        applied = False
        for candidate in locales:
            try:
                locale.setlocale(category, candidate)
            except locale.Error:
                continue
            applied = True
            break
        if not applied and '' not in locales:
            raise unittest.SkipTest(f'no locales {locales}')

    try:
        yield
    finally:
        # Only restore when the original locale was successfully read.
        if locale and saved_locale:
            locale.setlocale(category, saved_locale)
951
952#=======================================================================
953# Decorator for running a function in multiple locales (if they are
# available) and resetting the original locale afterwards.
955
def run_with_locales(catstr, *locales):
    """Decorator running a test method once per locale in 'locales'.

    'catstr' names a locale category attribute (for example 'LC_ALL').
    Each locale is tried inside its own self.subTest(locale=...); a locale
    that cannot be set is reported through self.skipTest().  The original
    locale is restored after all locales have been tried.

    If '' is among 'locales' and the test body was not run for any locale
    (including when the current locale could not be read), the test is run
    once with the current locale under subTest(locale=None).

    An invalid 'catstr' raises AttributeError.
    """
    def deco(func):
        @functools.wraps(func)
        def wrapper(self, /, *args, **kwargs):
            # Stays true until the test body has run for at least one locale.
            dry_run = '' in locales
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                pass
            else:
                try:
                    for loc in locales:
                        with self.subTest(locale=loc):
                            try:
                                locale.setlocale(category, loc)
                            except locale.Error:
                                self.skipTest(f'no locale {loc!r}')
                            else:
                                dry_run = False
                                func(self, *args, **kwargs)
                finally:
                    # Always put the original locale back.
                    locale.setlocale(category, orig_locale)
            if dry_run:
                # no locales available, so just run the test
                # with the current locale
                with self.subTest(locale=None):
                    func(self, *args, **kwargs)
        return wrapper
    return deco
991
992#=======================================================================
993# Decorator for running a function in a specific timezone, correctly
994# resetting it afterwards.
995
def run_with_tz(tz):
    """Decorator running a function with the TZ environment variable set.

    'tz' is stored in os.environ['TZ'] and time.tzset() is called before the
    decorated function runs.  Afterwards the previous TZ value is restored
    (or TZ is removed if it was not set) and time.tzset() is called again.

    Raises unittest.SkipTest at call time if time.tzset is not available.
    """
    def decorator(func):
        # functools.wraps keeps __name__/__doc__ as before and also copies
        # the remaining metadata, like the other decorators in this file.
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # The function may have removed TZ itself; a plain
                    # 'del' would then raise KeyError and mask its result.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator
1024
1025#=======================================================================
1026# Big-memory-test support. Separate from 'resources' because memory use
1027# should be configurable.
1028
1029# Some handy shorthands. Note that these are used for byte-limits as well
1030# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def _parse_memlimit(limit: str) -> int:
    """Convert a memory limit such as '2.5G' or '512Mb' to a byte count.

    The accepted form is a decimal number followed by K, M, G or T
    (case-insensitive) and an optional 'b'.  Raises ValueError otherwise.
    """
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the space in the pattern insignificant.
    m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError(f'Invalid memory limit: {limit!r}')
    return int(float(m.group(1)) * sizes[m.group(2).lower()])

def set_memlimit(limit: str) -> None:
    """Set the module-level memory limits from the string 'limit'.

    real_max_memuse receives the parsed limit in bytes; max_memuse receives
    the same value capped at MAX_Py_ssize_t.

    Raises ValueError if 'limit' cannot be parsed or is below 2 GiB - 1.
    """
    global max_memuse
    global real_max_memuse
    memlimit = _parse_memlimit(limit)
    if memlimit < _2G - 1:
        # The f prefix is required so that the offending value is reported.
        raise ValueError(f'Memory limit {limit!r} too low to be useful')

    real_max_memuse = memlimit
    memlimit = min(memlimit, MAX_Py_ssize_t)
    max_memuse = memlimit
1061
1062
1063class _MemoryWatchdog:
1064    """An object which periodically watches the process' memory consumption
1065    and prints it out.
1066    """
1067
1068    def __init__(self):
1069        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
1070        self.started = False
1071
1072    def start(self):
1073        import warnings
1074        try:
1075            f = open(self.procfile, 'r')
1076        except OSError as e:
1077            warnings.warn('/proc not available for stats: {}'.format(e),
1078                          RuntimeWarning)
1079            sys.stderr.flush()
1080            return
1081
1082        import subprocess
1083        with f:
1084            watchdog_script = findfile("memory_watchdog.py")
1085            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
1086                                                 stdin=f,
1087                                                 stderr=subprocess.DEVNULL)
1088        self.started = True
1089
1090    def stop(self):
1091        if self.started:
1092            self.mem_watchdog.terminate()
1093            self.mem_watchdog.wait()
1094
1095
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the requested test size, in units interpreted by the test
    itself.  'memuse' is the number of bytes needed per unit, or a good
    estimate of it.  A test needing two byte buffers of 4 GiB each could
    use @bigmemtest(size=_4G, memuse=2).

    The decorated method receives the size to use as an extra argument.
    With 'dry_run' true, that value may be smaller than the requested one
    when no memory limit was configured.  With 'dry_run' false, the test
    does not support such dummy runs and is skipped when -M is not given.
    """
    def decorator(f):
        def wrapper(self):
            # Read from the wrapper's attributes at call time.
            requested = wrapper.size
            per_unit = wrapper.memuse
            # Without a configured limit, run with a small dummy size.
            maxsize = requested if real_max_memuse else 5147

            if real_max_memuse or not dry_run:
                if real_max_memuse < maxsize * per_unit:
                    raise unittest.SkipTest(
                        "not enough memory: %.1fG minimum needed"
                        % (requested * per_unit / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * per_unit / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
1143
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test runs only when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper
1158
1159#=======================================================================
1160# unittest integration.
1161
def _id(obj):
    """Return 'obj' unchanged; used as a no-op decorator."""
    return obj
1164
def requires_resource(resource):
    """Return a decorator that skips the test unless 'resource' is enabled.

    For the 'gui' resource the test is also skipped, with the reason stored
    on _is_gui_available, when no GUI is available.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1172
def cpython_only(test):
    """Decorator for tests that only apply to CPython."""
    decorate = impl_detail(cpython=True)
    return decorate(test)
1178
def impl_detail(msg=None, **guards):
    """Return a decorator for implementation-specific tests.

    When check_impl_detail(**guards) is true the test is left untouched;
    otherwise it is skipped with 'msg', or with a message built from the
    guard names when 'msg' is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            template = "implementation detail not available on {0}"
        else:
            template = "implementation detail specific to {0}"
        msg = template.format(' or '.join(sorted(guardnames)))
    return unittest.skip(msg)
1191
1192def _parse_guards(guards):
1193    # Returns a tuple ({platform_name: run_me}, default_value)
1194    if not guards:
1195        return ({'cpython': True}, False)
1196    is_true = list(guards.values())[0]
1197    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
1198    return (guards, not is_true)
1199
1200# Use the following check to guard CPython's implementation-specific tests --
1201# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Return the guard value for the running Python implementation.

    The lookup key is sys.implementation.name; implementations that are not
    named in 'guards' get the default computed by _parse_guards().
    Examples:
       if check_impl_detail():               # only on CPython (default)
       if check_impl_detail(jython=True):    # only on Jython
       if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    platforms, fallback = _parse_guards(guards)
    current_impl = sys.implementation.name
    return platforms.get(current_impl, fallback)
1211
1212
def no_tracing(func):
    """Decorator that turns off tracing while the decorated function runs.

    The trace function is cleared for the duration of the call and then
    restored.  When the 'test.cov' module is loaded, the events of the
    sys.monitoring coverage tool are disabled and restored the same way.
    """
    result = func

    if hasattr(sys, 'gettrace'):
        @functools.wraps(func)
        def untraced(*args, **kwargs):
            saved_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                sys.settrace(saved_trace)
        result = untraced

    if 'test.cov' in sys.modules:  # -Xpresite=test.cov used
        tool_id = sys.monitoring.COVERAGE_ID
        inner = result
        @functools.wraps(func)
        def uncovered(*args, **kwargs):
            saved_events = sys.monitoring.get_events(tool_id)
            try:
                sys.monitoring.set_events(tool_id, 0)
                return inner(*args, **kwargs)
            finally:
                sys.monitoring.set_events(tool_id, saved_events)
        result = uncovered

    return result
1239
1240
def refcount_test(test):
    """Decorator for tests that depend on reference counts.

    The test is first restricted to CPython, then wrapped so that any trace
    function is unset while it runs, since a trace function could cause
    unexpected refcounts.
    """
    restricted = cpython_only(test)
    return no_tracing(restricted)
1250
1251
def requires_limited_api(test):
    """Skip 'test' unless both _testcapi and _testlimitedcapi can be imported."""
    try:
        import _testcapi
        import _testlimitedcapi
    except ImportError:
        skipper = unittest.skip('needs _testcapi and _testlimitedcapi modules')
        return skipper(test)
    else:
        return test
1259
1260
# Windows build doesn't support --disable-test-modules feature, so there's no
# 'TEST_MODULES' var in config; a missing (or empty) value is therefore
# treated as 'yes'.
TEST_MODULES_ENABLED = (sysconfig.get_config_var('TEST_MODULES') or 'yes') == 'yes'
1264
def requires_specialization(test):
    """Skip 'test' unless _opcode.ENABLE_SPECIALIZATION is true."""
    decorate = unittest.skipUnless(
        _opcode.ENABLE_SPECIALIZATION, "requires specialization")
    return decorate(test)
1268
1269
1270#=======================================================================
1271# Check for the presence of docstrings.
1272
1273# Rather than trying to enumerate all the cases where docstrings may be
1274# disabled, we just check for that directly
1275
# Probe function: only its __doc__ attribute is inspected below, to find out
# whether docstrings are kept for Python code.
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when running on CPython (the default of check_impl_detail()), not on
# Windows, and the 'WITH_DOC_STRINGS' config variable is unset or falsy.
MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

# True when the probe function kept its docstring and C docstrings are not
# reported missing.
HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

# Decorator skipping a test when HAVE_DOCSTRINGS is false.
requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")
1288
1289
1290#=======================================================================
1291# Support for saving and restoring the imported modules.
1292
def flush_std_streams():
    """Flush sys.stdout and then sys.stderr, skipping any that is None."""
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()
1298
1299
def print_warning(msg):
    """Write each line of 'msg', prefixed with "Warning -- ", to the stream
    stored in print_warning.orig_stderr, then flush that stream.
    """
    # bpo-45410: Explicitly flush stdout to keep logs in order
    flush_std_streams()
    target = print_warning.orig_stderr
    for text in msg.splitlines():
        print(f"Warning -- {text}", file=target)
    target.flush()

# bpo-39983: Store the original sys.stderr at Python startup to be able to
# log warnings even if sys.stderr is captured temporarily by a test.
print_warning.orig_stderr = sys.stderr
1311
1312
# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
# For example, threading_helper.threading_cleanup() sets the flag if the
# function fails to clean up threads.
environment_altered = False
1320
def reap_children():
    """Reap child processes that have already exited.

    Call this at the end of test_main() whenever sub-processes are started,
    so that no zombies stick around to hog resources and disturb refleak
    hunting.  Each reaped child is reported with print_warning() and sets
    the environment_altered flag.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    if (not hasattr(os, 'waitpid')
            or not hasattr(os, 'WNOHANG')
            or not has_subprocess_support):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            return

        if pid == 0:
            return

        print_warning(f"reap_children() reaped child process {pid}")
        environment_altered = True
1349
1350
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        obj.attr is 5 inside the with: block and gets its old value back
        when the block ends.  If `attr` did not exist on `obj`, it is
        created for the block and deleted afterwards.

        The old value (or None if there was none) is bound to the target of
        the "as" clause, if there is one.
    """
    had_attr = hasattr(obj, attr)
    old_val = getattr(obj, attr) if had_attr else None
    setattr(obj, attr, new_val)
    try:
        yield old_val
    finally:
        if had_attr:
            setattr(obj, attr, old_val)
        elif hasattr(obj, attr):
            # The block may already have removed the temporary attribute.
            delattr(obj, attr)
1381
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily replace an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        obj["item"] is 5 inside the with: block and gets its old value back
        when the block ends.  If `item` did not exist in `obj`, it is
        created for the block and deleted afterwards.

        The old value (or None if there was none) is bound to the target of
        the "as" clause, if there is one.
    """
    had_item = item in obj
    old_val = obj[item] if had_item else None
    obj[item] = new_val
    try:
        yield old_val
    finally:
        if had_item:
            obj[item] = old_val
        elif item in obj:
            # The block may already have removed the temporary item.
            del obj[item]
1412
def args_from_interpreter_flags():
    """Return the command-line arguments that reproduce the current
    settings in sys.flags and sys.warnoptions, as a list."""
    from subprocess import _args_from_interpreter_flags as build_args
    return build_args()
1418
def optim_args_from_interpreter_flags():
    """Return the command-line arguments that reproduce the current
    optimization settings in sys.flags, as a list."""
    from subprocess import _optim_args_from_interpreter_flags as build_args
    return build_args()
1424
1425
class Matcher:
    """Match a dict against expected key/value pairs."""

    # Keys whose string values only need to contain the expected text.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Return True if every keyword argument matches the value stored in
        the dict 'd' under the same key.

        String values for keys listed in self._partial_matches are compared
        by substring; everything else must be equal. You can extend this
        scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), expected)
                   for key, expected in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Compare one stored value (dv) with one supplied value (v) for key k.
        """
        # Values of different exact types never match.
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return v in dv
        return v == dv
1458
1459
# Cached result of the UCRT bug detection; None until first computed.
_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    The UCRT bugs are assumed present on Windows with the 'cp65001'
    encoding when time.localtime().tm_zone is an empty string; otherwise
    they are assumed fixed.  The detection runs once and is cached.

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    import locale
    global _buggy_ucrt
    if _buggy_ucrt is None:
        _buggy_ucrt = (sys.platform == 'win32' and
                       locale.getencoding() == 'cp65001' and
                       time.localtime().tm_zone == '')
    if _buggy_ucrt:
        return unittest.skip("buggy MSVC UCRT strptime/strftime")(test)
    return test
1481
class PythonSymlink:
    """Context manager creating a symlink to the current Python executable.

    The links are created on entry and removed on exit.  call_real() and
    call_link() run the real executable and the symlink respectively.
    """
    def __init__(self, link=None):
        from .os_helper import TESTFN

        self.link = link or os.path.abspath(TESTFN)
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        self._also_link = []

        self._env = None

        self._platform_specific()

    if sys.platform == "win32":
        def _platform_specific(self):
            """Register the extra files to link and build the environment
            used when running the symlink."""
            import glob
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)
            pattern = os.path.join(glob.escape(src_dir), "vcruntime*.dll")
            # The Python DLL first, then every vcruntime DLL next to it.
            for source in [dll, *glob.glob(pattern)]:
                destination = os.path.join(dest_dir, os.path.basename(source))
                self._also_link.append((source, destination))

            self._env = {k.upper(): os.getenv(k) for k in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build():
                self._env["PYTHONPATH"] = STDLIB_DIR
    else:
        def _platform_specific(self):
            """Nothing extra to set up on this platform."""

    def __enter__(self):
        pairs = [(self.real, self.link), *self._also_link]
        for target, link in pairs:
            os.symlink(target, link)
            self._linked.append(link)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        for link in self._linked:
            try:
                os.remove(link)
            except OSError as ex:
                # Best-effort cleanup: report the failure only in verbose mode.
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def _call(self, python, args, env, returncode):
        """Run 'python' with 'args' and return its (stdout, stderr) bytes.

        Raises RuntimeError if the exit code differs from 'returncode'.
        """
        import subprocess
        proc = subprocess.Popen([python, *args], stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE, env=env)
        output = proc.communicate()
        if proc.returncode == returncode:
            return output
        if verbose:
            print(repr(output[0]))
            print(repr(output[1]), file=sys.stderr)
        raise RuntimeError(
            'unexpected return code: {0} (0x{0:08X})'.format(proc.returncode))

    def call_real(self, *args, returncode=0):
        """Run the real executable with the inherited environment."""
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        """Run the symlink with the environment prepared for it."""
        return self._call(self.link, args, self._env, returncode)
1562
1563
def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    if not PGO or PGO_EXTENDED:
        return test
    return unittest.skip("Not run for (non-extended) PGO task")(test)
1569
1570
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Return the set of names in dir(ref_api) that are not in dir(other_api).

    Names listed in 'ignore' are left out.  Private names starting with '_'
    are left out as well, except names ending in '__' (magic methods),
    which are kept.
    """
    missing = set(dir(ref_api)).difference(dir(other_api))
    if ignore:
        missing -= set(ignore)
    return {name for name in missing
            if name.endswith('__') or not name.startswith('_')}
1584
1585
def check__all__(test_case, module, name_of_module=None, extra=(),
                 not_exported=()):
    """Assert that module.__all__ lists exactly the module's public names.

    A name counts as public when it does not start with '_' and its object
    either has a '__module__' found in 'name_of_module', or has no
    '__module__' attribute at all and is not itself a module.

    'name_of_module' is a string or a tuple of strings naming the modules in
    which a public object may be defined; it defaults to the name of
    'module'.  Pass several names when 'module' re-exports part of its API
    from another module, possibly a C backend (like 'csv' and '_csv').

    'extra' holds names to add to the detected ones, for objects that would
    not be detected automatically.

    'not_exported' holds names that must not be treated as public even
    though they look public.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                not_exported = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, not_exported=not_exported)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    def looks_public(obj):
        # Defined in one of the accepted modules, or a plain object without
        # any '__module__' that is not a module itself.
        return (getattr(obj, '__module__', None) in name_of_module or
                (not hasattr(obj, '__module__') and
                 not isinstance(obj, types.ModuleType)))

    expected = set(extra)
    candidates = (name for name in dir(module)
                  if not (name.startswith('_') or name in not_exported))
    expected.update(name for name in candidates
                    if looks_public(getattr(module, name)))
    test_case.assertCountEqual(module.__all__, expected)
1643
1644
def suppress_msvcrt_asserts(verbose=False):
    """Set the msvcrt error mode and CRT report modes; do nothing if the
    msvcrt module cannot be imported.

    With 'verbose' true, CRT reports are sent to stderr; otherwise the
    report mode is set to 0.
    """
    try:
        import msvcrt
    except ImportError:
        return

    error_mode = (msvcrt.SEM_FAILCRITICALERRORS
                  | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
                  | msvcrt.SEM_NOGPFAULTERRORBOX
                  | msvcrt.SEM_NOOPENFILEERRORBOX)
    msvcrt.SetErrorMode(error_mode)

    # CrtSetReportMode() is only available in debug build
    if not hasattr(msvcrt, 'CrtSetReportMode'):
        return
    for report_type in (msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT):
        if verbose:
            msvcrt.CrtSetReportMode(report_type, msvcrt.CRTDBG_MODE_FILE)
            msvcrt.CrtSetReportFile(report_type, msvcrt.CRTDBG_FILE_STDERR)
        else:
            msvcrt.CrtSetReportMode(report_type, 0)
1664
1665
class SuppressCrashReport:
    """Context manager that tries to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # Previous error mode (Windows) or previous RLIMIT_CORE limits (UNIX).
    old_value = None
    # Previous CRT report (mode, file) per report type; Windows debug builds.
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode() and CrtSetReportMode().

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            try:
                import msvcrt
            except ImportError:
                # NOTE(review): this path returns None rather than self.
                return

            self.old_value = msvcrt.GetErrorMode()

            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)

            # bpo-23314: Suppress assert dialogs in debug builds.
            # CrtSetReportMode() is only available in debug build.
            if hasattr(msvcrt, 'CrtSetReportMode'):
                self.old_modes = {}
                for report_type in (msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT):
                    previous_mode = msvcrt.CrtSetReportMode(
                        report_type, msvcrt.CRTDBG_MODE_FILE)
                    previous_file = msvcrt.CrtSetReportFile(
                        report_type, msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = previous_mode, previous_file

            return self

        try:
            import resource
        except ImportError:
            resource = None
        self.resource = resource
        if resource is not None:
            try:
                self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
                # Keep the hard limit; only the soft limit drops to 0.
                resource.setrlimit(resource.RLIMIT_CORE,
                                   (0, self.old_value[1]))
            except (ValueError, OSError):
                pass

        if sys.platform == 'darwin':
            import subprocess
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            cmd = ['/usr/bin/defaults', 'read',
                   'com.apple.CrashReporter', 'DialogType']
            proc = subprocess.Popen(cmd,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            with proc:
                dialog_type = proc.communicate()[0]
            if dialog_type.strip() == b'developer':
                print("this test triggers the Crash Reporter, "
                      "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            return

        if sys.platform.startswith('win'):
            import msvcrt
            msvcrt.SetErrorMode(self.old_value)

            if self.old_modes:
                for report_type, saved in self.old_modes.items():
                    previous_mode, previous_file = saved
                    msvcrt.CrtSetReportMode(report_type, previous_mode)
                    msvcrt.CrtSetReportFile(report_type, previous_file)
            return

        if self.resource is None:
            return
        try:
            self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
        except (ValueError, OSError):
            pass
1760
1761
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch';
    AttributeError is raised otherwise.

    How the cleanup restores the attribute:

    * the attribute is stored in the object's own ``__dict__``: the stored
      value is assigned back;
    * the object has a ``__dict__`` but the attribute comes from elsewhere
      (e.g. the class): the override is deleted;
    * the object has no ``__dict__`` at all (e.g. it uses ``__slots__``):
      the previous value is assigned back, because deleting would leave the
      attribute unset rather than restored.
    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    old_value = getattr(object_to_patch, attr_name)

    try:
        namespace = object_to_patch.__dict__
    except AttributeError:
        # No instance dictionary: the value seen through getattr() is the
        # one to put back.
        restore_by_assignment = True
    else:
        try:
            # Keep the raw dictionary entry (not the getattr() result) so
            # that descriptors stored on a class are restored unchanged.
            old_value = namespace[attr_name]
        except KeyError:
            restore_by_assignment = False
        else:
            restore_by_assignment = True

    # restore the value when the test is done
    def cleanup():
        if restore_by_assignment:
            setattr(object_to_patch, attr_name, old_value)
        else:
            delattr(object_to_patch, attr_name)

    test_instance.addCleanup(cleanup)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)
1794
1795
@contextlib.contextmanager
def patch_list(orig):
    """Like unittest.mock.patch.dict, but for lists.

    A shallow copy of *orig* is taken on entry; on exit the content of
    *orig* is replaced in place with that copy, whether or not the body
    raised.
    """
    # Take the snapshot before entering the try block: if slicing fails,
    # the real error propagates instead of an UnboundLocalError raised by
    # the finally clause.
    saved = orig[:]
    try:
        yield
    finally:
        orig[:] = saved
1804
1805
def run_in_subinterp(code):
    """
    Execute *code* in a subinterpreter through _testcapi.

    unittest.SkipTest is raised when tracemalloc is tracing (see
    _check_tracemalloc()) or when the _testcapi module cannot be imported.
    """
    _check_tracemalloc()
    try:
        import _testcapi
    except ImportError:
        raise unittest.SkipTest("requires _testcapi")
    else:
        return _testcapi.run_in_subinterp(code)
1817
1818
def run_in_subinterp_with_config(code, *, own_gil=None, **config):
    """
    Execute *code* in a subinterpreter created with the given *config*.

    *own_gil*, when not None, selects the 'gil' setting ('own' if true,
    'shared' otherwise) and must not be combined with an explicit 'gil'
    keyword.  Otherwise config['gil'] is required; the values 0, 1 and 2
    are translated to 'default', 'shared' and 'own', strings are passed
    through unchanged, and anything else raises NotImplementedError.

    unittest.SkipTest is raised when tracemalloc is tracing (see
    _check_tracemalloc()) or when _testinternalcapi cannot be imported.
    """
    _check_tracemalloc()
    try:
        import _testinternalcapi
    except ImportError:
        raise unittest.SkipTest("requires _testinternalcapi")

    if own_gil is None:
        gil = config['gil']
        # Map the numeric form onto the symbolic names.
        for number, label in ((0, 'default'), (1, 'shared'), (2, 'own')):
            if gil == number:
                config['gil'] = label
                break
        else:
            if not isinstance(gil, str):
                raise NotImplementedError(gil)
    else:
        assert 'gil' not in config, (own_gil, config)
        config['gil'] = 'own' if own_gil else 'shared'

    settings = types.SimpleNamespace(**config)
    return _testinternalcapi.run_in_subinterp_with_config(code, settings)
1844
1845
def _check_tracemalloc():
    """Raise unittest.SkipTest if tracemalloc is currently tracing.

    Nothing happens when the tracemalloc module cannot be imported.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        return
    if not tracemalloc.is_tracing():
        return
    raise unittest.SkipTest("run_in_subinterp() cannot be used "
                            "if tracemalloc module is tracing "
                            "memory allocations")
1858
1859
def check_free_after_iterating(test, iter, cls, args=()):
    """Check that an exhausted iterator lets go of the object it iterates.

    A subclass of *cls* defining ``__del__`` is instantiated with *args*
    and handed to *iter* (a callable parameter that shadows the builtin of
    the same name inside this function).  The first next() call on the
    resulting iterator must raise StopIteration.  After a garbage
    collection pass, *test* asserts that ``__del__`` has run.
    """
    done = False
    def wrapper():
        class A(cls):
            def __del__(self):
                nonlocal done
                done = True
                # Poke the iterator again from inside the finalizer; a
                # StopIteration here is expected and ignored.
                try:
                    next(it)
                except StopIteration:
                    pass

        it = iter(A(*args))
        # Issue 26494: Shouldn't crash
        test.assertRaises(StopIteration, next, it)

    # The work happens in a nested function so that its locals (the
    # iterator and the class) go away when it returns.
    wrapper()
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(done)
1880
1881
def missing_compiler_executable(cmd_names=[]):
    """Report the first compiler executable that cannot be found.

    The compiler is the one setuptools' bundled distutils selects and
    customizes for this interpreter.  Only the executables named in
    'cmd_names' are checked, or every configured executable when
    'cmd_names' is empty.  The return value is the program name of the
    first missing executable, "msvc" when the MSVC compiler fails to
    initialize, or None when nothing is missing.

    """
    from setuptools._distutils import ccompiler, sysconfig, spawn
    from setuptools import errors

    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    if compiler.compiler_type == "msvc":
        # MSVC has no executables, so check whether initialization succeeds
        try:
            compiler.initialize()
        except errors.PlatformError:
            return "msvc"

    for exe_name in compiler.executables:
        if cmd_names:
            if exe_name not in cmd_names:
                continue
            command = getattr(compiler, exe_name)
            # An explicitly requested executable must be configured.
            assert command is not None, \
                    "the '%s' executable is not configured" % exe_name
        else:
            command = getattr(compiler, exe_name)
            if not command:
                # Not configured and not requested: nothing to look for.
                continue
        program = command[0]
        if spawn.find_executable(program) is None:
            return program
    return None
1913
1914
# Cached answer to "is this an Android emulator with API level < 24?";
# None until setswitchinterval() has had to work it out.
_old_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping *interval* where needed.

    On an old Android emulator (API level below 24) an interval shorter
    than 100 us is raised to 100 us; everywhere else *interval* is passed
    through unchanged.
    """
    global _old_android_emulator
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    floor = 1e-4   # 100 us
    if is_android and interval < floor:
        if _old_android_emulator is None:
            import platform
            android = platform.android_ver()
            _old_android_emulator = (android.is_emulator
                                     and android.api_level < 24)
        if _old_android_emulator:
            interval = floor
    return sys.setswitchinterval(interval)
1929
1930
def get_pagesize():
    """Return the size of a memory page in bytes.

    os.sysconf() is queried for 'SC_PAGESIZE', then 'SC_PAGE_SIZE'; when
    neither can be queried (ValueError or AttributeError), 4096 is
    returned.
    """
    for name in ('SC_PAGESIZE', 'SC_PAGE_SIZE'):
        try:
            return os.sysconf(name)
        except (ValueError, AttributeError):
            continue
    return 4096
1941
1942
@contextlib.contextmanager
def disable_faulthandler():
    """Context manager that turns faulthandler off for the duration.

    If faulthandler was enabled on entry, it is enabled again on exit,
    writing to the file descriptor of sys.__stderr__ for all threads.
    """
    import faulthandler

    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    stderr_fd = sys.__stderr__.fileno()

    was_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if was_enabled:
            faulthandler.enable(file=stderr_fd, all_threads=True)
1959
1960
class SaveSignals:
    """
    Snapshot signal handlers with save() and reinstall them with restore().

    Only handlers registered through the Python signal module can be
    saved and restored: see bpo-13285 for "external" signal handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught, so they
        # are left out (when the platform defines them at all).
        for signame in ('SIGKILL', 'SIGSTOP'):
            signum = getattr(signal, signame, None)
            if signum is not None:
                self.signals.remove(signum)
        self.handlers = {}

    def save(self):
        """Record the current handler of every tracked signal."""
        getsignal = self.signal.getsignal
        for signum in self.signals:
            handler = getsignal(signum)
            # getsignal() returns None if a signal handler was not
            # registered by the Python signal module,
            # and the handler is not SIG_DFL nor SIG_IGN.
            #
            # Such a signal is skipped: its handler cannot be restored.
            if handler is not None:
                self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        install = self.signal.signal
        for signum, handler in self.handlers.items():
            install(signum, handler)
1998
1999
def with_pymalloc():
    """Return ``_testcapi.WITH_PYMALLOC and not Py_GIL_DISABLED``.

    unittest.SkipTest is raised when _testcapi cannot be imported.
    """
    try:
        import _testcapi
    except ImportError:
        raise unittest.SkipTest("requires _testcapi")
    built_with_pymalloc = _testcapi.WITH_PYMALLOC
    return built_with_pymalloc and not Py_GIL_DISABLED
2006
2007
def with_mimalloc():
    """Return the value of ``_testcapi.WITH_MIMALLOC``.

    unittest.SkipTest is raised when _testcapi cannot be imported.
    """
    try:
        import _testcapi
    except ImportError:
        raise unittest.SkipTest("requires _testcapi")
    else:
        return _testcapi.WITH_MIMALLOC
2014
2015
class _ALWAYS_EQ:
    """
    Helper whose instances compare equal to every object.
    """

    def __eq__(self, other):
        # "==" succeeds whatever *other* is.
        return True

    def __ne__(self, other):
        # "!=" consistently fails.
        return False


# Ready-made instance for use in tests.
ALWAYS_EQ = _ALWAYS_EQ()
2026
class _NEVER_EQ:
    """
    Helper whose instances compare unequal to every object, themselves
    included.
    """

    def __eq__(self, other):
        # "==" fails whatever *other* is.
        return False

    def __ne__(self, other):
        # "!=" consistently succeeds.
        return True

    def __hash__(self):
        # Defining __eq__ would otherwise disable hashing; use a constant.
        return 1


# Ready-made instance for use in tests.
NEVER_EQ = _NEVER_EQ()
2039
@functools.total_ordering
class _LARGEST:
    """
    Helper that orders after every other object; it is equal only to
    instances of its own class.
    """

    def __eq__(self, other):
        return isinstance(other, _LARGEST)

    def __lt__(self, other):
        # Never smaller than anything; total_ordering derives the other
        # comparisons from this and __eq__.
        return False


# Ready-made instance for use in tests.
LARGEST = _LARGEST()
2051
@functools.total_ordering
class _SMALLEST:
    """
    Helper that orders before every other object; it is equal only to
    instances of its own class.
    """

    def __eq__(self, other):
        return isinstance(other, _SMALLEST)

    def __gt__(self, other):
        # Never greater than anything; total_ordering derives the other
        # comparisons from this and __eq__.
        return False


# Ready-made instance for use in tests.
SMALLEST = _SMALLEST()
2063
def maybe_get_event_loop_policy():
    """Return asyncio's global event loop policy, or None if none is set.

    The private module variable is read directly.
    """
    from asyncio import events
    return events._event_loop_policy
2068
# Helpers for testing hashing.
# NHASHBITS is the width, in bits, of hash values as reported by
# sys.hash_info.width.
NHASHBITS = sys.hash_info.width # number of bits in hash() result
# Only 32-bit and 64-bit widths are expected; fail at import time otherwise.
assert NHASHBITS in (32, 64)
2072
2073# Return mean and sdev of number of collisions when tossing nballs balls
2074# uniformly at random into nbins bins.  By definition, the number of
2075# collisions is the number of balls minus the number of occupied bins at
2076# the end.
def collision_stats(nbins, nballs):
    """Return (mean, sdev) of the collision count as a pair of floats.

    The model is *nballs* balls tossed uniformly at random into *nbins*
    bins, a collision being a ball that lands in an already occupied bin
    (balls minus occupied bins at the end).
    """
    import decimal

    # Derivation of the mean:
    #   P(a given bin is empty after nballs tosses) = (1 - 1/nbins)**nballs
    #   mean # empty    = nbins * (1 - 1/nbins)**nballs
    #   mean # occupied = nbins - mean # empty
    #   collisions      = nballs - mean # occupied
    #
    # Variance:
    #   nbins*(nbins-1)*(1-2/nbins)**nballs + meanempty - meanempty**2 =
    #   nbins*(nbins-1)*(1-2/nbins)**nballs + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # the `decimal` module is used to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    with decimal.localcontext() as ctx:
        # Bits in nbins**2: at least that many bits will likely cancel
        # out, so use that many decimal digits instead.
        ctx.prec = max(nbins.bit_length() * 2, 30)
        bins = decimal.Decimal(nbins)
        prob_empty = ((bins - 1) / bins) ** nballs
        mean_empty = nbins * prob_empty
        mean_occupied = nbins - mean_empty
        mean_collisions = nballs - mean_occupied
        variance = (bins*(bins-1)*((bins-2)/bins)**nballs
                    + mean_empty * (1 - mean_empty))
        return float(mean_collisions), float(variance.sqrt())
2109
2110
class catch_unraisable_exception:
    """
    Context manager that captures an unraisable exception by temporarily
    replacing sys.unraisablehook.

    The most recent hook argument is available as cm.unraisable (None
    until the hook has been called).

    Keeping the exception value (cm.unraisable.exc_value) creates a
    reference cycle, which is broken explicitly when the context manager
    exits.

    Keeping the object (cm.unraisable.object) can resurrect it if it is an
    object which is being finalized. Exiting the context manager drops the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self._old_hook = None
        self.unraisable = None

    def _hook(self, unraisable):
        # Keeping unraisable.object can resurrect an object which is being
        # finalized. Keeping unraisable.exc_value creates a reference cycle.
        # Both are released by __exit__().
        self.unraisable = unraisable

    def __enter__(self):
        previous_hook = sys.unraisablehook
        self._old_hook = previous_hook
        sys.unraisablehook = self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        # Remove the attribute altogether rather than resetting it.
        delattr(self, 'unraisable')
2153
2154
def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait for process *pid* to finish and verify its exit code.

    An AssertionError is raised if the exit code differs from *exitcode*.

    If the process is still running after *timeout* seconds (LONG_TIMEOUT
    by default), it is killed with signal.SIGKILL and an AssertionError is
    raised. The timeout feature is not available on Windows, where the
    call simply blocks until the process exits.
    """
    if os.name == "nt":
        # Windows implementation: don't support timeout :-(
        waited_pid, status = os.waitpid(pid, 0)
    else:
        import signal

        if timeout is None:
            timeout = LONG_TIMEOUT

        began = time.monotonic()
        for _ in sleeping_retry(timeout, error=False):
            waited_pid, status = os.waitpid(pid, os.WNOHANG)
            if waited_pid != 0:
                break
            # retry: the process is still running
        else:
            # Timed out: kill the process and reap it.
            try:
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)
            except OSError:
                # Ignore errors like ChildProcessError or PermissionError
                pass

            elapsed = time.monotonic() - began
            raise AssertionError(f"process {pid} is still running "
                                 f"after {elapsed:.1f} seconds")

    actual_exitcode = os.waitstatus_to_exitcode(status)
    if actual_exitcode != exitcode:
        raise AssertionError(f"process {pid} exited with code {actual_exitcode}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if waited_pid != pid:
        raise AssertionError(f"pid {waited_pid} != pid {pid}")
2201
def skip_if_broken_multiprocessing_synchronize():
    """
    Skip the calling test when multiprocessing.synchronize is unusable.

    That is the case if the _multiprocessing extension or the
    multiprocessing.synchronize module cannot be imported (no semaphore
    implementation available), or, on Linux only, if creating a lock
    raises an OSError.
    """
    from .import_helper import import_module

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    if sys.platform != "linux":
        return

    try:
        # bpo-38377: On Linux, creating a semaphore fails with OSError
        # if the current user does not have the permission to create
        # a file in /dev/shm/ directory.
        synchronize.Lock(ctx=None)
    except OSError as exc:
        raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2225
2226
def check_disallow_instantiation(testcase, tp, *args, **kwds):
    """
    Assert that calling type *tp* with *args and **kwds raises TypeError.

    The error message must match "cannot create '<name>' instances", where
    <name> is the type name, prefixed with its module unless the module is
    'builtins'.

    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    """
    module_name = tp.__module__
    type_name = tp.__name__
    qualname = (f"{type_name}" if module_name == 'builtins'
                else f"{module_name}.{type_name}")
    # The name is escaped because the message is used as a regex.
    pattern = f"cannot create '{re.escape(qualname)}' instances"
    testcase.assertRaisesRegex(TypeError, pattern, tp, *args, **kwds)
2241
def get_recursion_depth():
    """Return the recursion depth of the function calling this one.

    In the __main__ module, at the module level, it should be 1.

    _testinternalcapi.get_recursion_depth() is used when it is available;
    otherwise (ImportError or RecursionError) the frames are counted by
    walking frame.f_back from sys._getframe().
    """
    try:
        import _testinternalcapi
        depth = _testinternalcapi.get_recursion_depth()
    except (ImportError, RecursionError):
        # sys._getframe() + frame.f_back implementation.
        depth = 0
        frame = None
        try:
            frame = sys._getframe()
            while frame is not None:
                depth += 1
                frame = frame.f_back
        finally:
            # Break any reference cycles.
            frame = None

    # Ignore get_recursion_depth() frame.
    return max(depth - 1, 1)
2264
def get_recursion_available():
    """Return how many frames remain before RecursionError is raised.

    The result is sys.getrecursionlimit() minus the recursion depth
    reported by get_recursion_depth().
    """
    return sys.getrecursionlimit() - get_recursion_depth()
2274
@contextlib.contextmanager
def set_recursion_limit(limit):
    """Context manager setting the recursion limit to *limit*.

    The limit in effect on entry is put back on exit.
    """
    saved_limit = sys.getrecursionlimit()
    try:
        sys.setrecursionlimit(limit)
        yield
    finally:
        sys.setrecursionlimit(saved_limit)
2284
def infinite_recursion(max_depth=None):
    """Return a context manager limiting recursion to *max_depth* frames.

    The recursion limit is set, through set_recursion_limit(), to the
    caller's current recursion depth plus *max_depth* (20_000 when
    *max_depth* is None).

    Raise ValueError if *max_depth* is less than 3.
    """
    if max_depth is None:
        # Pick a number large enough to cause problems
        # but not take too long for code that can handle
        # very deep recursion.
        max_depth = 20_000
    elif max_depth < 3:
        # The f-string prefix is required for the value to be interpolated.
        raise ValueError(f"max_depth must be at least 3, got {max_depth}")
    depth = get_recursion_depth()
    depth = max(depth - 1, 1)  # Ignore infinite_recursion() frame.
    limit = depth + max_depth
    return set_recursion_limit(limit)
2297
def ignore_deprecations_from(module: str, *, like: str) -> object:
    """Install an "ignore" filter for DeprecationWarning from *module*.

    The filter matches messages against the regex *like*, to which a regex
    comment embedding the id of a fresh token object is appended.  The
    token is returned so that clear_ignored_deprecations() can later find
    and remove this filter.
    """
    token = object()
    # "(?#...)" is a regex comment: it tags the pattern without changing
    # what it matches.
    tag = fr"(?#support{id(token)})"
    warnings.filterwarnings(
        "ignore",
        message=like + tag,
        category=DeprecationWarning,
        module=module,
    )
    return token
2307
def clear_ignored_deprecations(*tokens: object) -> None:
    """Remove the filters installed by ignore_deprecations_from().

    *tokens* are the objects returned by ignore_deprecations_from(); every
    "ignore" DeprecationWarning filter whose message pattern ends with the
    tag of one of them is dropped from warnings.filters.  ValueError is
    raised when no token is given.
    """
    if not tokens:
        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")

    tags = tuple(rf"(?#support{id(token)})" for token in tokens)

    def is_tagged(action, message, category):
        # True for a filter carrying the tag of one of the tokens.
        if not (action == "ignore" and category is DeprecationWarning):
            return False
        if isinstance(message, re.Pattern):
            text = message.pattern
        else:
            text = message or ""
        return text.endswith(tags)

    kept = []
    for action, message, category, module, lineno in warnings.filters:
        if not is_tagged(action, message, category):
            kept.append((action, message, category, module, lineno))

    if warnings.filters != kept:
        # Update the list in place and tell the warnings machinery that
        # its filters changed.
        warnings.filters[:] = kept
        warnings._filters_mutated()
2326
2327
2328# Skip a test if venv with pip is known to not work.
def requires_venv_with_pip():
    """Return a decorator skipping a test where venv with pip cannot work.

    The test is skipped when zlib or ctypes cannot be imported.
    """
    # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages)
    try:
        import zlib
    except ImportError:
        return unittest.skipIf(True, "venv: ensurepip requires zlib")

    # bpo-26610: pip/pep425tags.py requires ctypes.
    # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1).
    try:
        import ctypes as ctypes_module
    except ImportError:
        ctypes_module = None
    return unittest.skipUnless(ctypes_module, 'venv: pip requires ctypes')
2343
2344
@functools.cache
def _findwheel(pkgname):
    """Return the path of a wheel file for the package *pkgname*.

    Wheels are looked up in WHEEL_PKG_DIR (see ensurepip) when that
    configuration variable is set, otherwise in the 'wheeldata' directory
    under the test directory.  FileNotFoundError is raised when no wheel
    matches.
    """
    wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR')
    if not wheel_dir:
        wheel_dir = os.path.join(TEST_HOME_DIR, 'wheeldata')

    # Reverse alphabetical order approximates "newest" first.
    for filename in sorted(os.listdir(wheel_dir), reverse=True):
        # filename is like 'setuptools-67.6.1-py3-none-any.whl'
        if filename.endswith(".whl") and filename.startswith(pkgname + '-'):
            return os.path.join(wheel_dir, filename)
    raise FileNotFoundError(f"No wheel for {pkgname} found in {wheel_dir}")
2365
2366
# Context manager that creates a virtual environment, installs setuptools and
# wheel in it, and yields the path to the Python executable of the venv.
@contextlib.contextmanager
def setup_venv_with_pip_setuptools_wheel(venv_dir):
    """Create a venv with setuptools and wheel; yield its Python executable.

    The environment is created as *venv_dir* inside a temporary working
    directory (temp_cwd()), and the body of the ``with`` statement runs
    while that directory is the current one.  The setuptools and wheel
    packages are installed with pip from the wheels found by _findwheel().
    """
    import shlex
    import subprocess
    from .os_helper import temp_cwd

    def run_command(cmd):
        # Echo the command and let its output through in verbose mode,
        # capture the output otherwise; a failing command raises.
        options = {'check': True}
        if verbose:
            print()
            print('Run:', ' '.join(map(shlex.quote, cmd)))
        else:
            options['stdout'] = subprocess.PIPE
            options['stderr'] = subprocess.STDOUT
        subprocess.run(cmd, **options)

    with temp_cwd() as temp_dir:
        # Create virtual environment to get setuptools
        run_command([sys.executable, '-X', 'dev', '-m', 'venv', venv_dir])

        venv = os.path.join(temp_dir, venv_dir)

        # Get the Python executable of the venv
        scripts_dir = 'Scripts' if sys.platform == 'win32' else 'bin'
        python = os.path.join(venv, scripts_dir,
                              os.path.basename(sys.executable))

        run_command([python, '-X', 'dev',
                     '-m', 'pip', 'install',
                     _findwheel('setuptools'),
                     _findwheel('wheel')])

        yield python
2407
2408
# True if Python is built with the Py_DEBUG macro defined: if
# Python is built in debug mode (./configure --with-pydebug).
# The build mode is detected through the presence of sys.gettotalrefcount.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
2412
2413
def late_deletion(obj):
    """
    Keep a Python object alive as long as possible.

    *obj* is placed in a reference cycle, and the cycle is attached to
    callbacks that are cleared late in Python finalization, so that the
    object survives until the very last garbage collection if possible.

    A strong reference is kept by design. Call this function in a
    subprocess to avoid marking a test as "leaking a reference".
    """

    # Late CPython finalization:
    # - finalize_interp_clear()
    # - _PyInterpreterState_Clear(): Clear PyInterpreterState members
    #   (ex: codec_search_path, before_forkers)
    # - clear os.register_at_fork() callbacks
    # - clear codecs.register() callbacks

    import codecs

    # A list that contains both the object and itself.
    ref_cycle = [obj]
    ref_cycle.append(ref_cycle)

    def holding_cycle(func):
        # Attach the cycle to the callback so the callback keeps it alive.
        func.reference = ref_cycle
        return func

    # Store a reference in PyInterpreterState.codec_search_path
    @holding_cycle
    def search_func(encoding):
        return None
    codecs.register(search_func)

    if hasattr(os, 'register_at_fork'):
        # Store a reference in PyInterpreterState.before_forkers
        @holding_cycle
        def atfork_func():
            pass
        os.register_at_fork(before=atfork_func)
2449
2450
def busy_retry(timeout, err_msg=None, /, *, error=True):
    """
    Generator driving a retry loop: the loop body runs until it "break"s.

    Once *timeout* seconds have elapsed, an AssertionError is raised if
    *error* is true (with *err_msg* appended to the message when given),
    otherwise the loop simply ends. *timeout* must be greater than zero,
    else ValueError is raised.

    Example:

        for _ in support.busy_retry(support.SHORT_TIMEOUT):
            if check():
                break

    Example of error=False usage:

        for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False):
            if check():
                break
        else:
            raise RuntimeError('my custom error')

    """
    if timeout <= 0:
        raise ValueError("timeout must be greater than zero")

    start_time = time.monotonic()
    deadline = start_time + timeout

    # The body always runs at least once; the deadline is only checked
    # after each iteration.
    expired = False
    while not expired:
        yield
        expired = time.monotonic() >= deadline

    if not error:
        return

    elapsed = time.monotonic() - start_time
    msg = f"timeout ({elapsed:.1f} seconds)"
    if err_msg:
        msg = f"{msg}: {err_msg}"
    raise AssertionError(msg)
2491
2492
def sleeping_retry(timeout, err_msg=None, /,
                     *, init_delay=0.010, max_delay=1.0, error=True):
    """
    Retry loop that sleeps between attempts, with exponential backoff.

    The loop body runs until "break" stops the loop. There is no sleep
    before the first iteration; after each iteration the generator sleeps,
    starting with *init_delay* seconds and doubling the delay each time
    (up to *max_delay* seconds).

    See busy_retry() documentation for the parameters usage.

    Example raising an exception after SHORT_TIMEOUT seconds:

        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if check():
                break

    Example of error=False usage:

        for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False):
            if check():
                break
        else:
            raise RuntimeError('my custom error')
    """

    pause = init_delay
    for _ in busy_retry(timeout, err_msg, error=error):
        yield

        # Only reached when the caller did not break out of its loop.
        time.sleep(pause)
        doubled = pause * 2
        pause = max_delay if max_delay < doubled else doubled
2525
2526
class CPUStopwatch:
    """Context manager to roughly time a CPU-bound operation.

    Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of
    other processes).

    N.B.:
    - This *includes* time spent in other threads.
    - Some systems only have a coarse resolution; check
      stopwatch.clock_info.resolution if that matters.

    Usage:

    with CPUStopwatch() as stopwatch:
        ...
    elapsed = stopwatch.seconds
    resolution = stopwatch.clock_info.resolution
    """
    def __enter__(self):
        clock = time.process_time
        info = time.get_clock_info('process_time')
        if clock() <= 0:  # some platforms like WASM lack process_time()
            clock = time.monotonic
            info = time.get_clock_info('monotonic')
        # Keep the GC manager around so that __exit__() can leave it.
        self.context = disable_gc()
        self.context.__enter__()
        self.get_time = clock
        self.clock_info = info
        # Read the clock last so that the setup above is not measured.
        self.start_time = clock()
        return self

    def __exit__(self, *exc):
        try:
            stopped_at = self.get_time()
        finally:
            # Always leave the GC context, even if reading the clock fails.
            suppress = self.context.__exit__(*exc)
        self.seconds = stopped_at - self.start_time
        return suppress
2565
2566
2567@contextlib.contextmanager
2568def adjust_int_max_str_digits(max_digits):
2569    """Temporarily change the integer string conversion length limit."""
2570    current = sys.get_int_max_str_digits()
2571    try:
2572        sys.set_int_max_str_digits(max_digits)
2573        yield
2574    finally:
2575        sys.set_int_max_str_digits(current)
2576
2577
def get_c_recursion_limit():
    """Return the value of ``_testcapi.Py_C_RECURSION_LIMIT``.

    unittest.SkipTest is raised when _testcapi cannot be imported.
    """
    try:
        import _testcapi
    except ImportError:
        raise unittest.SkipTest('requires _testcapi')
    else:
        return _testcapi.Py_C_RECURSION_LIMIT
2584
2585
def exceeds_recursion_limit():
    """Return a depth for recursion tests that easily exceeds the default
    recursion limit: three times get_c_recursion_limit()."""
    c_limit = get_c_recursion_limit()
    return c_limit * 3
2589
2590
# Windows doesn't have os.uname() but it doesn't support s390x.
# is_s390x is therefore False whenever os.uname() is unavailable.
is_s390x = hasattr(os, 'uname') and os.uname().machine == 's390x'
# Decorator that skips the decorated test when is_s390x is true.
skip_on_s390x = unittest.skipIf(is_s390x, 'skipped on s390x')
2594
# True if the interpreter provides sys.getobjects().
# NOTE(review): presumably this identifies a build with the Py_TRACE_REFS
# macro defined (./configure --with-trace-refs) -- confirm.
Py_TRACE_REFS = hasattr(sys, 'getobjects')
2596
# Decorator to disable the optimizer while a function runs
def without_optimizer(func):
    """Decorate *func* so that it runs with the optimizer set to None.

    The optimizer returned by get_optimizer() before the call is installed
    again afterwards, even if *func* raises.  When _testinternalcapi does
    not provide get_optimizer/set_optimizer, *func* is returned unchanged.
    """
    try:
        from _testinternalcapi import get_optimizer, set_optimizer
    except ImportError:
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        previous_optimizer = get_optimizer()
        try:
            set_optimizer(None)
            return func(*args, **kwargs)
        finally:
            set_optimizer(previous_optimizer)

    return wrapper
2612
2613
# Names ignored in every directory when copying the Python source tree.
_BASE_COPY_SRC_DIR_IGNORED_NAMES = frozenset({
    '.git',          # SRC_DIR/.git
    '__pycache__',   # all __pycache__/ sub-directories
})

# Ignore function for shutil.copytree() to copy the Python source code.
def copy_python_src_ignore(path, names):
    """Return the set of names to ignore in directory *path*.

    *names* is the listing of *path*.  On top of the names ignored
    everywhere, 'build' and 'venv' are ignored in a directory named 'Doc',
    and 'build' is ignored in a directory containing 'Modules' (taken to
    be the root of the source tree).
    """
    if os.path.basename(path) == 'Doc':
        # SRC_DIR/Doc/build/ and SRC_DIR/Doc/venv/
        extra = {'build', 'venv'}
    elif 'Modules' in names:
        # check if we are at the root of the source code: SRC_DIR/build/
        extra = {'build'}
    else:
        return _BASE_COPY_SRC_DIR_IGNORED_NAMES
    return _BASE_COPY_SRC_DIR_IGNORED_NAMES | extra
2639
2640
def iter_builtin_types():
    """Yield each class in __builtins__ whose __module__ is 'builtins'.

    Values are taken in the order of __builtins__.values() and are not
    de-duplicated, so a class bound to several names is yielded once per
    name.
    """
    for candidate in __builtins__.values():
        if isinstance(candidate, type) and candidate.__module__ == 'builtins':
            yield candidate
2649
2650
def iter_slot_wrappers(cls):
    """Yield ``(name, is_own)`` for each slot wrapper attribute of *cls*.

    *cls* must report 'builtins' as its __module__.  ``is_own`` is True when
    the wrapper is stored in ``vars(cls)`` and False when it is only
    reachable through getattr() (i.e. not in the class's own namespace).

    Names listed by dir() whose getattr() raises AttributeError are not
    dropped outright: if the class namespace holds a slot wrapper under
    that name, it is still reported (as own) at the end.
    """
    assert cls.__module__ == 'builtins', cls

    def is_slot_wrapper(name, value):
        # A slot wrapper is a types.WrapperDescriptorType instance; the
        # asserts cross-check that against its repr, callability and name.
        if not isinstance(value, types.WrapperDescriptorType):
            assert not repr(value).startswith('<slot wrapper '), (cls, name, value)
            return False
        assert repr(value).startswith('<slot wrapper '), (cls, name, value)
        assert callable(value), (cls, name, value)
        assert name.startswith('__') and name.endswith('__'), (cls, name, value)
        return True

    ns = vars(cls)
    # Names from the class namespace that the dir() walk below has not
    # (successfully) handled yet.
    unused = set(ns)
    for name in dir(cls):
        if name in ns:
            unused.remove(name)

        try:
            value = getattr(cls, name)
        except AttributeError:
            # It's as though it weren't in __dir__.
            assert name in ('__annotate__', '__annotations__', '__abstractmethods__'), (cls, name)
            if name in ns and is_slot_wrapper(name, ns[name]):
                # Put it back so the final loop reports it.
                unused.add(name)
            continue

        if not name.startswith('__') or not name.endswith('__'):
            assert not is_slot_wrapper(name, value), (cls, name, value)
        if not is_slot_wrapper(name, value):
            if name in ns:
                assert not is_slot_wrapper(name, ns[name]), (cls, name, value, ns[name])
        else:
            if name in ns:
                assert ns[name] is value, (cls, name, value, ns[name])
                yield name, True
            else:
                yield name, False

    for name in unused:
        value = ns[name]
        # is_slot_wrapper() takes (name, value); cls comes from the closure.
        if is_slot_wrapper(name, value):
            yield name, True
2694
2695
def force_not_colorized(func):
    """Force the terminal not to be colorized.

    Decorator: while *func* runs, PYTHON_COLORS and FORCE_COLOR are removed
    from os.environ, NO_COLOR is set to "1", and _colorize.can_colorize is
    replaced by a stub that returns False.  Afterwards can_colorize and the
    three environment variables are put back exactly as they were before
    the call, even if *func* raises or changes those variables itself.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        import _colorize
        original_fn = _colorize.can_colorize
        # Pre-call value of each variable; None means "was not set".
        variables: dict[str, str | None] = {
            "PYTHON_COLORS": None, "FORCE_COLOR": None, "NO_COLOR": None
        }
        try:
            for key in variables:
                variables[key] = os.environ.pop(key, None)
            os.environ["NO_COLOR"] = "1"
            # Accept and ignore any arguments so the stub keeps working
            # whatever signature callers use for can_colorize().
            _colorize.can_colorize = lambda *_args, **_kwargs: False
            return func(*args, **kwargs)
        finally:
            _colorize.can_colorize = original_fn
            for key, value in variables.items():
                if value is None:
                    # pop() rather than del: the wrapped function may have
                    # already removed the variable, and cleanup must not
                    # raise KeyError and hide the function's own outcome.
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = value
    return wrapper
2718
2719
def initialized_with_pyrepl():
    """Detect whether PyREPL was used during Python initialization."""
    # PyREPL is a Python module, so when it was used the main module
    # carries a __file__ attribute.
    main_module = sys.modules["__main__"]
    return hasattr(main_module, "__file__")
2724
2725
class BrokenIter:
    """Iterator whose construction, iter() or next() can be made to fail.

    Each true ``*_raises`` flag makes the matching operation evaluate
    ``1/0`` and therefore raise ZeroDivisionError.
    """

    def __init__(self, init_raises=False, next_raises=False, iter_raises=False):
        if init_raises:
            1/0
        self.next_raises, self.iter_raises = next_raises, iter_raises

    def __next__(self):
        # Returns None (never StopIteration) unless asked to fail.
        if self.next_raises:
            1/0

    def __iter__(self):
        if not self.iter_raises:
            return self
        1/0
2741