1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import collections.abc
7import contextlib
8import errno
9import fnmatch
10import functools
11import glob
12import importlib
13import importlib.util
14import os
15import platform
16import re
17import stat
18import struct
19import subprocess
20import sys
21import sysconfig
22import _thread
23import threading
24import time
25import types
26import unittest
27import warnings
28
29from .testresult import get_test_runner
30
31__all__ = [
32    # globals
33    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
34    # exceptions
35    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
36    # imports
37    "import_module", "import_fresh_module", "CleanImport",
38    # modules
39    "unload", "forget",
40    # io
41    "record_original_stdout", "get_original_stdout", "captured_stdout",
42    "captured_stdin", "captured_stderr",
43    # filesystem
44    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
45    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
46    # unittest
47    "is_resource_enabled", "requires", "requires_freebsd_version",
48    "requires_linux_version", "requires_mac_ver",
49    "check_syntax_error", "check_syntax_warning",
50    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
51    "BasicTestRunner", "run_unittest", "run_doctest",
52    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
53    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
54    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
55    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
56    "check__all__", "skip_if_buggy_ucrt_strfptime",
57    "ignore_warnings",
58    # sys
59    "is_jython", "is_android", "check_impl_detail", "unix_shell",
60    "setswitchinterval",
61    # network
62    "open_urlresource",
63    # processes
64    'temp_umask', "reap_children",
65    # threads
66    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
67    # miscellaneous
68    "check_warnings", "check_no_resource_warning", "check_no_warnings",
69    "EnvironmentVarGuard",
70    "run_with_locale", "swap_item",
71    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
72    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
73    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
74    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
75    ]
76
77
78# Timeout in seconds for tests using a network server listening on the network
79# local loopback interface like 127.0.0.1.
80#
81# The timeout is long enough to prevent test failure: it takes into account
82# that the client and the server can run in different threads or even different
83# processes.
84#
85# The timeout should be long enough for connect(), recv() and send() methods
86# of socket.socket.
87LOOPBACK_TIMEOUT = 5.0
88if sys.platform == 'win32' and platform.machine() == 'ARM':
89    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
90    # seconds on Windows ARM32 buildbot
91    LOOPBACK_TIMEOUT = 10
92
93# Timeout in seconds for network requests going to the Internet. The timeout is
94# short enough to prevent a test to wait for too long if the Internet request
95# is blocked for whatever reason.
96#
97# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
98# but skip the test instead: see transient_internet().
99INTERNET_TIMEOUT = 60.0
100
101# Timeout in seconds to mark a test as failed if the test takes "too long".
102#
103# The timeout value depends on the regrtest --timeout command line option.
104#
105# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
106# LONG_TIMEOUT instead.
107SHORT_TIMEOUT = 30.0
108
109# Timeout in seconds to detect when a test hangs.
110#
111# It is long enough to reduce the risk of test failure on the slowest Python
112# buildbots. It should not be used to mark a test as failed if the test takes
113# "too long". The timeout value depends on the regrtest --timeout command line
114# option.
115LONG_TIMEOUT = 5 * 60.0
116
117
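# Illustrative sketch (not part of the original module): one way a test might
# apply the timeout constants above.  The function name and the socket usage
# below are hypothetical examples, not existing test.support helpers.
def _example_timeout_usage():
    import socket
    # Bound loopback traffic with LOOPBACK_TIMEOUT; reserve SHORT_TIMEOUT /
    # LONG_TIMEOUT for "the test took too long" limits instead of hard-coded
    # numbers.
    with socket.create_server(('127.0.0.1', 0)) as srv:
        srv.settimeout(LOOPBACK_TIMEOUT)
        host, port = srv.getsockname()
        with socket.create_connection((host, port),
                                      timeout=LOOPBACK_TIMEOUT) as client:
            client.sendall(b'ping')
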
class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class TestDidNotRun(Error):
    """Test did not run any subtests."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled.  It is used to distinguish between expected
    and unexpected skips.
    """

@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager to suppress package and module deprecation
    warnings when importing them.

    If ignore is False, this context manager has no effect.
    """
    if ignore:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
            yield
    else:
        yield


def ignore_warnings(*, category):
    """Decorator to suppress deprecation warnings.

    Using context managers to hide warnings makes diffs
    noisier and tools like 'git blame' less useful.
    """
    def decorator(test):
        @functools.wraps(test)
        def wrapper(self, *args, **kwargs):
            with warnings.catch_warnings():
                warnings.simplefilter('ignore', category=category)
                return test(self, *args, **kwargs)
        return wrapper
    return decorator


def import_module(name, deprecated=False, *, required_on=()):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed. If a module is required on a platform but optional for
    others, set required_on to an iterable of platform prefixes which will be
    compared against sys.platform.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            if sys.platform.startswith(tuple(required_on)):
                raise
            raise unittest.SkipTest(str(msg))


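# Illustrative sketch (not part of the original module): import_module() is
# typically called at the top of a test module so the whole file is skipped
# when an optional dependency is missing.  The function and module names here
# are examples only.
def _example_import_module_usage():
    fcntl = import_module('fcntl')              # SkipTest if not available
    # Fail loudly (ImportError) instead of skipping on platforms where the
    # module must exist.
    termios = import_module('termios', required_on=['linux'])
    return fcntl, termios
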
def _save_and_remove_module(name, orig_modules):
    """Helper function to save and remove a module from sys.modules

    Raise ImportError if the module can't be imported.
    """
    # try to import the module and raise an error if it can't be imported
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    for modname in list(sys.modules):
        if modname == name or modname.startswith(name + '.'):
            orig_modules[modname] = sys.modules[modname]
            del sys.modules[modname]

def _save_and_block_module(name, orig_modules):
    """Helper function to save and block a module in sys.modules

    Return True if the module was in sys.modules, False otherwise.
    """
    saved = True
    try:
        orig_modules[name] = sys.modules[name]
    except KeyError:
        saved = False
    sys.modules[name] = None
    return saved


def anticipate_failure(condition):
    """Decorator to mark a test that is known to be broken in some cases

       Any use of this decorator should have a comment identifying the
       associated tracker issue.
    """
    if condition:
        return unittest.expectedFailure
    return lambda f: f

def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    if pattern is None:
        pattern = "test*"
    top_dir = os.path.dirname(              # Lib
                  os.path.dirname(              # test
                      os.path.dirname(__file__)))   # support
    package_tests = loader.discover(start_dir=pkg_dir,
                                    top_level_dir=top_dir,
                                    pattern=pattern)
    standard_tests.addTests(package_tests)
    return standard_tests


def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a module, deliberately bypassing sys.modules.

    This function imports and returns a fresh copy of the named Python module
    by removing the named module from sys.modules before doing the import.
    Note that unlike reload, the original module is not affected by
    this operation.

    *fresh* is an iterable of additional module names that are also removed
    from the sys.modules cache before doing the import.

    *blocked* is an iterable of module names that are replaced with None
    in the module cache during the import to ensure that attempts to import
    them raise ImportError.

    The named module and any modules named in the *fresh* and *blocked*
    parameters are saved before starting the import and then reinserted into
    sys.modules when the fresh import is complete.

    Module and package deprecation messages are suppressed during this import
    if *deprecated* is True.

    This function will raise ImportError if the named module cannot be
    imported.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        except ImportError:
            fresh_module = None
        finally:
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module


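# Illustrative sketch (not part of the original module): the usual
# import_fresh_module() pattern for testing a pure Python implementation next
# to its C accelerator.  The module names are examples only.
def _example_import_fresh_module_usage():
    # heapq with the _heapq accelerator blocked: pure Python code paths.
    py_heapq = import_fresh_module('heapq', blocked=['_heapq'])
    # heapq re-imported from scratch with a fresh _heapq: accelerated paths.
    c_heapq = import_fresh_module('heapq', fresh=['_heapq'])
    return py_heapq, c_heapq
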
def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
    else:
        return attribute

verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work).
real_max_memuse = 0
junit_xml_list = None    # list of testsuite XML elements
failfast = False

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    return _original_stdout or sys.stdout

def unload(name):
    try:
        del sys.modules[name]
    except KeyError:
        pass

def _force_run(path, func, *args):
    try:
        return func(*args)
    except OSError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)

if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
                          file=sys.__stderr__)
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        return path

def unlink(filename):
    try:
        _unlink(filename)
    except (FileNotFoundError, NotADirectoryError):
        pass

def rmdir(dirname):
    try:
        _rmdir(dirname)
    except FileNotFoundError:
        pass

def rmtree(path):
    try:
        _rmtree(path)
    except FileNotFoundError:
        pass

def make_legacy_pyc(source):
    """Move a PEP 3147/488 pyc file to its legacy pyc location.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147/488 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    up_one = os.path.dirname(os.path.abspath(source))
    legacy_pyc = os.path.join(up_one, source + 'c')
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc

def forget(modname):
    """'Forget' a module was ever imported.

    This removes the module from sys.modules and deletes any PEP 3147/488 or
    legacy .pyc files.
    """
    unload(modname)
    for dirname in sys.path:
        source = os.path.join(dirname, modname + '.py')
        # It doesn't matter whether they exist or not; unlink all possible
        # combinations of PEP 3147/488 and legacy pyc files.
        unlink(source + 'c')
        for opt in ('', 1, 2):
            unlink(importlib.util.cache_from_source(source, optimization=opt))

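# Illustrative sketch (not part of the original module): forget() is normally
# paired with a module the test itself wrote to disk, so that later imports do
# not hit sys.modules or stale .pyc files.  The directory argument, file name
# and module name are hypothetical.
def _example_forget_usage(tmp_dir):
    module_path = os.path.join(tmp_dir, 'spam_example.py')
    with open(module_path, 'w') as f:
        f.write('value = 1\n')
    sys.path.insert(0, tmp_dir)
    importlib.invalidate_caches()
    try:
        importlib.import_module('spam_example')
    finally:
        forget('spam_example')      # drop it from sys.modules and delete pycs
        sys.path.remove(tmp_dir)
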
# Check whether a gui is actually available
def _is_gui_available():
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py.  If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    return use_resources is None or resource in use_resources

def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available."""
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the %r resource not enabled" % resource
        raise ResourceDenied(msg)
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)

def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    min_version_txt = '.'.join(map(str, min_version))
    version_txt = platform.release().split('-', 1)[0]
    if platform.system() == sysname:
        try:
            version = tuple(map(int, version_txt.split('.')))
        except ValueError:
            skip = False
        else:
            skip = version < min_version
    else:
        skip = False

    return unittest.skipIf(
        skip,
        f"{sysname} version {min_version_txt} or higher required, not "
        f"{version_txt}"
    )


def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)

def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)

def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "Mac OS X %s or higher required, not %s"
                            % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator


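# Illustrative sketch (not part of the original module): the version-gating
# decorators above are applied directly to test methods.  The test class and
# method names are hypothetical.
class _ExampleVersionGatedTests(unittest.TestCase):
    @requires_freebsd_version(7, 2)
    def test_freebsd_specific_behaviour(self):
        pass    # body would exercise a FreeBSD >= 7.2 feature

    @requires_mac_ver(10, 12)
    def test_macos_specific_behaviour(self):
        pass    # body would exercise a macOS >= 10.12 API
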
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures."""
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec

# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles")

def requires_zlib(reason='requires zlib'):
    try:
        import zlib
    except ImportError:
        zlib = None
    return unittest.skipUnless(zlib, reason)

def requires_gzip(reason='requires gzip'):
    try:
        import gzip
    except ImportError:
        gzip = None
    return unittest.skipUnless(gzip, reason)

def requires_bz2(reason='requires bz2'):
    try:
        import bz2
    except ImportError:
        bz2 = None
    return unittest.skipUnless(bz2, reason)

def requires_lzma(reason='requires lzma'):
    try:
        import lzma
    except ImportError:
        lzma = None
    return unittest.skipUnless(lzma, reason)

is_jython = sys.platform.startswith('java')

is_android = hasattr(sys, 'getandroidapilevel')

if sys.platform != 'win32':
    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
else:
    unix_shell = None

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN_ASCII = '$test'
else:
    TESTFN_ASCII = '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())

# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"

# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the encodings listed are just examples of encodings
    # able to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        pass
    else:
        FS_NONASCII = character
        break

# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
TESTFN_ENCODING = sys.getfilesystemencoding()

# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it when creating a file or a directory, but may not accept
    # entering such a directory (when the bytes name is used). So test
    # b'\xe7' first: it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    b'\xae\xd5'
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII

# Save the initial cwd
SAVEDCWD = os.getcwd()

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task.  If this is True, PGO is also True.
PGO_EXTENDED = False

@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    """
    import tempfile
    dir_created = False
    if path is None:
        path = tempfile.mkdtemp()
        dir_created = True
        path = os.path.realpath(path)
    else:
        try:
            os.mkdir(path)
            dir_created = True
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
    if dir_created:
        pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if dir_created and pid == os.getpid():
            rmtree(path)

@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    """
    saved_dir = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)


@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir

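# Illustrative sketch (not part of the original module): temp_cwd() gives a
# test a scratch working directory that is removed on exit.  The directory
# and file names are examples only.
def _example_temp_cwd_usage():
    with temp_cwd('example_scratch') as cwd:
        with open('data.txt', 'w') as f:    # created inside the temp CWD
            f.write('scratch data\n')
        assert os.path.samefile(os.getcwd(), cwd)
    # The temporary directory and everything in it is gone here.
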
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        oldmask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(oldmask)

# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")

def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.  If it is not
    found, the argument passed to the function is returned (this does not
    necessarily signal failure; it could still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    path = [TEST_HOME_DIR] + sys.path
    for dn in path:
        fn = os.path.join(dn, filename)
        if os.path.exists(fn): return fn
    return filename

def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)


def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    testcase.assertIsNotNone(err.lineno)
    if lineno is not None:
        testcase.assertEqual(err.lineno, lineno)
    testcase.assertIsNotNone(err.offset)
    if offset is not None:
        testcase.assertEqual(err.offset, offset)

def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None):
    # Test also that a warning is emitted only once.
    with warnings.catch_warnings(record=True) as warns:
        warnings.simplefilter('always', SyntaxWarning)
        compile(statement, '<testcase>', 'exec')
    testcase.assertEqual(len(warns), 1, warns)

    warn, = warns
    testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category)
    if errtext:
        testcase.assertRegex(str(warn.message), errtext)
    testcase.assertEqual(warn.filename, '<testcase>')
    testcase.assertIsNotNone(warn.lineno)
    if lineno is not None:
        testcase.assertEqual(warn.lineno, lineno)

    # SyntaxWarning should be converted to SyntaxError when raised,
    # since the latter contains more information and provides a better
    # error report.
    with warnings.catch_warnings(record=True) as warns:
        warnings.simplefilter('error', SyntaxWarning)
        check_syntax_error(testcase, statement, errtext,
                           lineno=lineno, offset=offset)
    # No warnings are leaked when a SyntaxError is raised.
    testcase.assertEqual(warns, [])


def open_urlresource(url, *args, **kw):
    import urllib.request, urllib.parse
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's a URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        f = open(fn, *args, **kw)
        if check is None:
            return f
        elif check(f):
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    f = opener.open(url, timeout=INTERNET_TIMEOUT)
    if gzip and f.headers.get('Content-Encoding') == 'gzip':
        f = gzip.GzipFile(fileobj=f)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)


class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
       entry to the warnings.catch_warnings() context manager.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        self._last = 0

    def __getattr__(self, attr):
        if len(self._warnings) > self._last:
            return getattr(self._warnings[-1], attr)
        elif attr in warnings.WarningMessage._WARNING_DETAILS:
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        return self._warnings[self._last:]

    def reset(self):
        self._last = len(self._warnings)


def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swaps the module, we need to look it up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = list(w)
    missing = []
    for msg, cat in filters:
        seen = False
        for w in reraise[:]:
            warning = w.message
            # Filter out the matching messages
            if (re.match(msg, str(warning), re.I) and
                issubclass(warning.__class__, cat)):
                seen = True
                reraise.remove(w)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if reraise:
        raise AssertionError("unhandled warning %s" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])


@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default True without argument,
         default False if some filters are defined)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if not filters:
        filters = (("", Warning),)
        # Preserve backward compatibility
        if quiet is None:
            quiet = True
    return _filterwarnings(filters, quiet)


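# Illustrative sketch (not part of the original module): two common ways to
# use check_warnings().  The warning messages and categories are examples.
def _example_check_warnings_usage():
    # Assert that specific warnings are raised inside the block.
    with check_warnings(("assertion is always true", SyntaxWarning),
                        ("", UserWarning)):
        warnings.warn("assertion is always true", SyntaxWarning)
        warnings.warn("example", UserWarning)
    # Silence warnings while still being able to inspect what was recorded.
    with check_warnings(quiet=True) as recorded:
        warnings.warn("example", DeprecationWarning)
        assert issubclass(recorded.category, DeprecationWarning)
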
@contextlib.contextmanager
def check_no_warnings(testcase, message='', category=Warning, force_gc=False):
    """Context manager to check that no warnings are emitted.

    This context manager enables a given warning within its scope
    and checks that no warnings are emitted even with that warning
    enabled.

    If force_gc is True, a garbage collection is attempted before checking
    for warnings. This may help to catch warnings emitted when objects
    are deleted, such as ResourceWarning.

    Other keyword arguments are passed to warnings.filterwarnings().
    """
    with warnings.catch_warnings(record=True) as warns:
        warnings.filterwarnings('always',
                                message=message,
                                category=category)
        yield
        if force_gc:
            gc_collect()
    testcase.assertEqual(warns, [])


@contextlib.contextmanager
def check_no_resource_warning(testcase):
    """Context manager to check that no ResourceWarning is emitted.

    Usage:

        with check_no_resource_warning(self):
            f = open(...)
            ...
            del f

    You must remove the object which may emit ResourceWarning before
    the end of the context manager.
    """
    with check_no_warnings(testcase, category=ResourceWarning, force_gc=True):
        yield


class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need to delete the real module to
                # clear the import cache.
                if module.__name__ != module_name:
                    del sys.modules[module.__name__]
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)


class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect environment variables properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        os.environ = self._environ


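# Illustrative sketch (not part of the original module): EnvironmentVarGuard
# restores os.environ when the block exits.  The variable names are examples.
def _example_environment_var_guard_usage():
    with EnvironmentVarGuard() as env:
        env.set('EXAMPLE_HOME', '/tmp/example')     # changed for the block
        env.unset('EXAMPLE_DEBUG')                  # removed for the block
        assert os.environ['EXAMPLE_HOME'] == '/tmp/example'
        assert 'EXAMPLE_DEBUG' not in os.environ
    # The previous values (or absence) of both variables are restored here.
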
class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
    """

    def __init__(self, *paths):
        self.original_value = sys.path[:]
        self.original_object = sys.path
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.path = self.original_object
        sys.path[:] = self.original_value


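# Illustrative sketch (not part of the original module): DirsOnSysPath
# temporarily widens the import search path; the argument is an example and
# the final assertion assumes the directory was not already on sys.path.
def _example_dirs_on_sys_path_usage(extra_dir):
    with DirsOnSysPath(extra_dir):
        assert extra_dir in sys.path
    assert extra_dir not in sys.path
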
class TransientResource(object):

    """Raise ResourceDenied if an exception matching the specified exception
    and attributes is raised while the context manager is in effect."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        if type_ is not None and issubclass(self.exc, type_):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")

# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO."""
    import io
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case for reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()

@contextlib.contextmanager
def disable_gc():
    import gc
    have_gc = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if have_gc:
            gc.enable()


def python_is_optimized():
    """Find out if Python was built with optimizations."""
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ""
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0', '-Og')


_header = 'nP'
_align = '0n'
if hasattr(sys, "getobjects"):
    _header = '2P' + _header
    _align = '0P'
_vheader = _header + 'n'

def calcobjsize(fmt):
    return struct.calcsize(_header + fmt + _align)

def calcvobjsize(fmt):
    return struct.calcsize(_vheader + fmt + _align)


_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    import _testinternalcapi
    result = sys.getsizeof(o)
    # add GC header size
    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), result, size)
    test.assertEqual(result, size, msg)

1494#=======================================================================
1495# Decorator for running a function in a different locale, correctly resetting
1496# it afterwards.
1497
1498def run_with_locale(catstr, *locales):
1499    def decorator(func):
1500        def inner(*args, **kwds):
1501            try:
1502                import locale
1503                category = getattr(locale, catstr)
1504                orig_locale = locale.setlocale(category)
1505            except AttributeError:
1506                # if the test author gives us an invalid category string
1507                raise
1508            except:
1509                # cannot retrieve original locale, so do nothing
1510                locale = orig_locale = None
1511            else:
1512                for loc in locales:
1513                    try:
1514                        locale.setlocale(category, loc)
1515                        break
1516                    except:
1517                        pass
1518
1519            # now run the function, resetting the locale on exceptions
1520            try:
1521                return func(*args, **kwds)
1522            finally:
1523                if locale and orig_locale:
1524                    locale.setlocale(category, orig_locale)
1525        inner.__name__ = func.__name__
1526        inner.__doc__ = func.__doc__
1527        return inner
1528    return decorator
1529
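# Illustrative usage (added example): run a test under a French locale if one
# is available, falling back to the locale from the environment; the locale
# names below are only examples:
#
#     @run_with_locale('LC_NUMERIC', 'fr_FR.UTF-8', 'fr_FR', '')
#     def test_float_formatting(self):
#         ...
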
1530#=======================================================================
1531# Decorator for running a function in a specific timezone, correctly
1532# resetting it afterwards.
1533
1534def run_with_tz(tz):
1535    def decorator(func):
1536        def inner(*args, **kwds):
1537            try:
1538                tzset = time.tzset
1539            except AttributeError:
1540                raise unittest.SkipTest("tzset required")
1541            if 'TZ' in os.environ:
1542                orig_tz = os.environ['TZ']
1543            else:
1544                orig_tz = None
1545            os.environ['TZ'] = tz
1546            tzset()
1547
1548            # now run the function, resetting the tz on exceptions
1549            try:
1550                return func(*args, **kwds)
1551            finally:
1552                if orig_tz is None:
1553                    del os.environ['TZ']
1554                else:
1555                    os.environ['TZ'] = orig_tz
1556                time.tzset()
1557
1558        inner.__name__ = func.__name__
1559        inner.__doc__ = func.__doc__
1560        return inner
1561    return decorator
1562
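# Illustrative usage (added example): run a test with the TZ environment
# variable set to a POSIX timezone string (the value below is only an
# example):
#
#     @run_with_tz('STD-1DST,M3.2.0,M11.1.0')
#     def test_localtime(self):
#         ...
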
1563#=======================================================================
1564# Big-memory-test support. Separate from 'resources' because memory use
1565# should be configurable.
1566
1567# Some handy shorthands. Note that these are used for byte-limits as well
1568# as size-limits, in the various bigmem tests
1569_1M = 1024*1024
1570_1G = 1024 * _1M
1571_2G = 2 * _1G
1572_4G = 4 * _1G
1573
1574MAX_Py_ssize_t = sys.maxsize
1575
1576def set_memlimit(limit):
1577    global max_memuse
1578    global real_max_memuse
1579    sizes = {
1580        'k': 1024,
1581        'm': _1M,
1582        'g': _1G,
1583        't': 1024*_1G,
1584    }
1585    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1586                 re.IGNORECASE | re.VERBOSE)
1587    if m is None:
1588        raise ValueError('Invalid memory limit %r' % (limit,))
1589    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1590    real_max_memuse = memlimit
1591    if memlimit > MAX_Py_ssize_t:
1592        memlimit = MAX_Py_ssize_t
1593    if memlimit < _2G - 1:
1594        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1595    max_memuse = memlimit
1596
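# Illustrative usage (added example): set_memlimit() accepts human-readable
# sizes; limits below roughly 2 GiB are rejected as not useful:
#
#     set_memlimit('2.5g')   # bigmem tests may use up to ~2.5 GiB
#     set_memlimit('512m')   # raises ValueError: too low to be useful
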
1597class _MemoryWatchdog:
1598    """An object which periodically watches the process' memory consumption
1599    and prints it out.
1600    """
1601
1602    def __init__(self):
1603        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
1604        self.started = False
1605
1606    def start(self):
1607        try:
1608            f = open(self.procfile, 'r')
1609        except OSError as e:
1610            warnings.warn('/proc not available for stats: {}'.format(e),
1611                          RuntimeWarning)
1612            sys.stderr.flush()
1613            return
1614
1615        with f:
1616            watchdog_script = findfile("memory_watchdog.py")
1617            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
1618                                                 stdin=f,
1619                                                 stderr=subprocess.DEVNULL)
1620        self.started = True
1621
1622    def stop(self):
1623        if self.started:
1624            self.mem_watchdog.terminate()
1625            self.mem_watchdog.wait()
1626
1627
1628def bigmemtest(size, memuse, dry_run=True):
1629    """Decorator for bigmem tests.
1630
1631    'size' is a requested size for the test (in arbitrary, test-interpreted
1632    units.) 'memuse' is the number of bytes per unit for the test, or a good
1633    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
1634    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
1635
1636    The 'size' argument is normally passed to the decorated test method as an
1637    extra argument. If 'dry_run' is true, the value passed to the test method
1638    may be less than the requested value. If 'dry_run' is false, it means the
1639    test doesn't support dummy runs when -M is not specified.
1640    """
1641    def decorator(f):
1642        def wrapper(self):
1643            size = wrapper.size
1644            memuse = wrapper.memuse
1645            if not real_max_memuse:
1646                maxsize = 5147
1647            else:
1648                maxsize = size
1649
1650            if ((real_max_memuse or not dry_run)
1651                and real_max_memuse < maxsize * memuse):
1652                raise unittest.SkipTest(
1653                    "not enough memory: %.1fG minimum needed"
1654                    % (size * memuse / (1024 ** 3)))
1655
1656            if real_max_memuse and verbose:
1657                print()
1658                print(" ... expected peak memory use: {peak:.1f}G"
1659                      .format(peak=size * memuse / (1024 ** 3)))
1660                watchdog = _MemoryWatchdog()
1661                watchdog.start()
1662            else:
1663                watchdog = None
1664
1665            try:
1666                return f(self, maxsize)
1667            finally:
1668                if watchdog:
1669                    watchdog.stop()
1670
1671        wrapper.size = size
1672        wrapper.memuse = memuse
1673        return wrapper
1674    return decorator
1675
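# Illustrative usage (added example): a test that builds a str of 'size'
# ASCII characters plus a copy of it needs roughly 2 bytes per unit, hence
# memuse=2 (the test body is hypothetical):
#
#     class StrTests(unittest.TestCase):
#         @bigmemtest(size=_2G, memuse=2)
#         def test_concat(self, size):
#             s = 'x' * size
#             self.assertEqual(len(s + 'x'), size + 1)
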
1676def bigaddrspacetest(f):
1677    """Decorator for tests that fill the address space."""
1678    def wrapper(self):
1679        if max_memuse < MAX_Py_ssize_t:
1680            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
1681                raise unittest.SkipTest(
1682                    "not enough memory: try a 32-bit build instead")
1683            else:
1684                raise unittest.SkipTest(
1685                    "not enough memory: %.1fG minimum needed"
1686                    % (MAX_Py_ssize_t / (1024 ** 3)))
1687        else:
1688            return f(self)
1689    return wrapper
1690
1691#=======================================================================
1692# unittest integration.
1693
1694class BasicTestRunner:
1695    def run(self, test):
1696        result = unittest.TestResult()
1697        test(result)
1698        return result
1699
1700def _id(obj):
1701    return obj
1702
1703def requires_resource(resource):
1704    if resource == 'gui' and not _is_gui_available():
1705        return unittest.skip(_is_gui_available.reason)
1706    if is_resource_enabled(resource):
1707        return _id
1708    else:
1709        return unittest.skip("resource {0!r} is not enabled".format(resource))
1710
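# Illustrative usage (added example): skip a test unless the corresponding
# regrtest resource (-u option) has been enabled:
#
#     @requires_resource('network')
#     def test_download(self):
#         ...
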
1711def cpython_only(test):
1712    """
1713    Decorator for tests only applicable on CPython.
1714    """
1715    return impl_detail(cpython=True)(test)
1716
1717def impl_detail(msg=None, **guards):
1718    if check_impl_detail(**guards):
1719        return _id
1720    if msg is None:
1721        guardnames, default = _parse_guards(guards)
1722        if default:
1723            msg = "implementation detail not available on {0}"
1724        else:
1725            msg = "implementation detail specific to {0}"
1726        guardnames = sorted(guardnames.keys())
1727        msg = msg.format(' or '.join(guardnames))
1728    return unittest.skip(msg)
1729
1730def _parse_guards(guards):
1731    # Returns a tuple ({platform_name: run_me}, default_value)
1732    if not guards:
1733        return ({'cpython': True}, False)
1734    is_true = list(guards.values())[0]
1735    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
1736    return (guards, not is_true)
1737
1738# Use the following check to guard CPython's implementation-specific tests --
1739# or to run them only on the implementation(s) guarded by the arguments.
1740def check_impl_detail(**guards):
1741    """This function returns True or False depending on the host platform.
1742       Examples:
1743          if check_impl_detail():               # only on CPython (default)
1744          if check_impl_detail(jython=True):    # only on Jython
1745          if check_impl_detail(cpython=False):  # everywhere except on CPython
1746    """
1747    guards, default = _parse_guards(guards)
1748    return guards.get(platform.python_implementation().lower(), default)
1749
1750
1751def no_tracing(func):
1752    """Decorator to temporarily turn off tracing for the duration of a test."""
1753    if not hasattr(sys, 'gettrace'):
1754        return func
1755    else:
1756        @functools.wraps(func)
1757        def wrapper(*args, **kwargs):
1758            original_trace = sys.gettrace()
1759            try:
1760                sys.settrace(None)
1761                return func(*args, **kwargs)
1762            finally:
1763                sys.settrace(original_trace)
1764        return wrapper
1765
1766
1767def refcount_test(test):
1768    """Decorator for tests which involve reference counting.
1769
1770    To start, the decorator does not run the test if it is not run by CPython.
1771    After that, any trace function is unset during the test to prevent
1772    unexpected refcounts caused by the trace function.
1773
1774    """
1775    return no_tracing(cpython_only(test))
1776
1777
1778def _filter_suite(suite, pred):
1779    """Recursively filter test cases in a suite based on a predicate."""
1780    newtests = []
1781    for test in suite._tests:
1782        if isinstance(test, unittest.TestSuite):
1783            _filter_suite(test, pred)
1784            newtests.append(test)
1785        else:
1786            if pred(test):
1787                newtests.append(test)
1788    suite._tests = newtests
1789
1790def _run_suite(suite):
1791    """Run tests from a unittest.TestSuite-derived class."""
1792    runner = get_test_runner(sys.stdout,
1793                             verbosity=verbose,
1794                             capture_output=(junit_xml_list is not None))
1795
1796    result = runner.run(suite)
1797
1798    if junit_xml_list is not None:
1799        junit_xml_list.append(result.get_xml_element())
1800
1801    if not result.testsRun and not result.skipped:
1802        raise TestDidNotRun
1803    if not result.wasSuccessful():
1804        if len(result.errors) == 1 and not result.failures:
1805            err = result.errors[0][1]
1806        elif len(result.failures) == 1 and not result.errors:
1807            err = result.failures[0][1]
1808        else:
1809            err = "multiple errors occurred"
1810            if not verbose: err += "; run in verbose mode for details"
1811        raise TestFailed(err)
1812
1813
1814# By default, don't filter tests
1815_match_test_func = None
1816
1817_accept_test_patterns = None
1818_ignore_test_patterns = None
1819
1820
1821def match_test(test):
1822    # Function used by support.run_unittest() and regrtest --list-cases
1823    if _match_test_func is None:
1824        return True
1825    else:
1826        return _match_test_func(test.id())
1827
1828
1829def _is_full_match_test(pattern):
1830    # If a pattern contains at least one dot, it's considered
1831    # a full test identifier.
1832    # Example: 'test.test_os.FileTests.test_access'.
1833    #
1834    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
1835    # or '[!...]'. For example, ignore 'test_access*'.
1836    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
1837
1838
1839def set_match_tests(accept_patterns=None, ignore_patterns=None):
1840    global _match_test_func, _accept_test_patterns, _ignore_test_patterns
1841
1842
1843    if accept_patterns is None:
1844        accept_patterns = ()
1845    if ignore_patterns is None:
1846        ignore_patterns = ()
1847
1848    accept_func = ignore_func = None
1849
1850    if accept_patterns != _accept_test_patterns:
1851        accept_patterns, accept_func = _compile_match_function(accept_patterns)
1852    if ignore_patterns != _ignore_test_patterns:
1853        ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
1854
1855    # Create a copy since patterns can be mutable and so modified later
1856    _accept_test_patterns = tuple(accept_patterns)
1857    _ignore_test_patterns = tuple(ignore_patterns)
1858
1859    if accept_func is not None or ignore_func is not None:
1860        def match_function(test_id):
1861            accept = True
1862            ignore = False
1863            if accept_func:
1864                accept = accept_func(test_id)
1865            if ignore_func:
1866                ignore = ignore_func(test_id)
1867            return accept and not ignore
1868
1869        _match_test_func = match_function
1870
1871
1872def _compile_match_function(patterns):
1873    if not patterns:
1874        func = None
1875        # set_match_tests(None) behaves as set_match_tests(())
1876        patterns = ()
1877    elif all(map(_is_full_match_test, patterns)):
1878        # Simple case: all patterns are full test identifiers.
1879        # The test.bisect_cmd utility only uses such full test identifiers.
1880        func = set(patterns).__contains__
1881    else:
1882        regex = '|'.join(map(fnmatch.translate, patterns))
1883        # The search *is* case sensitive on purpose:
1884        # don't use flags=re.IGNORECASE
1885        regex_match = re.compile(regex).match
1886
1887        def match_test_regex(test_id):
1888            if regex_match(test_id):
1889                # The regex matches the whole identifier, for example
1890                # 'test.test_os.FileTests.test_access'.
1891                return True
1892            else:
1893                # Try to match parts of the test identifier.
1894                # For example, split 'test.test_os.FileTests.test_access'
1895                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
1896                return any(map(regex_match, test_id.split(".")))
1897
1898        func = match_test_regex
1899
1900    return patterns, func
1901
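# Illustrative usage (added example): full identifiers are matched exactly,
# other patterns through fnmatch (the test names below are hypothetical):
#
#     set_match_tests(accept_patterns=['test.test_os.FileTests.test_access',
#                                      'test_urandom*'])
#     match_test(some_test_case)   # True only for accepted, non-ignored tests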
1902
1903def run_unittest(*classes):
1904    """Run tests from unittest.TestCase-derived classes."""
1905    valid_types = (unittest.TestSuite, unittest.TestCase)
1906    suite = unittest.TestSuite()
1907    for cls in classes:
1908        if isinstance(cls, str):
1909            if cls in sys.modules:
1910                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1911            else:
1912                raise ValueError("str arguments must be keys in sys.modules")
1913        elif isinstance(cls, valid_types):
1914            suite.addTest(cls)
1915        else:
1916            suite.addTest(unittest.makeSuite(cls))
1917    _filter_suite(suite, match_test)
1918    _run_suite(suite)
1919
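# Illustrative usage (added example): a test module can collect its cases
# with run_unittest(), e.g. from a test_main() function (class names are
# hypothetical):
#
#     def test_main():
#         run_unittest(FileTests, PathTests)
#
#     if __name__ == '__main__':
#         test_main()
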
1920#=======================================================================
1921# Check for the presence of docstrings.
1922
1923# Rather than trying to enumerate all the cases where docstrings may be
1924# disabled, we just check for that directly
1925
1926def _check_docstrings():
1927    """Just used to check if docstrings are enabled"""
1928
1929MISSING_C_DOCSTRINGS = (check_impl_detail() and
1930                        sys.platform != 'win32' and
1931                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
1932
1933HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
1934                   not MISSING_C_DOCSTRINGS)
1935
1936requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1937                                          "test requires docstrings")
1938
1939
1940#=======================================================================
1941# doctest driver.
1942
1943def run_doctest(module, verbosity=None, optionflags=0):
1944    """Run doctest on the given module.  Return (#failures, #tests).
1945
1946    If optional argument verbosity is not specified (or is None), pass
1947    support's belief about verbosity on to doctest.  Else doctest's
1948    usual behavior is used (it searches sys.argv for -v).
1949    """
1950
1951    import doctest
1952
1953    if verbosity is None:
1954        verbosity = verbose
1955    else:
1956        verbosity = None
1957
1958    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
1959    if f:
1960        raise TestFailed("%d of %d doctests failed" % (f, t))
1961    if verbose:
1962        print('doctest (%s) ... %d tests with zero failures' %
1963              (module.__name__, t))
1964    return f, t
1965
1966
1967#=======================================================================
1968# Support for saving and restoring the imported modules.
1969
1970def print_warning(msg):
1971    # bpo-39983: Print into sys.__stderr__ to display the warning even
1972    # when sys.stderr is captured temporarily by a test
1973    for line in msg.splitlines():
1974        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
1975
1976def modules_setup():
1977    return sys.modules.copy(),
1978
1979def modules_cleanup(oldmodules):
1980    # Encoders/decoders are registered permanently within the internal
1981    # codec cache. If we destroy the corresponding modules their
1982    # globals will be set to None which will trip up the cached functions.
1983    encodings = [(k, v) for k, v in sys.modules.items()
1984                 if k.startswith('encodings.')]
1985    sys.modules.clear()
1986    sys.modules.update(encodings)
1987    # XXX: This kind of problem can affect more than just encodings. In particular
1988    # extension modules (such as _ssl) don't cope with reloading properly.
1989    # Really, test modules should be cleaning out the test specific modules they
1990    # know they added (ala test_runpy) rather than relying on this function (as
1991    # test_importhooks and test_pkg do currently).
1992    # Implicitly imported *real* modules should be left alone (see issue 10556).
1993    sys.modules.update(oldmodules)
1994
1995#=======================================================================
1996# Threading support to prevent reporting refleaks when running regrtest.py -R
1997
1998# Flag used by saved_test_environment of test.libregrtest.save_env,
1999# to check if a test modified the environment. The flag should be set to False
2000# before running a new test.
2001#
2002# For example, threading_cleanup() sets the flag if the function fails
2003# to clean up threads.
2004environment_altered = False
2005
2006# NOTE: we use thread._count() rather than threading.enumerate() (or the
2007# moral equivalent thereof) because a threading.Thread object is still alive
2008# until its __bootstrap() method has returned, even after it has been
2009# unregistered from the threading module.
2010# thread._count(), on the other hand, only gets decremented *after* the
2011# __bootstrap() method has returned, which gives us reliable reference counts
2012# at the end of a test run.
2013
2014def threading_setup():
2015    return _thread._count(), threading._dangling.copy()
2016
2017def threading_cleanup(*original_values):
2018    global environment_altered
2019
2020    _MAX_COUNT = 100
2021
2022    for count in range(_MAX_COUNT):
2023        values = _thread._count(), threading._dangling
2024        if values == original_values:
2025            break
2026
2027        if not count:
2028            # Display a warning at the first iteration
2029            environment_altered = True
2030            dangling_threads = values[1]
2031            print_warning(f"threading_cleanup() failed to cleanup "
2032                          f"{values[0] - original_values[0]} threads "
2033                          f"(count: {values[0]}, "
2034                          f"dangling: {len(dangling_threads)})")
2035            for thread in dangling_threads:
2036                print_warning(f"Dangling thread: {thread!r}")
2037
2038            # Don't hold references to threads
2039            dangling_threads = None
2040        values = None
2041
2042        time.sleep(0.01)
2043        gc_collect()
2044
2045
2046def reap_threads(func):
2047    """Use this function when threads are being used.  This will
2048    ensure that the threads are cleaned up even when the test fails.
2049    """
2050    @functools.wraps(func)
2051    def decorator(*args):
2052        key = threading_setup()
2053        try:
2054            return func(*args)
2055        finally:
2056            threading_cleanup(*key)
2057    return decorator
2058
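# Illustrative usage (added example): decorate a test that spawns threads so
# leftover threads are reported and waited for (the worker is hypothetical):
#
#     @reap_threads
#     def test_spawns_thread(self):
#         t = threading.Thread(target=worker)
#         t.start()
#         t.join()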
2059
2060@contextlib.contextmanager
2061def wait_threads_exit(timeout=None):
2062    """
2063    bpo-31234: Context manager to wait until all threads created in the with
2064    statement exit.
2065
2066    Use _thread._count() to check if threads exited. Indirectly, wait until
2067    threads exit the internal t_bootstrap() C function of the _thread module.
2068
2069    threading_setup() and threading_cleanup() are designed to emit a warning
2070    if a test leaves running threads in the background. This context manager
2071    is designed to clean up threads started by _thread.start_new_thread(),
2072    which doesn't allow waiting for thread exit, whereas threading.Thread
2073    has a join() method.
2074    """
2075    if timeout is None:
2076        timeout = SHORT_TIMEOUT
2077    old_count = _thread._count()
2078    try:
2079        yield
2080    finally:
2081        start_time = time.monotonic()
2082        deadline = start_time + timeout
2083        while True:
2084            count = _thread._count()
2085            if count <= old_count:
2086                break
2087            if time.monotonic() > deadline:
2088                dt = time.monotonic() - start_time
2089                msg = (f"wait_threads() failed to cleanup {count - old_count} "
2090                       f"threads after {dt:.1f} seconds "
2091                       f"(count: {count}, old count: {old_count})")
2092                raise AssertionError(msg)
2093            time.sleep(0.010)
2094            gc_collect()
2095
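# Illustrative usage (added example): wrap low-level thread creation so the
# test waits for the thread to finish (the worker is hypothetical):
#
#     def test_start_new_thread(self):
#         with wait_threads_exit():
#             _thread.start_new_thread(worker, ())
#             ...   # on exit, wait until _thread._count() drops back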
2096
2097def join_thread(thread, timeout=None):
2098    """Join a thread. Raise an AssertionError if the thread is still alive
2099    after timeout seconds.
2100    """
2101    if timeout is None:
2102        timeout = SHORT_TIMEOUT
2103    thread.join(timeout)
2104    if thread.is_alive():
2105        msg = f"failed to join the thread in {timeout:.1f} seconds"
2106        raise AssertionError(msg)
2107
2108
2109def reap_children():
2110    """Use this function at the end of test_main() whenever sub-processes
2111    are started.  This will help ensure that no extra children (zombies)
2112    stick around to hog resources and create problems when looking
2113    for refleaks.
2114    """
2115    global environment_altered
2116
2117    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
2118    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
2119        return
2120
2121    # Reap all our dead child processes so we don't leave zombies around.
2122    # These hog resources and might be causing some of the buildbots to die.
2123    while True:
2124        try:
2125            # Read the exit status of any child process which already completed
2126            pid, status = os.waitpid(-1, os.WNOHANG)
2127        except OSError:
2128            break
2129
2130        if pid == 0:
2131            break
2132
2133        print_warning(f"reap_children() reaped child process {pid}")
2134        environment_altered = True
2135
2136
2137@contextlib.contextmanager
2138def start_threads(threads, unlock=None):
2139    import faulthandler
2140    threads = list(threads)
2141    started = []
2142    try:
2143        try:
2144            for t in threads:
2145                t.start()
2146                started.append(t)
2147        except:
2148            if verbose:
2149                print("Can't start %d threads, only %d threads started" %
2150                      (len(threads), len(started)))
2151            raise
2152        yield
2153    finally:
2154        try:
2155            if unlock:
2156                unlock()
2157            endtime = starttime = time.monotonic()
2158            for timeout in range(1, 16):
2159                endtime += 60
2160                for t in started:
2161                    t.join(max(endtime - time.monotonic(), 0.01))
2162                started = [t for t in started if t.is_alive()]
2163                if not started:
2164                    break
2165                if verbose:
2166                    print('Unable to join %d threads during a period of '
2167                          '%d minutes' % (len(started), timeout))
2168        finally:
2169            started = [t for t in started if t.is_alive()]
2170            if started:
2171                faulthandler.dump_traceback(sys.stdout)
2172                raise AssertionError('Unable to join %d threads' % len(started))
2173
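# Illustrative usage (added example): start a batch of threads and have them
# joined when the block exits (the worker is hypothetical):
#
#     threads = [threading.Thread(target=worker) for _ in range(5)]
#     with start_threads(threads):
#         ...   # exercise the code under test while the threads run
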
2174@contextlib.contextmanager
2175def swap_attr(obj, attr, new_val):
2176    """Temporary swap out an attribute with a new object.
2177
2178    Usage:
2179        with swap_attr(obj, "attr", 5):
2180            ...
2181
2182        This will set obj.attr to 5 for the duration of the with: block,
2183        restoring the old value at the end of the block. If `attr` doesn't
2184        exist on `obj`, it will be created and then deleted at the end of the
2185        block.
2186
2187        The old value (or None if it doesn't exist) will be assigned to the
2188        target of the "as" clause, if there is one.
2189    """
2190    if hasattr(obj, attr):
2191        real_val = getattr(obj, attr)
2192        setattr(obj, attr, new_val)
2193        try:
2194            yield real_val
2195        finally:
2196            setattr(obj, attr, real_val)
2197    else:
2198        setattr(obj, attr, new_val)
2199        try:
2200            yield
2201        finally:
2202            if hasattr(obj, attr):
2203                delattr(obj, attr)
2204
2205@contextlib.contextmanager
2206def swap_item(obj, item, new_val):
2207    """Temporary swap out an item with a new object.
2208
2209    Usage:
2210        with swap_item(obj, "item", 5):
2211            ...
2212
2213        This will set obj["item"] to 5 for the duration of the with: block,
2214        restoring the old value at the end of the block. If `item` doesn't
2215        exist on `obj`, it will be created and then deleted at the end of the
2216        block.
2217
2218        The old value (or None if it doesn't exist) will be assigned to the
2219        target of the "as" clause, if there is one.
2220    """
2221    if item in obj:
2222        real_val = obj[item]
2223        obj[item] = new_val
2224        try:
2225            yield real_val
2226        finally:
2227            obj[item] = real_val
2228    else:
2229        obj[item] = new_val
2230        try:
2231            yield
2232        finally:
2233            if item in obj:
2234                del obj[item]
2235
2236def args_from_interpreter_flags():
2237    """Return a list of command-line arguments reproducing the current
2238    settings in sys.flags and sys.warnoptions."""
2239    return subprocess._args_from_interpreter_flags()
2240
2241def optim_args_from_interpreter_flags():
2242    """Return a list of command-line arguments reproducing the current
2243    optimization settings in sys.flags."""
2244    return subprocess._optim_args_from_interpreter_flags()
2245
2246
2247class Matcher(object):
2248
2249    _partial_matches = ('msg', 'message')
2250
2251    def matches(self, d, **kwargs):
2252        """
2253        Try to match a single dict with the supplied arguments.
2254
2255        Keys whose values are strings and which are in self._partial_matches
2256        will be checked for partial (i.e. substring) matches. You can extend
2257        this scheme to (for example) do regular expression matching, etc.
2258        """
2259        result = True
2260        for k in kwargs:
2261            v = kwargs[k]
2262            dv = d.get(k)
2263            if not self.match_value(k, dv, v):
2264                result = False
2265                break
2266        return result
2267
2268    def match_value(self, k, dv, v):
2269        """
2270        Try to match a single stored value (dv) with a supplied value (v).
2271        """
2272        if type(v) != type(dv):
2273            result = False
2274        elif type(dv) is not str or k not in self._partial_matches:
2275            result = (v == dv)
2276        else:
2277            result = dv.find(v) >= 0
2278        return result
2279
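# Illustrative usage (added example): Matcher can be used to check dicts of
# log-record-like data, with substring matching for the 'msg'/'message' keys:
#
#     m = Matcher()
#     m.matches({'msg': 'error in handler', 'levelno': 40}, msg='error')  # True
#     m.matches({'msg': 'error in handler', 'levelno': 40}, levelno=20)   # False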
2280
2281_can_symlink = None
2282def can_symlink():
2283    global _can_symlink
2284    if _can_symlink is not None:
2285        return _can_symlink
2286    symlink_path = TESTFN + "can_symlink"
2287    try:
2288        os.symlink(TESTFN, symlink_path)
2289        can = True
2290    except (OSError, NotImplementedError, AttributeError):
2291        can = False
2292    else:
2293        os.remove(symlink_path)
2294    _can_symlink = can
2295    return can
2296
2297def skip_unless_symlink(test):
2298    """Skip decorator for tests that require functional symlink"""
2299    ok = can_symlink()
2300    msg = "Requires functional symlink implementation"
2301    return test if ok else unittest.skip(msg)(test)
2302
2303_buggy_ucrt = None
2304def skip_if_buggy_ucrt_strfptime(test):
2305    """
2306    Skip decorator for tests that use buggy strptime/strftime
2307
2308    If the UCRT bugs are present, time.localtime().tm_zone will be
2309    an empty string; otherwise we assume the UCRT bugs are fixed.
2310
2311    See bpo-37552: [Windows] strptime/strftime return invalid
2312    results with UCRT version 17763.615.
2313    """
2314    import locale
2315    global _buggy_ucrt
2316    if _buggy_ucrt is None:
2317        if (sys.platform == 'win32' and
2318                locale.getdefaultlocale()[1] == 'cp65001' and
2319                time.localtime().tm_zone == ''):
2320            _buggy_ucrt = True
2321        else:
2322            _buggy_ucrt = False
2323    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
2324
2325class PythonSymlink:
2326    """Creates a symlink for the current Python executable"""
2327    def __init__(self, link=None):
2328        self.link = link or os.path.abspath(TESTFN)
2329        self._linked = []
2330        self.real = os.path.realpath(sys.executable)
2331        self._also_link = []
2332
2333        self._env = None
2334
2335        self._platform_specific()
2336
2337    def _platform_specific(self):
2338        pass
2339
2340    if sys.platform == "win32":
2341        def _platform_specific(self):
2342            import _winapi
2343
2344            if os.path.lexists(self.real) and not os.path.exists(self.real):
2345                # App symlink appears to not exist, but we want the
2346                # real executable here anyway
2347                self.real = _winapi.GetModuleFileName(0)
2348
2349            dll = _winapi.GetModuleFileName(sys.dllhandle)
2350            src_dir = os.path.dirname(dll)
2351            dest_dir = os.path.dirname(self.link)
2352            self._also_link.append((
2353                dll,
2354                os.path.join(dest_dir, os.path.basename(dll))
2355            ))
2356            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
2357                self._also_link.append((
2358                    runtime,
2359                    os.path.join(dest_dir, os.path.basename(runtime))
2360                ))
2361
2362            self._env = {k.upper(): os.getenv(k) for k in os.environ}
2363            self._env["PYTHONHOME"] = os.path.dirname(self.real)
2364            if sysconfig.is_python_build(True):
2365                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
2366
2367    def __enter__(self):
2368        os.symlink(self.real, self.link)
2369        self._linked.append(self.link)
2370        for real, link in self._also_link:
2371            os.symlink(real, link)
2372            self._linked.append(link)
2373        return self
2374
2375    def __exit__(self, exc_type, exc_value, exc_tb):
2376        for link in self._linked:
2377            try:
2378                os.remove(link)
2379            except IOError as ex:
2380                if verbose:
2381                    print("failed to clean up {}: {}".format(link, ex))
2382
2383    def _call(self, python, args, env, returncode):
2384        cmd = [python, *args]
2385        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
2386                             stderr=subprocess.PIPE, env=env)
2387        r = p.communicate()
2388        if p.returncode != returncode:
2389            if verbose:
2390                print(repr(r[0]))
2391                print(repr(r[1]), file=sys.stderr)
2392            raise RuntimeError(
2393                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
2394        return r
2395
2396    def call_real(self, *args, returncode=0):
2397        return self._call(self.real, args, None, returncode)
2398
2399    def call_link(self, *args, returncode=0):
2400        return self._call(self.link, args, self._env, returncode)
2401
2402
2403_can_xattr = None
2404def can_xattr():
2405    import tempfile
2406    global _can_xattr
2407    if _can_xattr is not None:
2408        return _can_xattr
2409    if not hasattr(os, "setxattr"):
2410        can = False
2411    else:
2412        tmp_dir = tempfile.mkdtemp()
2413        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
2414        try:
2415            with open(TESTFN, "wb") as fp:
2416                try:
2417                    # TESTFN & tempfile may use different file systems with
2418                    # different capabilities
2419                    os.setxattr(tmp_fp, b"user.test", b"")
2420                    os.setxattr(tmp_name, b"trusted.foo", b"42")
2421                    os.setxattr(fp.fileno(), b"user.test", b"")
2422                    # Kernels < 2.6.39 don't respect setxattr flags.
2423                    kernel_version = platform.release()
2424                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
2425                    can = m is None or int(m.group(1)) >= 39
2426                except OSError:
2427                    can = False
2428        finally:
2429            unlink(TESTFN)
2430            unlink(tmp_name)
2431            rmdir(tmp_dir)
2432    _can_xattr = can
2433    return can
2434
2435def skip_unless_xattr(test):
2436    """Skip decorator for tests that require functional extended attributes"""
2437    ok = can_xattr()
2438    msg = "no non-broken extended attribute support"
2439    return test if ok else unittest.skip(msg)(test)
2440
2441def skip_if_pgo_task(test):
2442    """Skip decorator for tests not run in (non-extended) PGO task"""
2443    ok = not PGO or PGO_EXTENDED
2444    msg = "Not run for (non-extended) PGO task"
2445    return test if ok else unittest.skip(msg)(test)
2446
2447
2448def fs_is_case_insensitive(directory):
2449    """Detects if the file system for the specified directory is case-insensitive."""
2450    import tempfile
2451    with tempfile.NamedTemporaryFile(dir=directory) as base:
2452        base_path = base.name
2453        case_path = base_path.upper()
2454        if case_path == base_path:
2455            case_path = base_path.lower()
2456        try:
2457            return os.path.samefile(base_path, case_path)
2458        except FileNotFoundError:
2459            return False
2460
2461
2462def detect_api_mismatch(ref_api, other_api, *, ignore=()):
2463    """Returns the set of items in ref_api not in other_api, except for a
2464    defined list of items to be ignored in this check.
2465
2466    By default this skips private attributes beginning with '_' but
2467    includes all magic methods, i.e. those starting and ending in '__'.
2468    """
2469    missing_items = set(dir(ref_api)) - set(dir(other_api))
2470    if ignore:
2471        missing_items -= set(ignore)
2472    missing_items = set(m for m in missing_items
2473                        if not m.startswith('_') or m.endswith('__'))
2474    return missing_items
2475
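# Illustrative usage (added example): assert that a pure Python fallback
# exposes everything the C accelerator does (class names are hypothetical):
#
#     missing = detect_api_mismatch(CAccelerated, PurePython,
#                                   ignore={'_internal_helper'})
#     self.assertEqual(missing, set())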
2476
2477def check__all__(test_case, module, name_of_module=None, extra=(),
2478                 blacklist=()):
2479    """Assert that the __all__ variable of 'module' contains all public names.
2480
2481    The module's public names (its API) are detected automatically based on
2482    whether they match the public name convention and were defined in
2483    'module'.
2484
2485    The 'name_of_module' argument can specify (as a string or tuple thereof)
2486    what module(s) an API could be defined in, in order to be detected as a
2487    public API. One case for this is when 'module' imports part of its public
2488    API from other modules, possibly a C backend (like 'csv' and its '_csv').
2489
2490    The 'extra' argument can be a set of names that wouldn't otherwise be
2491    automatically detected as "public", like objects without a proper
2492    '__module__' attribute. If provided, it will be added to the
2493    automatically detected ones.
2494
2495    The 'blacklist' argument can be a set of names that must not be treated
2496    as part of the public API even though their names indicate otherwise.
2497
2498    Usage:
2499        import bar
2500        import foo
2501        import unittest
2502        from test import support
2503
2504        class MiscTestCase(unittest.TestCase):
2505            def test__all__(self):
2506                support.check__all__(self, foo)
2507
2508        class OtherTestCase(unittest.TestCase):
2509            def test__all__(self):
2510                extra = {'BAR_CONST', 'FOO_CONST'}
2511                blacklist = {'baz'}  # Undocumented name.
2512                # bar imports part of its API from _bar.
2513                support.check__all__(self, bar, ('bar', '_bar'),
2514                                     extra=extra, blacklist=blacklist)
2515
2516    """
2517
2518    if name_of_module is None:
2519        name_of_module = (module.__name__, )
2520    elif isinstance(name_of_module, str):
2521        name_of_module = (name_of_module, )
2522
2523    expected = set(extra)
2524
2525    for name in dir(module):
2526        if name.startswith('_') or name in blacklist:
2527            continue
2528        obj = getattr(module, name)
2529        if (getattr(obj, '__module__', None) in name_of_module or
2530                (not hasattr(obj, '__module__') and
2531                 not isinstance(obj, types.ModuleType))):
2532            expected.add(name)
2533    test_case.assertCountEqual(module.__all__, expected)
2534
2535
2536def suppress_msvcrt_asserts(verbose=False):
2537    try:
2538        import msvcrt
2539    except ImportError:
2540        return
2541
2542    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
2543                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
2544                        | msvcrt.SEM_NOGPFAULTERRORBOX
2545                        | msvcrt.SEM_NOOPENFILEERRORBOX)
2546
2547    # CrtSetReportMode() is only available in debug build
2548    if hasattr(msvcrt, 'CrtSetReportMode'):
2549        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
2550            if verbose:
2551                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
2552                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
2553            else:
2554                msvcrt.CrtSetReportMode(m, 0)
2555
2556
2557class SuppressCrashReport:
2558    """Try to prevent a crash report from popping up.
2559
2560    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
2561    disable the creation of coredump file.
2562    """
2563    old_value = None
2564    old_modes = None
2565
2566    def __enter__(self):
2567        """On Windows, disable Windows Error Reporting dialogs using
2568        SetErrorMode() and CrtSetReportMode().
2569
2570        On UNIX, try to save the previous core file size limit, then set
2571        soft limit to 0.
2572        """
2573        if sys.platform.startswith('win'):
2574            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
2575            # GetErrorMode is not available on Windows XP and Windows Server 2003,
2576            # but SetErrorMode returns the previous value, so we can use that
2577            try:
2578                import msvcrt
2579            except ImportError:
2580                return
2581
2582            self.old_value = msvcrt.SetErrorMode(msvcrt.SEM_NOGPFAULTERRORBOX)
2583
2584            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
2585
2586            # bpo-23314: Suppress assert dialogs in debug builds.
2587            # CrtSetReportMode() is only available in debug build.
2588            if hasattr(msvcrt, 'CrtSetReportMode'):
2589                self.old_modes = {}
2590                for report_type in [msvcrt.CRT_WARN,
2591                                    msvcrt.CRT_ERROR,
2592                                    msvcrt.CRT_ASSERT]:
2593                    old_mode = msvcrt.CrtSetReportMode(report_type,
2594                            msvcrt.CRTDBG_MODE_FILE)
2595                    old_file = msvcrt.CrtSetReportFile(report_type,
2596                            msvcrt.CRTDBG_FILE_STDERR)
2597                    self.old_modes[report_type] = old_mode, old_file
2598
2599        else:
2600            try:
2601                import resource
2602                self.resource = resource
2603            except ImportError:
2604                self.resource = None
2605            if self.resource is not None:
2606                try:
2607                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
2608                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
2609                                            (0, self.old_value[1]))
2610                except (ValueError, OSError):
2611                    pass
2612
2613            if sys.platform == 'darwin':
2614                # Check if the 'Crash Reporter' on OSX was configured
2615                # in 'Developer' mode and warn that it will get triggered
2616                # when it is.
2617                #
2618                # This assumes that this context manager is used in tests
2619                # that might trigger the next manager.
2620                cmd = ['/usr/bin/defaults', 'read',
2621                       'com.apple.CrashReporter', 'DialogType']
2622                proc = subprocess.Popen(cmd,
2623                                        stdout=subprocess.PIPE,
2624                                        stderr=subprocess.PIPE)
2625                with proc:
2626                    stdout = proc.communicate()[0]
2627                if stdout.strip() == b'developer':
2628                    print("this test triggers the Crash Reporter, "
2629                          "that is intentional", end='', flush=True)
2630
2631        return self
2632
2633    def __exit__(self, *ignore_exc):
2634        """Restore Windows ErrorMode or core file behavior to initial value."""
2635        if self.old_value is None:
2636            return
2637
2638        if sys.platform.startswith('win'):
2639            import msvcrt
2640            msvcrt.SetErrorMode(self.old_value)
2641
2642            if self.old_modes:
2643                for report_type, (old_mode, old_file) in self.old_modes.items():
2644                    msvcrt.CrtSetReportMode(report_type, old_mode)
2645                    msvcrt.CrtSetReportFile(report_type, old_file)
2646        else:
2647            if self.resource is not None:
2648                try:
2649                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
2650                except (ValueError, OSError):
2651                    pass
2652
2653
2654def patch(test_instance, object_to_patch, attr_name, new_value):
2655    """Override 'object_to_patch'.'attr_name' with 'new_value'.
2656
2657    Also, add a cleanup procedure to 'test_instance' to restore
2658    'object_to_patch' value for 'attr_name'.
2659    The 'attr_name' should be a valid attribute for 'object_to_patch'.
2660
2661    """
2662    # check that 'attr_name' is a real attribute for 'object_to_patch'
2663    # will raise AttributeError if it does not exist
2664    getattr(object_to_patch, attr_name)
2665
2666    # keep a copy of the old value
2667    attr_is_local = False
2668    try:
2669        old_value = object_to_patch.__dict__[attr_name]
2670    except (AttributeError, KeyError):
2671        old_value = getattr(object_to_patch, attr_name, None)
2672    else:
2673        attr_is_local = True
2674
2675    # restore the value when the test is done
2676    def cleanup():
2677        if attr_is_local:
2678            setattr(object_to_patch, attr_name, old_value)
2679        else:
2680            delattr(object_to_patch, attr_name)
2681
2682    test_instance.addCleanup(cleanup)
2683
2684    # actually override the attribute
2685    setattr(object_to_patch, attr_name, new_value)
2686
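# Illustrative usage (added example): override an attribute for the duration
# of a test; the original value is restored by the cleanup added to the test:
#
#     def test_frozen_time(self):
#         patch(self, time, 'time', lambda: 12345.0)
#         self.assertEqual(time.time(), 12345.0)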
2687
2688def run_in_subinterp(code):
2689    """
2690    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
2691    module is enabled.
2692    """
2693    # Issue #10915, #15751: PyGILState_*() functions don't work with
2694    # sub-interpreters, the tracemalloc module uses these functions internally
2695    try:
2696        import tracemalloc
2697    except ImportError:
2698        pass
2699    else:
2700        if tracemalloc.is_tracing():
2701            raise unittest.SkipTest("run_in_subinterp() cannot be used "
2702                                     "if tracemalloc module is tracing "
2703                                     "memory allocations")
2704    import _testcapi
2705    return _testcapi.run_in_subinterp(code)
2706
2707
2708def check_free_after_iterating(test, iter, cls, args=()):
2709    class A(cls):
2710        def __del__(self):
2711            nonlocal done
2712            done = True
2713            try:
2714                next(it)
2715            except StopIteration:
2716                pass
2717
2718    done = False
2719    it = iter(A(*args))
2720    # Issue 26494: Shouldn't crash
2721    test.assertRaises(StopIteration, next, it)
2722    # The sequence should be deallocated just after the end of iterating
2723    gc_collect()
2724    test.assertTrue(done)
2725
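# Illustrative usage (added example): verify that exhausting an iterator over
# a container does not crash when the container is freed during iteration:
#
#     check_free_after_iterating(self, iter, list)
#     check_free_after_iterating(self, reversed, list)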
2726
2727def missing_compiler_executable(cmd_names=[]):
2728    """Check if the compiler components used to build the interpreter exist.
2729
2730    Check for the existence of the compiler executables whose names are listed
2731    in 'cmd_names', or of all the compiler executables when 'cmd_names' is
2732    empty, and return the first missing executable, or None if none is
2733    missing.
2734
2735    """
2736    from distutils import ccompiler, sysconfig, spawn, errors
2737    compiler = ccompiler.new_compiler()
2738    sysconfig.customize_compiler(compiler)
2739    if compiler.compiler_type == "msvc":
2740        # MSVC has no executables, so check whether initialization succeeds
2741        try:
2742            compiler.initialize()
2743        except errors.DistutilsPlatformError:
2744            return "msvc"
2745    for name in compiler.executables:
2746        if cmd_names and name not in cmd_names:
2747            continue
2748        cmd = getattr(compiler, name)
2749        if cmd_names:
2750            assert cmd is not None, \
2751                    "the '%s' executable is not configured" % name
2752        elif not cmd:
2753            continue
2754        if spawn.find_executable(cmd[0]) is None:
2755            return cmd[0]
2756
2757
2758_is_android_emulator = None
2759def setswitchinterval(interval):
2760    # Setting a very low gil interval on the Android emulator causes python
2761    # to hang (issue #26939).
2762    minimum_interval = 1e-5
2763    if is_android and interval < minimum_interval:
2764        global _is_android_emulator
2765        if _is_android_emulator is None:
2766            _is_android_emulator = (subprocess.check_output(
2767                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
2768        if _is_android_emulator:
2769            interval = minimum_interval
2770    return sys.setswitchinterval(interval)
2771
2772
2773@contextlib.contextmanager
2774def disable_faulthandler():
2775    import faulthandler
2776
2777    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
2778    # sys.stderr with a StringIO which has no file descriptor when a test
2779    # is run with -W/--verbose3.
2780    fd = sys.__stderr__.fileno()
2781
2782    is_enabled = faulthandler.is_enabled()
2783    try:
2784        faulthandler.disable()
2785        yield
2786    finally:
2787        if is_enabled:
2788            faulthandler.enable(file=fd, all_threads=True)
2789
2790
2791def fd_count():
2792    """Count the number of open file descriptors.
2793    """
2794    if sys.platform.startswith(('linux', 'freebsd')):
2795        try:
2796            names = os.listdir("/proc/self/fd")
2797            # Subtract one because listdir() internally opens a file
2798            # descriptor to list the content of the /proc/self/fd/ directory.
2799            return len(names) - 1
2800        except FileNotFoundError:
2801            pass
2802
2803    MAXFD = 256
2804    if hasattr(os, 'sysconf'):
2805        try:
2806            MAXFD = os.sysconf("SC_OPEN_MAX")
2807        except OSError:
2808            pass
2809
2810    old_modes = None
2811    if sys.platform == 'win32':
2812        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
2813        # on invalid file descriptor if Python is compiled in debug mode
2814        try:
2815            import msvcrt
2816            msvcrt.CrtSetReportMode
2817        except (AttributeError, ImportError):
2818            # no msvcrt or a release build
2819            pass
2820        else:
2821            old_modes = {}
2822            for report_type in (msvcrt.CRT_WARN,
2823                                msvcrt.CRT_ERROR,
2824                                msvcrt.CRT_ASSERT):
2825                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
2826
2827    try:
2828        count = 0
2829        for fd in range(MAXFD):
2830            try:
2831                # Prefer dup() over fstat(). fstat() can require input/output
2832                # whereas dup() doesn't.
2833                fd2 = os.dup(fd)
2834            except OSError as e:
2835                if e.errno != errno.EBADF:
2836                    raise
2837            else:
2838                os.close(fd2)
2839                count += 1
2840    finally:
2841        if old_modes is not None:
2842            for report_type in (msvcrt.CRT_WARN,
2843                                msvcrt.CRT_ERROR,
2844                                msvcrt.CRT_ASSERT):
2845                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
2846
2847    return count
2848
2849
2850class SaveSignals:
2851    """
2852    Save and restore signal handlers.
2853
2854    This class is only able to save/restore signal handlers registered
2855    by the Python signal module: see bpo-13285 for "external" signal
2856    handlers.
2857    """
2858
2859    def __init__(self):
2860        import signal
2861        self.signal = signal
2862        self.signals = signal.valid_signals()
2863        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
2864        for signame in ('SIGKILL', 'SIGSTOP'):
2865            try:
2866                signum = getattr(signal, signame)
2867            except AttributeError:
2868                continue
2869            self.signals.remove(signum)
2870        self.handlers = {}
2871
2872    def save(self):
2873        for signum in self.signals:
2874            handler = self.signal.getsignal(signum)
2875            if handler is None:
2876                # getsignal() returns None if a signal handler was not
2877                # registered by the Python signal module,
2878                # and the handler is not SIG_DFL nor SIG_IGN.
2879                #
2880                # Ignore the signal: we cannot restore the handler.
2881                continue
2882            self.handlers[signum] = handler
2883
2884    def restore(self):
2885        for signum, handler in self.handlers.items():
2886            self.signal.signal(signum, handler)
2887
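# Illustrative usage (added example): snapshot signal handlers around a test
# that installs its own handlers:
#
#     signals = SaveSignals()
#     signals.save()
#     try:
#         ...   # test code installing signal handlers
#     finally:
#         signals.restore()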
2888
2889def with_pymalloc():
2890    import _testcapi
2891    return _testcapi.WITH_PYMALLOC
2892
2893
2894class FakePath:
2895    """Simple implementing of the path protocol.
2896    """
2897    def __init__(self, path):
2898        self.path = path
2899
2900    def __repr__(self):
2901        return f'<FakePath {self.path!r}>'
2902
2903    def __fspath__(self):
2904        if (isinstance(self.path, BaseException) or
2905            isinstance(self.path, type) and
2906                issubclass(self.path, BaseException)):
2907            raise self.path
2908        else:
2909            return self.path
2910
2911
2912class _ALWAYS_EQ:
2913    """
2914    Object that is equal to anything.
2915    """
2916    def __eq__(self, other):
2917        return True
2918    def __ne__(self, other):
2919        return False
2920
2921ALWAYS_EQ = _ALWAYS_EQ()
2922
2923class _NEVER_EQ:
2924    """
2925    Object that is not equal to anything.
2926    """
2927    def __eq__(self, other):
2928        return False
2929    def __ne__(self, other):
2930        return True
2931    def __hash__(self):
2932        return 1
2933
2934NEVER_EQ = _NEVER_EQ()
2935
2936@functools.total_ordering
2937class _LARGEST:
2938    """
2939    Object that is greater than anything (except itself).
2940    """
2941    def __eq__(self, other):
2942        return isinstance(other, _LARGEST)
2943    def __lt__(self, other):
2944        return False
2945
2946LARGEST = _LARGEST()
2947
2948@functools.total_ordering
2949class _SMALLEST:
2950    """
2951    Object that is less than anything (except itself).
2952    """
2953    def __eq__(self, other):
2954        return isinstance(other, _SMALLEST)
2955    def __gt__(self, other):
2956        return False
2957
2958SMALLEST = _SMALLEST()
2959
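# Illustrative usage (added example): these singletons make it easy to
# exercise comparison code paths from either side of an operator:
#
#     self.assertEqual(something, ALWAYS_EQ)
#     self.assertNotEqual(something, NEVER_EQ)
#     self.assertLess(SMALLEST, something)
#     self.assertGreater(LARGEST, something)
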
2960def maybe_get_event_loop_policy():
2961    """Return the global event loop policy if one is set, else return None."""
2962    import asyncio.events
2963    return asyncio.events._event_loop_policy
2964
2965# Helpers for testing hashing.
2966NHASHBITS = sys.hash_info.width # number of bits in hash() result
2967assert NHASHBITS in (32, 64)
2968
# Return mean and sdev of number of collisions when tossing nballs balls
# uniformly at random into nbins bins.  By definition, the number of
# collisions is the number of balls minus the number of occupied bins at
# the end.
def collision_stats(nbins, nballs):
    n, k = nbins, nballs
    # prob a bin empty after k trials = (1 - 1/n)**k
    # mean # empty is then n * (1 - 1/n)**k
    # so mean # occupied is n - n * (1 - 1/n)**k
    # so collisions = k - (n - n*(1 - 1/n)**k)
    #
    # For the variance:
    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # we use the `decimal` module to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        bits = n.bit_length() * 2  # bits in n**2
        # At least that many bits will likely cancel out.
        # Use that many decimal digits instead.
        ctx.prec = max(bits, 30)
        dn = decimal.Decimal(n)
        p1empty = ((dn - 1) / dn) ** k
        meanempty = n * p1empty
        occupied = n - meanempty
        collisions = k - occupied
        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
        return float(collisions), float(var.sqrt())

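# Illustrative usage sketch (not from the original source): a hashing test can
# compare an observed collision count against the predicted mean, e.g.:
#
#     mean, sdev = collision_stats(2**32, 10**6)
#     # roughly 116 collisions expected when hashing a million distinct keys
#     # into 2**32 bins; a test might only fail above mean + 6*sdev.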

class catch_unraisable_exception:
    """
    Context manager catching unraisable exception using sys.unraisablehook.

    Storing the exception value (cm.unraisable.exc_value) creates a reference
    cycle. The reference cycle is broken explicitly when the context manager
    exits.

    Storing the object (cm.unraisable.object) can resurrect it if it is set to
    an object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self.unraisable = None
        self._old_hook = None

    def _hook(self, unraisable):
        # Storing unraisable.object can resurrect an object which is being
        # finalized. Storing unraisable.exc_value creates a reference cycle.
        self.unraisable = unraisable

    def __enter__(self):
        self._old_hook = sys.unraisablehook
        sys.unraisablehook = self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        del self.unraisable

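# Illustrative usage sketch (not from the original source): a __del__ method
# that raises is a typical source of an "unraisable" exception, and the
# context manager records it instead of letting it be printed to stderr:
#
#     class BrokenDel:
#         def __del__(self):
#             raise ValueError("del is broken")
#
#     with catch_unraisable_exception() as cm:
#         obj = BrokenDel()
#         del obj   # CPython finalizes the object immediately here
#         assert cm.unraisable.exc_type is ValueError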

class catch_threading_exception:
    """
    Context manager catching threading.Thread exception using
    threading.excepthook.

    Attributes set when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    These attributes are deleted at the context manager exit.

    Usage:

        with support.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # the exc_type, exc_value, exc_traceback and thread attributes of cm
        # no longer exist at this point
        # (to avoid reference cycles)
    """

    def __init__(self):
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        self._old_hook = None

    def _hook(self, args):
        self.exc_type = args.exc_type
        self.exc_value = args.exc_value
        self.exc_traceback = args.exc_traceback
        self.thread = args.thread

    def __enter__(self):
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        threading.excepthook = self._old_hook
        del self.exc_type
        del self.exc_value
        del self.exc_traceback
        del self.thread

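# Illustrative usage sketch (not from the original source): verifying that a
# worker thread died with the expected exception type:
#
#     def worker():
#         raise OSError("boom")
#
#     with catch_threading_exception() as cm:
#         t = threading.Thread(target=worker)
#         t.start()
#         t.join()
#         assert cm.exc_type is OSError
#         assert cm.thread is t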

def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
    kill the process (if signal.SIGKILL is available) and raise an
    AssertionError. The timeout feature is not available on Windows.
    """
    if os.name != "nt":
        import signal

        if timeout is None:
            timeout = SHORT_TIMEOUT
        t0 = time.monotonic()
        sleep = 0.001
        max_sleep = 0.1
        while True:
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # process is still running

            dt = time.monotonic() - t0
            if dt > timeout:
                # Timed out: kill the child (best effort) and fail the test.
                try:
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                except OSError:
                    # Ignore errors like ChildProcessError or PermissionError
                    pass

                raise AssertionError(f"process {pid} is still running "
                                     f"after {dt:.1f} seconds")

            sleep = min(sleep * 2, max_sleep)
            time.sleep(sleep)
    else:
        # Windows implementation
        pid2, status = os.waitpid(pid, 0)

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")

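# Illustrative usage sketch (not from the original source): typical use after
# forking a child in a POSIX-only test:
#
#     pid = os.fork()
#     if pid == 0:
#         os._exit(7)          # child exits with a known status
#     wait_process(pid, exitcode=7)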

def use_old_parser():
    import _testinternalcapi
    config = _testinternalcapi.get_configs()
    return (config['config']['_use_peg_parser'] == 0)


def skip_if_new_parser(msg):
    return unittest.skipIf(not use_old_parser(), msg)

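# Illustrative usage sketch (not from the original source): skipping a test
# that only makes sense under the legacy (non-PEG) parser:
#
#     @skip_if_new_parser("relies on the old parser's error reporting")
#     def test_legacy_error_message(self):
#         ...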

@contextlib.contextmanager
def save_restore_warnings_filters():
    """Save warnings.filters on entry and restore them on exit."""
    old_filters = warnings.filters[:]
    try:
        yield
    finally:
        warnings.filters[:] = old_filters

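# Illustrative usage sketch (not from the original source): importing a module
# that installs warning filters without leaking them into later tests
# ("noisy_module" is a hypothetical name):
#
#     with save_restore_warnings_filters():
#         import_fresh_module('noisy_module')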

def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests if the multiprocessing.synchronize module is missing, if there
    is no available semaphore implementation, or if creating a lock raises an
    OSError (on Linux only).
    """

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    if sys.platform == "linux":
        try:
            # bpo-38377: On Linux, creating a semaphore fails with OSError
            # if the current user does not have the permission to create
            # a file in the /dev/shm/ directory.
            synchronize.Lock(ctx=None)
        except OSError as exc:
            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")

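# Illustrative usage sketch (not from the original source): a test module that
# depends on multiprocessing locks can call the helper once at module setup:
#
#     def setUpModule():
#         skip_if_broken_multiprocessing_synchronize()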