• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import asyncio.events
7import collections.abc
8import contextlib
9import errno
10import faulthandler
11import fnmatch
12import functools
13import gc
14import glob
15import hashlib
16import importlib
17import importlib.util
18import locale
19import logging.handlers
20import nntplib
21import os
22import platform
23import re
24import shutil
25import socket
26import stat
27import struct
28import subprocess
29import sys
30import sysconfig
31import tempfile
32import _thread
33import threading
34import time
35import types
36import unittest
37import urllib.error
38import warnings
39
40from .testresult import get_test_runner
41
42try:
43    import multiprocessing.process
44except ImportError:
45    multiprocessing = None
46
47try:
48    import zlib
49except ImportError:
50    zlib = None
51
52try:
53    import gzip
54except ImportError:
55    gzip = None
56
57try:
58    import bz2
59except ImportError:
60    bz2 = None
61
62try:
63    import lzma
64except ImportError:
65    lzma = None
66
67try:
68    import resource
69except ImportError:
70    resource = None
71
72try:
73    import _hashlib
74except ImportError:
75    _hashlib = None
76
77__all__ = [
78    # globals
79    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
80    # exceptions
81    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
82    # imports
83    "import_module", "import_fresh_module", "CleanImport",
84    # modules
85    "unload", "forget",
86    # io
87    "record_original_stdout", "get_original_stdout", "captured_stdout",
88    "captured_stdin", "captured_stderr",
89    # filesystem
90    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
91    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
92    # unittest
93    "is_resource_enabled", "requires", "requires_freebsd_version",
94    "requires_linux_version", "requires_mac_ver", "requires_hashdigest",
95    "check_syntax_error", "check_syntax_warning",
96    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
97    "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
98    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
99    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
100    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
101    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
102    "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime",
103    "ignore_warnings",
104    # sys
105    "is_jython", "is_android", "check_impl_detail", "unix_shell",
106    "setswitchinterval",
107    # network
108    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
109    "bind_unix_socket",
110    # processes
111    'temp_umask', "reap_children",
112    # logging
113    "TestHandler",
114    # threads
115    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
116    # miscellaneous
117    "check_warnings", "check_no_resource_warning", "check_no_warnings",
118    "EnvironmentVarGuard",
119    "run_with_locale", "swap_item",
120    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
121    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
122    "ALWAYS_EQ", "LARGEST", "SMALLEST"
123    ]
124
125class Error(Exception):
126    """Base class for regression test exceptions."""
127
128class TestFailed(Error):
129    """Test failed."""
130
131class TestDidNotRun(Error):
132    """Test did not run any subtests."""
133
134class ResourceDenied(unittest.SkipTest):
135    """Test skipped because it requested a disallowed resource.
136
137    This is raised when a test calls requires() for a resource that
138    has not be enabled.  It is used to distinguish between expected
139    and unexpected skips.
140    """
141
142@contextlib.contextmanager
143def _ignore_deprecated_imports(ignore=True):
144    """Context manager to suppress package and module deprecation
145    warnings when importing them.
146
147    If ignore is False, this context manager has no effect.
148    """
149    if ignore:
150        with warnings.catch_warnings():
151            warnings.filterwarnings("ignore", ".+ (module|package)",
152                                    DeprecationWarning)
153            yield
154    else:
155        yield
156
157
158def ignore_warnings(*, category):
159    """Decorator to suppress deprecation warnings.
160
161    Use of context managers to hide warnings make diffs
162    more noisy and tools like 'git blame' less useful.
163    """
164    def decorator(test):
165        @functools.wraps(test)
166        def wrapper(self, *args, **kwargs):
167            with warnings.catch_warnings():
168                warnings.simplefilter('ignore', category=category)
169                return test(self, *args, **kwargs)
170        return wrapper
171    return decorator
172
173
174def import_module(name, deprecated=False, *, required_on=()):
175    """Import and return the module to be tested, raising SkipTest if
176    it is not available.
177
178    If deprecated is True, any module or package deprecation messages
179    will be suppressed. If a module is required on a platform but optional for
180    others, set required_on to an iterable of platform prefixes which will be
181    compared against sys.platform.
182    """
183    with _ignore_deprecated_imports(deprecated):
184        try:
185            return importlib.import_module(name)
186        except ImportError as msg:
187            if sys.platform.startswith(tuple(required_on)):
188                raise
189            raise unittest.SkipTest(str(msg))
190
191
192def _save_and_remove_module(name, orig_modules):
193    """Helper function to save and remove a module from sys.modules
194
195    Raise ImportError if the module can't be imported.
196    """
197    # try to import the module and raise an error if it can't be imported
198    if name not in sys.modules:
199        __import__(name)
200        del sys.modules[name]
201    for modname in list(sys.modules):
202        if modname == name or modname.startswith(name + '.'):
203            orig_modules[modname] = sys.modules[modname]
204            del sys.modules[modname]
205
206def _save_and_block_module(name, orig_modules):
207    """Helper function to save and block a module in sys.modules
208
209    Return True if the module was in sys.modules, False otherwise.
210    """
211    saved = True
212    try:
213        orig_modules[name] = sys.modules[name]
214    except KeyError:
215        saved = False
216    sys.modules[name] = None
217    return saved
218
219
220def anticipate_failure(condition):
221    """Decorator to mark a test that is known to be broken in some cases
222
223       Any use of this decorator should have a comment identifying the
224       associated tracker issue.
225    """
226    if condition:
227        return unittest.expectedFailure
228    return lambda f: f
229
230def load_package_tests(pkg_dir, loader, standard_tests, pattern):
231    """Generic load_tests implementation for simple test packages.
232
233    Most packages can implement load_tests using this function as follows:
234
235       def load_tests(*args):
236           return load_package_tests(os.path.dirname(__file__), *args)
237    """
238    if pattern is None:
239        pattern = "test*"
240    top_dir = os.path.dirname(              # Lib
241                  os.path.dirname(              # test
242                      os.path.dirname(__file__)))   # support
243    package_tests = loader.discover(start_dir=pkg_dir,
244                                    top_level_dir=top_dir,
245                                    pattern=pattern)
246    standard_tests.addTests(package_tests)
247    return standard_tests
248
249
250def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
251    """Import and return a module, deliberately bypassing sys.modules.
252
253    This function imports and returns a fresh copy of the named Python module
254    by removing the named module from sys.modules before doing the import.
255    Note that unlike reload, the original module is not affected by
256    this operation.
257
258    *fresh* is an iterable of additional module names that are also removed
259    from the sys.modules cache before doing the import.
260
261    *blocked* is an iterable of module names that are replaced with None
262    in the module cache during the import to ensure that attempts to import
263    them raise ImportError.
264
265    The named module and any modules named in the *fresh* and *blocked*
266    parameters are saved before starting the import and then reinserted into
267    sys.modules when the fresh import is complete.
268
269    Module and package deprecation messages are suppressed during this import
270    if *deprecated* is True.
271
272    This function will raise ImportError if the named module cannot be
273    imported.
274    """
275    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
276    # to make sure that this utility function is working as expected
277    with _ignore_deprecated_imports(deprecated):
278        # Keep track of modules saved for later restoration as well
279        # as those which just need a blocking entry removed
280        orig_modules = {}
281        names_to_remove = []
282        _save_and_remove_module(name, orig_modules)
283        try:
284            for fresh_name in fresh:
285                _save_and_remove_module(fresh_name, orig_modules)
286            for blocked_name in blocked:
287                if not _save_and_block_module(blocked_name, orig_modules):
288                    names_to_remove.append(blocked_name)
289            fresh_module = importlib.import_module(name)
290        except ImportError:
291            fresh_module = None
292        finally:
293            for orig_name, module in orig_modules.items():
294                sys.modules[orig_name] = module
295            for name_to_remove in names_to_remove:
296                del sys.modules[name_to_remove]
297        return fresh_module
298
299
300def get_attribute(obj, name):
301    """Get an attribute, raising SkipTest if AttributeError is raised."""
302    try:
303        attribute = getattr(obj, name)
304    except AttributeError:
305        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
306    else:
307        return attribute
308
309verbose = 1              # Flag set to 0 by regrtest.py
310use_resources = None     # Flag set to [] by regrtest.py
311max_memuse = 0           # Disable bigmem tests (they will still be run with
312                         # small sizes, to make sure they work.)
313real_max_memuse = 0
314junit_xml_list = None    # list of testsuite XML elements
315failfast = False
316
317# _original_stdout is meant to hold stdout at the time regrtest began.
318# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
319# The point is to have some flavor of stdout the user can actually see.
320_original_stdout = None
321def record_original_stdout(stdout):
322    global _original_stdout
323    _original_stdout = stdout
324
325def get_original_stdout():
326    return _original_stdout or sys.stdout
327
328def unload(name):
329    try:
330        del sys.modules[name]
331    except KeyError:
332        pass
333
334def _force_run(path, func, *args):
335    try:
336        return func(*args)
337    except OSError as err:
338        if verbose >= 2:
339            print('%s: %s' % (err.__class__.__name__, err))
340            print('re-run %s%r' % (func.__name__, args))
341        os.chmod(path, stat.S_IRWXU)
342        return func(*args)
343
344if sys.platform.startswith("win"):
345    def _waitfor(func, pathname, waitall=False):
346        # Perform the operation
347        func(pathname)
348        # Now setup the wait loop
349        if waitall:
350            dirname = pathname
351        else:
352            dirname, name = os.path.split(pathname)
353            dirname = dirname or '.'
354        # Check for `pathname` to be removed from the filesystem.
355        # The exponential backoff of the timeout amounts to a total
356        # of ~1 second after which the deletion is probably an error
357        # anyway.
358        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
359        # required when contention occurs.
360        timeout = 0.001
361        while timeout < 1.0:
362            # Note we are only testing for the existence of the file(s) in
363            # the contents of the directory regardless of any security or
364            # access rights.  If we have made it this far, we have sufficient
365            # permissions to do that much using Python's equivalent of the
366            # Windows API FindFirstFile.
367            # Other Windows APIs can fail or give incorrect results when
368            # dealing with files that are pending deletion.
369            L = os.listdir(dirname)
370            if not (L if waitall else name in L):
371                return
372            # Increase the timeout and try again
373            time.sleep(timeout)
374            timeout *= 2
375        warnings.warn('tests may fail, delete still pending for ' + pathname,
376                      RuntimeWarning, stacklevel=4)
377
378    def _unlink(filename):
379        _waitfor(os.unlink, filename)
380
381    def _rmdir(dirname):
382        _waitfor(os.rmdir, dirname)
383
384    def _rmtree(path):
385        def _rmtree_inner(path):
386            for name in _force_run(path, os.listdir, path):
387                fullname = os.path.join(path, name)
388                try:
389                    mode = os.lstat(fullname).st_mode
390                except OSError as exc:
391                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
392                          file=sys.__stderr__)
393                    mode = 0
394                if stat.S_ISDIR(mode):
395                    _waitfor(_rmtree_inner, fullname, waitall=True)
396                    _force_run(fullname, os.rmdir, fullname)
397                else:
398                    _force_run(fullname, os.unlink, fullname)
399        _waitfor(_rmtree_inner, path, waitall=True)
400        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
401
402    def _longpath(path):
403        try:
404            import ctypes
405        except ImportError:
406            # No ctypes means we can't expands paths.
407            pass
408        else:
409            buffer = ctypes.create_unicode_buffer(len(path) * 2)
410            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
411                                                             len(buffer))
412            if length:
413                return buffer[:length]
414        return path
415else:
416    _unlink = os.unlink
417    _rmdir = os.rmdir
418
419    def _rmtree(path):
420        try:
421            shutil.rmtree(path)
422            return
423        except OSError:
424            pass
425
426        def _rmtree_inner(path):
427            for name in _force_run(path, os.listdir, path):
428                fullname = os.path.join(path, name)
429                try:
430                    mode = os.lstat(fullname).st_mode
431                except OSError:
432                    mode = 0
433                if stat.S_ISDIR(mode):
434                    _rmtree_inner(fullname)
435                    _force_run(path, os.rmdir, fullname)
436                else:
437                    _force_run(path, os.unlink, fullname)
438        _rmtree_inner(path)
439        os.rmdir(path)
440
441    def _longpath(path):
442        return path
443
444def unlink(filename):
445    try:
446        _unlink(filename)
447    except (FileNotFoundError, NotADirectoryError):
448        pass
449
450def rmdir(dirname):
451    try:
452        _rmdir(dirname)
453    except FileNotFoundError:
454        pass
455
456def rmtree(path):
457    try:
458        _rmtree(path)
459    except FileNotFoundError:
460        pass
461
462def make_legacy_pyc(source):
463    """Move a PEP 3147/488 pyc file to its legacy pyc location.
464
465    :param source: The file system path to the source file.  The source file
466        does not need to exist, however the PEP 3147/488 pyc file must exist.
467    :return: The file system path to the legacy pyc file.
468    """
469    pyc_file = importlib.util.cache_from_source(source)
470    up_one = os.path.dirname(os.path.abspath(source))
471    legacy_pyc = os.path.join(up_one, source + 'c')
472    os.rename(pyc_file, legacy_pyc)
473    return legacy_pyc
474
475def forget(modname):
476    """'Forget' a module was ever imported.
477
478    This removes the module from sys.modules and deletes any PEP 3147/488 or
479    legacy .pyc files.
480    """
481    unload(modname)
482    for dirname in sys.path:
483        source = os.path.join(dirname, modname + '.py')
484        # It doesn't matter if they exist or not, unlink all possible
485        # combinations of PEP 3147/488 and legacy pyc files.
486        unlink(source + 'c')
487        for opt in ('', 1, 2):
488            unlink(importlib.util.cache_from_source(source, optimization=opt))
489
490# Check whether a gui is actually available
491def _is_gui_available():
492    if hasattr(_is_gui_available, 'result'):
493        return _is_gui_available.result
494    reason = None
495    if sys.platform.startswith('win'):
496        # if Python is running as a service (such as the buildbot service),
497        # gui interaction may be disallowed
498        import ctypes
499        import ctypes.wintypes
500        UOI_FLAGS = 1
501        WSF_VISIBLE = 0x0001
502        class USEROBJECTFLAGS(ctypes.Structure):
503            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
504                        ("fReserved", ctypes.wintypes.BOOL),
505                        ("dwFlags", ctypes.wintypes.DWORD)]
506        dll = ctypes.windll.user32
507        h = dll.GetProcessWindowStation()
508        if not h:
509            raise ctypes.WinError()
510        uof = USEROBJECTFLAGS()
511        needed = ctypes.wintypes.DWORD()
512        res = dll.GetUserObjectInformationW(h,
513            UOI_FLAGS,
514            ctypes.byref(uof),
515            ctypes.sizeof(uof),
516            ctypes.byref(needed))
517        if not res:
518            raise ctypes.WinError()
519        if not bool(uof.dwFlags & WSF_VISIBLE):
520            reason = "gui not available (WSF_VISIBLE flag not set)"
521    elif sys.platform == 'darwin':
522        # The Aqua Tk implementations on OS X can abort the process if
523        # being called in an environment where a window server connection
524        # cannot be made, for instance when invoked by a buildbot or ssh
525        # process not running under the same user id as the current console
526        # user.  To avoid that, raise an exception if the window manager
527        # connection is not available.
528        from ctypes import cdll, c_int, pointer, Structure
529        from ctypes.util import find_library
530
531        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
532
533        if app_services.CGMainDisplayID() == 0:
534            reason = "gui tests cannot run without OS X window manager"
535        else:
536            class ProcessSerialNumber(Structure):
537                _fields_ = [("highLongOfPSN", c_int),
538                            ("lowLongOfPSN", c_int)]
539            psn = ProcessSerialNumber()
540            psn_p = pointer(psn)
541            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
542                  (app_services.SetFrontProcess(psn_p) < 0) ):
543                reason = "cannot run without OS X gui process"
544
545    # check on every platform whether tkinter can actually do anything
546    if not reason:
547        try:
548            from tkinter import Tk
549            root = Tk()
550            root.withdraw()
551            root.update()
552            root.destroy()
553        except Exception as e:
554            err_string = str(e)
555            if len(err_string) > 50:
556                err_string = err_string[:50] + ' [...]'
557            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
558                                                           err_string)
559
560    _is_gui_available.reason = reason
561    _is_gui_available.result = not reason
562
563    return _is_gui_available.result
564
565def is_resource_enabled(resource):
566    """Test whether a resource is enabled.
567
568    Known resources are set by regrtest.py.  If not running under regrtest.py,
569    all resources are assumed enabled unless use_resources has been set.
570    """
571    return use_resources is None or resource in use_resources
572
573def requires(resource, msg=None):
574    """Raise ResourceDenied if the specified resource is not available."""
575    if not is_resource_enabled(resource):
576        if msg is None:
577            msg = "Use of the %r resource not enabled" % resource
578        raise ResourceDenied(msg)
579    if resource == 'gui' and not _is_gui_available():
580        raise ResourceDenied(_is_gui_available.reason)
581
582def _requires_unix_version(sysname, min_version):
583    """Decorator raising SkipTest if the OS is `sysname` and the version is less
584    than `min_version`.
585
586    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
587    the FreeBSD version is less than 7.2.
588    """
589    def decorator(func):
590        @functools.wraps(func)
591        def wrapper(*args, **kw):
592            if platform.system() == sysname:
593                version_txt = platform.release().split('-', 1)[0]
594                try:
595                    version = tuple(map(int, version_txt.split('.')))
596                except ValueError:
597                    pass
598                else:
599                    if version < min_version:
600                        min_version_txt = '.'.join(map(str, min_version))
601                        raise unittest.SkipTest(
602                            "%s version %s or higher required, not %s"
603                            % (sysname, min_version_txt, version_txt))
604            return func(*args, **kw)
605        wrapper.min_version = min_version
606        return wrapper
607    return decorator
608
609def requires_freebsd_version(*min_version):
610    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
611    less than `min_version`.
612
613    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
614    version is less than 7.2.
615    """
616    return _requires_unix_version('FreeBSD', min_version)
617
618def requires_linux_version(*min_version):
619    """Decorator raising SkipTest if the OS is Linux and the Linux version is
620    less than `min_version`.
621
622    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
623    version is less than 2.6.32.
624    """
625    return _requires_unix_version('Linux', min_version)
626
627def requires_mac_ver(*min_version):
628    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
629    version if less than min_version.
630
631    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
632    is lesser than 10.5.
633    """
634    def decorator(func):
635        @functools.wraps(func)
636        def wrapper(*args, **kw):
637            if sys.platform == 'darwin':
638                version_txt = platform.mac_ver()[0]
639                try:
640                    version = tuple(map(int, version_txt.split('.')))
641                except ValueError:
642                    pass
643                else:
644                    if version < min_version:
645                        min_version_txt = '.'.join(map(str, min_version))
646                        raise unittest.SkipTest(
647                            "Mac OS X %s or higher required, not %s"
648                            % (min_version_txt, version_txt))
649            return func(*args, **kw)
650        wrapper.min_version = min_version
651        return wrapper
652    return decorator
653
654
655def requires_hashdigest(digestname, openssl=None):
656    """Decorator raising SkipTest if a hashing algorithm is not available
657
658    The hashing algorithm could be missing or blocked by a strict crypto
659    policy.
660
661    If 'openssl' is True, then the decorator checks that OpenSSL provides
662    the algorithm. Otherwise the check falls back to built-in
663    implementations.
664
665    ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS
666    ValueError: unsupported hash type md4
667    """
668    def decorator(func):
669        @functools.wraps(func)
670        def wrapper(*args, **kwargs):
671            try:
672                if openssl and _hashlib is not None:
673                    _hashlib.new(digestname)
674                else:
675                    hashlib.new(digestname)
676            except ValueError:
677                raise unittest.SkipTest(
678                    f"hash digest '{digestname}' is not available."
679                )
680            return func(*args, **kwargs)
681        return wrapper
682    return decorator
683
684
685HOST = "localhost"
686HOSTv4 = "127.0.0.1"
687HOSTv6 = "::1"
688
689
690def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
691    """Returns an unused port that should be suitable for binding.  This is
692    achieved by creating a temporary socket with the same family and type as
693    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
694    the specified host address (defaults to 0.0.0.0) with the port set to 0,
695    eliciting an unused ephemeral port from the OS.  The temporary socket is
696    then closed and deleted, and the ephemeral port is returned.
697
698    Either this method or bind_port() should be used for any tests where a
699    server socket needs to be bound to a particular port for the duration of
700    the test.  Which one to use depends on whether the calling code is creating
701    a python socket, or if an unused port needs to be provided in a constructor
702    or passed to an external program (i.e. the -accept argument to openssl's
703    s_server mode).  Always prefer bind_port() over find_unused_port() where
704    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
705    socket is bound to a hard coded port, the ability to run multiple instances
706    of the test simultaneously on the same host is compromised, which makes the
707    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
708    may simply manifest as a failed test, which can be recovered from without
709    intervention in most cases, but on Windows, the entire python process can
710    completely and utterly wedge, requiring someone to log in to the buildbot
711    and manually kill the affected process.
712
713    (This is easy to reproduce on Windows, unfortunately, and can be traced to
714    the SO_REUSEADDR socket option having different semantics on Windows versus
715    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
716    listen and then accept connections on identical host/ports.  An EADDRINUSE
717    OSError will be raised at some point (depending on the platform and
718    the order bind and listen were called on each socket).
719
720    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
721    will ever be raised when attempting to bind two identical host/ports. When
722    accept() is called on each socket, the second caller's process will steal
723    the port from the first caller, leaving them both in an awkwardly wedged
724    state where they'll no longer respond to any signals or graceful kills, and
725    must be forcibly killed via OpenProcess()/TerminateProcess().
726
727    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
728    instead of SO_REUSEADDR, which effectively affords the same semantics as
729    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
730    Source world compared to Windows ones, this is a common mistake.  A quick
731    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
732    openssl.exe is called with the 's_server' option, for example. See
733    http://bugs.python.org/issue2550 for more info.  The following site also
734    has a very thorough description about the implications of both REUSEADDR
735    and EXCLUSIVEADDRUSE on Windows:
736    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
737
738    XXX: although this approach is a vast improvement on previous attempts to
739    elicit unused ports, it rests heavily on the assumption that the ephemeral
740    port returned to us by the OS won't immediately be dished back out to some
741    other process when we close and delete our temporary socket but before our
742    calling code has a chance to bind the returned port.  We can deal with this
743    issue if/when we come across it.
744    """
745
746    with socket.socket(family, socktype) as tempsock:
747        port = bind_port(tempsock)
748    del tempsock
749    return port
750
751def bind_port(sock, host=HOST):
752    """Bind the socket to a free port and return the port number.  Relies on
753    ephemeral ports in order to ensure we are using an unbound port.  This is
754    important as many tests may be running simultaneously, especially in a
755    buildbot environment.  This method raises an exception if the sock.family
756    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
757    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
758    for TCP/IP sockets.  The only case for setting these options is testing
759    multicasting via multiple UDP sockets.
760
761    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
762    on Windows), it will be set on the socket.  This will prevent anyone else
763    from bind()'ing to our host/port for the duration of the test.
764    """
765
766    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
767        if hasattr(socket, 'SO_REUSEADDR'):
768            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
769                raise TestFailed("tests should never set the SO_REUSEADDR "   \
770                                 "socket option on TCP/IP sockets!")
771        if hasattr(socket, 'SO_REUSEPORT'):
772            try:
773                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
774                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
775                                     "socket option on TCP/IP sockets!")
776            except OSError:
777                # Python's socket module was compiled using modern headers
778                # thus defining SO_REUSEPORT but this process is running
779                # under an older kernel that does not support SO_REUSEPORT.
780                pass
781        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
782            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
783
784    sock.bind((host, 0))
785    port = sock.getsockname()[1]
786    return port
787
788def bind_unix_socket(sock, addr):
789    """Bind a unix socket, raising SkipTest if PermissionError is raised."""
790    assert sock.family == socket.AF_UNIX
791    try:
792        sock.bind(addr)
793    except PermissionError:
794        sock.close()
795        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
796
797def _is_ipv6_enabled():
798    """Check whether IPv6 is enabled on this host."""
799    if socket.has_ipv6:
800        sock = None
801        try:
802            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
803            sock.bind((HOSTv6, 0))
804            return True
805        except OSError:
806            pass
807        finally:
808            if sock:
809                sock.close()
810    return False
811
812IPV6_ENABLED = _is_ipv6_enabled()
813
814def system_must_validate_cert(f):
815    """Skip the test on TLS certificate validation failures."""
816    @functools.wraps(f)
817    def dec(*args, **kwargs):
818        try:
819            f(*args, **kwargs)
820        except OSError as e:
821            if "CERTIFICATE_VERIFY_FAILED" in str(e):
822                raise unittest.SkipTest("system does not contain "
823                                        "necessary certificates")
824            raise
825    return dec
826
827# A constant likely larger than the underlying OS pipe buffer size, to
828# make writes blocking.
829# Windows limit seems to be around 512 B, and many Unix kernels have a
830# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
831# (see issue #17835 for a discussion of this number).
832PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
833
834# A constant likely larger than the underlying OS socket buffer size, to make
835# writes blocking.
836# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
837# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
838# for a discussion of this number).
839SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
840
841# decorator for skipping tests on non-IEEE 754 platforms
842requires_IEEE_754 = unittest.skipUnless(
843    float.__getformat__("double").startswith("IEEE"),
844    "test requires IEEE 754 doubles")
845
846requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
847
848requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
849
850requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
851
852requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
853
854is_jython = sys.platform.startswith('java')
855
856is_android = hasattr(sys, 'getandroidapilevel')
857
858if sys.platform != 'win32':
859    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
860else:
861    unix_shell = None
862
863# Filename used for testing
864if os.name == 'java':
865    # Jython disallows @ in module names
866    TESTFN = '$test'
867else:
868    TESTFN = '@test'
869
870# Disambiguate TESTFN for parallel testing, while letting it remain a valid
871# module name.
872TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
873
874# Define the URL of a dedicated HTTP server for the network tests.
875# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
876TEST_HTTP_URL = "http://www.pythontest.net"
877
878# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
879# or None if there is no such character.
880FS_NONASCII = None
881for character in (
882    # First try printable and common characters to have a readable filename.
883    # For each character, the encoding list are just example of encodings able
884    # to encode the character (the list is not exhaustive).
885
886    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
887    '\u00E6',
888    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
889    '\u0130',
890    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
891    '\u0141',
892    # U+03C6 (Greek Small Letter Phi): cp1253
893    '\u03C6',
894    # U+041A (Cyrillic Capital Letter Ka): cp1251
895    '\u041A',
896    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
897    '\u05D0',
898    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
899    '\u060C',
900    # U+062A (Arabic Letter Teh): cp720
901    '\u062A',
902    # U+0E01 (Thai Character Ko Kai): cp874
903    '\u0E01',
904
905    # Then try more "special" characters. "special" because they may be
906    # interpreted or displayed differently depending on the exact locale
907    # encoding and the font.
908
909    # U+00A0 (No-Break Space)
910    '\u00A0',
911    # U+20AC (Euro Sign)
912    '\u20AC',
913):
914    try:
915        # If Python is set up to use the legacy 'mbcs' in Windows,
916        # 'replace' error mode is used, and encode() returns b'?'
917        # for characters missing in the ANSI codepage
918        if os.fsdecode(os.fsencode(character)) != character:
919            raise UnicodeError
920    except UnicodeError:
921        pass
922    else:
923        FS_NONASCII = character
924        break
925
926# TESTFN_UNICODE is a non-ascii filename
927TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
928if sys.platform == 'darwin':
929    # In Mac OS X's VFS API file names are, by definition, canonically
930    # decomposed Unicode, encoded using UTF-8. See QA1173:
931    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
932    import unicodedata
933    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
934TESTFN_ENCODING = sys.getfilesystemencoding()
935
936# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
937# encoded by the filesystem encoding (in strict mode). It can be None if we
938# cannot generate such filename.
939TESTFN_UNENCODABLE = None
940if os.name == 'nt':
941    # skip win32s (0) or Windows 9x/ME (1)
942    if sys.getwindowsversion().platform >= 2:
943        # Different kinds of characters from various languages to minimize the
944        # probability that the whole name is encodable to MBCS (issue #9819)
945        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
946        try:
947            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
948        except UnicodeEncodeError:
949            pass
950        else:
951            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
952                  'Unicode filename tests may not be effective'
953                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
954            TESTFN_UNENCODABLE = None
955# Mac OS X denies unencodable filenames (invalid utf-8)
956elif sys.platform != 'darwin':
957    try:
958        # ascii and utf-8 cannot encode the byte 0xff
959        b'\xff'.decode(TESTFN_ENCODING)
960    except UnicodeDecodeError:
961        # 0xff will be encoded using the surrogate character u+DCFF
962        TESTFN_UNENCODABLE = TESTFN \
963            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
964    else:
965        # File system encoding (eg. ISO-8859-* encodings) can encode
966        # the byte 0xff. Skip some unicode filename tests.
967        pass
968
969# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
970# decoded from the filesystem encoding (in strict mode). It can be None if we
971# cannot generate such filename (ex: the latin1 encoding can decode any byte
972# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
973# to the surrogateescape error handler (PEP 383), but not from the filesystem
974# encoding in strict mode.
975TESTFN_UNDECODABLE = None
976for name in (
977    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
978    # accepts it to create a file or a directory, or don't accept to enter to
979    # such directory (when the bytes name is used). So test b'\xe7' first: it is
980    # not decodable from cp932.
981    b'\xe7w\xf0',
982    # undecodable from ASCII, UTF-8
983    b'\xff',
984    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
985    # and cp857
986    b'\xae\xd5'
987    # undecodable from UTF-8 (UNIX and Mac OS X)
988    b'\xed\xb2\x80', b'\xed\xb4\x80',
989    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
990    # cp1253, cp1254, cp1255, cp1257, cp1258
991    b'\x81\x98',
992):
993    try:
994        name.decode(TESTFN_ENCODING)
995    except UnicodeDecodeError:
996        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
997        break
998
999if FS_NONASCII:
1000    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
1001else:
1002    TESTFN_NONASCII = None
1003
1004# Save the initial cwd
1005SAVEDCWD = os.getcwd()
1006
1007# Set by libregrtest/main.py so we can skip tests that are not
1008# useful for PGO
1009PGO = False
1010
1011# Set by libregrtest/main.py if we are running the extended (time consuming)
1012# PGO task.  If this is True, PGO is also True.
1013PGO_EXTENDED = False
1014
1015@contextlib.contextmanager
1016def temp_dir(path=None, quiet=False):
1017    """Return a context manager that creates a temporary directory.
1018
1019    Arguments:
1020
1021      path: the directory to create temporarily.  If omitted or None,
1022        defaults to creating a temporary directory using tempfile.mkdtemp.
1023
1024      quiet: if False (the default), the context manager raises an exception
1025        on error.  Otherwise, if the path is specified and cannot be
1026        created, only a warning is issued.
1027
1028    """
1029    dir_created = False
1030    if path is None:
1031        path = tempfile.mkdtemp()
1032        dir_created = True
1033        path = os.path.realpath(path)
1034    else:
1035        try:
1036            os.mkdir(path)
1037            dir_created = True
1038        except OSError as exc:
1039            if not quiet:
1040                raise
1041            warnings.warn(f'tests may fail, unable to create '
1042                          f'temporary directory {path!r}: {exc}',
1043                          RuntimeWarning, stacklevel=3)
1044    if dir_created:
1045        pid = os.getpid()
1046    try:
1047        yield path
1048    finally:
1049        # In case the process forks, let only the parent remove the
1050        # directory. The child has a different process id. (bpo-30028)
1051        if dir_created and pid == os.getpid():
1052            rmtree(path)
1053
1054@contextlib.contextmanager
1055def change_cwd(path, quiet=False):
1056    """Return a context manager that changes the current working directory.
1057
1058    Arguments:
1059
1060      path: the directory to use as the temporary current working directory.
1061
1062      quiet: if False (the default), the context manager raises an exception
1063        on error.  Otherwise, it issues only a warning and keeps the current
1064        working directory the same.
1065
1066    """
1067    saved_dir = os.getcwd()
1068    try:
1069        os.chdir(os.path.realpath(path))
1070    except OSError as exc:
1071        if not quiet:
1072            raise
1073        warnings.warn(f'tests may fail, unable to change the current working '
1074                      f'directory to {path!r}: {exc}',
1075                      RuntimeWarning, stacklevel=3)
1076    try:
1077        yield os.getcwd()
1078    finally:
1079        os.chdir(saved_dir)
1080
1081
1082@contextlib.contextmanager
1083def temp_cwd(name='tempcwd', quiet=False):
1084    """
1085    Context manager that temporarily creates and changes the CWD.
1086
1087    The function temporarily changes the current working directory
1088    after creating a temporary directory in the current directory with
1089    name *name*.  If *name* is None, the temporary directory is
1090    created using tempfile.mkdtemp.
1091
1092    If *quiet* is False (default) and it is not possible to
1093    create or change the CWD, an error is raised.  If *quiet* is True,
1094    only a warning is raised and the original CWD is used.
1095
1096    """
1097    with temp_dir(path=name, quiet=quiet) as temp_path:
1098        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
1099            yield cwd_dir
1100
1101if hasattr(os, "umask"):
1102    @contextlib.contextmanager
1103    def temp_umask(umask):
1104        """Context manager that temporarily sets the process umask."""
1105        oldmask = os.umask(umask)
1106        try:
1107            yield
1108        finally:
1109            os.umask(oldmask)
1110
1111# TEST_HOME_DIR refers to the top level directory of the "test" package
1112# that contains Python's regression test suite
1113TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
1114TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
1115
1116# TEST_DATA_DIR is used as a target download location for remote resources
1117TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
1118
1119def findfile(filename, subdir=None):
1120    """Try to find a file on sys.path or in the test directory.  If it is not
1121    found the argument passed to the function is returned (this does not
1122    necessarily signal failure; could still be the legitimate path).
1123
1124    Setting *subdir* indicates a relative path to use to find the file
1125    rather than looking directly in the path directories.
1126    """
1127    if os.path.isabs(filename):
1128        return filename
1129    if subdir is not None:
1130        filename = os.path.join(subdir, filename)
1131    path = [TEST_HOME_DIR] + sys.path
1132    for dn in path:
1133        fn = os.path.join(dn, filename)
1134        if os.path.exists(fn): return fn
1135    return filename
1136
1137def create_empty_file(filename):
1138    """Create an empty file. If the file already exists, truncate it."""
1139    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
1140    os.close(fd)
1141
1142def sortdict(dict):
1143    "Like repr(dict), but in sorted order."
1144    items = sorted(dict.items())
1145    reprpairs = ["%r: %r" % pair for pair in items]
1146    withcommas = ", ".join(reprpairs)
1147    return "{%s}" % withcommas
1148
1149def make_bad_fd():
1150    """
1151    Create an invalid file descriptor by opening and closing a file and return
1152    its fd.
1153    """
1154    file = open(TESTFN, "wb")
1155    try:
1156        return file.fileno()
1157    finally:
1158        file.close()
1159        unlink(TESTFN)
1160
1161
1162def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
1163    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
1164        compile(statement, '<test string>', 'exec')
1165    err = cm.exception
1166    testcase.assertIsNotNone(err.lineno)
1167    if lineno is not None:
1168        testcase.assertEqual(err.lineno, lineno)
1169    testcase.assertIsNotNone(err.offset)
1170    if offset is not None:
1171        testcase.assertEqual(err.offset, offset)
1172
1173def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None):
1174    # Test also that a warning is emitted only once.
1175    with warnings.catch_warnings(record=True) as warns:
1176        warnings.simplefilter('always', SyntaxWarning)
1177        compile(statement, '<testcase>', 'exec')
1178    testcase.assertEqual(len(warns), 1, warns)
1179
1180    warn, = warns
1181    testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category)
1182    if errtext:
1183        testcase.assertRegex(str(warn.message), errtext)
1184    testcase.assertEqual(warn.filename, '<testcase>')
1185    testcase.assertIsNotNone(warn.lineno)
1186    if lineno is not None:
1187        testcase.assertEqual(warn.lineno, lineno)
1188
1189    # SyntaxWarning should be converted to SyntaxError when raised,
1190    # since the latter contains more information and provides better
1191    # error report.
1192    with warnings.catch_warnings(record=True) as warns:
1193        warnings.simplefilter('error', SyntaxWarning)
1194        check_syntax_error(testcase, statement, errtext,
1195                           lineno=lineno, offset=offset)
1196    # No warnings are leaked when a SyntaxError is raised.
1197    testcase.assertEqual(warns, [])
1198
1199
1200def open_urlresource(url, *args, **kw):
1201    import urllib.request, urllib.parse
1202
1203    check = kw.pop('check', None)
1204
1205    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
1206
1207    fn = os.path.join(TEST_DATA_DIR, filename)
1208
1209    def check_valid_file(fn):
1210        f = open(fn, *args, **kw)
1211        if check is None:
1212            return f
1213        elif check(f):
1214            f.seek(0)
1215            return f
1216        f.close()
1217
1218    if os.path.exists(fn):
1219        f = check_valid_file(fn)
1220        if f is not None:
1221            return f
1222        unlink(fn)
1223
1224    # Verify the requirement before downloading the file
1225    requires('urlfetch')
1226
1227    if verbose:
1228        print('\tfetching %s ...' % url, file=get_original_stdout())
1229    opener = urllib.request.build_opener()
1230    if gzip:
1231        opener.addheaders.append(('Accept-Encoding', 'gzip'))
1232    f = opener.open(url, timeout=15)
1233    if gzip and f.headers.get('Content-Encoding') == 'gzip':
1234        f = gzip.GzipFile(fileobj=f)
1235    try:
1236        with open(fn, "wb") as out:
1237            s = f.read()
1238            while s:
1239                out.write(s)
1240                s = f.read()
1241    finally:
1242        f.close()
1243
1244    f = check_valid_file(fn)
1245    if f is not None:
1246        return f
1247    raise TestFailed('invalid resource %r' % fn)
1248
1249
1250class WarningsRecorder(object):
1251    """Convenience wrapper for the warnings list returned on
1252       entry to the warnings.catch_warnings() context manager.
1253    """
1254    def __init__(self, warnings_list):
1255        self._warnings = warnings_list
1256        self._last = 0
1257
1258    def __getattr__(self, attr):
1259        if len(self._warnings) > self._last:
1260            return getattr(self._warnings[-1], attr)
1261        elif attr in warnings.WarningMessage._WARNING_DETAILS:
1262            return None
1263        raise AttributeError("%r has no attribute %r" % (self, attr))
1264
1265    @property
1266    def warnings(self):
1267        return self._warnings[self._last:]
1268
1269    def reset(self):
1270        self._last = len(self._warnings)
1271
1272
1273def _filterwarnings(filters, quiet=False):
1274    """Catch the warnings, then check if all the expected
1275    warnings have been raised and re-raise unexpected warnings.
1276    If 'quiet' is True, only re-raise the unexpected warnings.
1277    """
1278    # Clear the warning registry of the calling module
1279    # in order to re-raise the warnings.
1280    frame = sys._getframe(2)
1281    registry = frame.f_globals.get('__warningregistry__')
1282    if registry:
1283        registry.clear()
1284    with warnings.catch_warnings(record=True) as w:
1285        # Set filter "always" to record all warnings.  Because
1286        # test_warnings swap the module, we need to look up in
1287        # the sys.modules dictionary.
1288        sys.modules['warnings'].simplefilter("always")
1289        yield WarningsRecorder(w)
1290    # Filter the recorded warnings
1291    reraise = list(w)
1292    missing = []
1293    for msg, cat in filters:
1294        seen = False
1295        for w in reraise[:]:
1296            warning = w.message
1297            # Filter out the matching messages
1298            if (re.match(msg, str(warning), re.I) and
1299                issubclass(warning.__class__, cat)):
1300                seen = True
1301                reraise.remove(w)
1302        if not seen and not quiet:
1303            # This filter caught nothing
1304            missing.append((msg, cat.__name__))
1305    if reraise:
1306        raise AssertionError("unhandled warning %s" % reraise[0])
1307    if missing:
1308        raise AssertionError("filter (%r, %s) did not catch any warning" %
1309                             missing[0])
1310
1311
1312@contextlib.contextmanager
1313def check_warnings(*filters, **kwargs):
1314    """Context manager to silence warnings.
1315
1316    Accept 2-tuples as positional arguments:
1317        ("message regexp", WarningCategory)
1318
1319    Optional argument:
1320     - if 'quiet' is True, it does not fail if a filter catches nothing
1321        (default True without argument,
1322         default False if some filters are defined)
1323
1324    Without argument, it defaults to:
1325        check_warnings(("", Warning), quiet=True)
1326    """
1327    quiet = kwargs.get('quiet')
1328    if not filters:
1329        filters = (("", Warning),)
1330        # Preserve backward compatibility
1331        if quiet is None:
1332            quiet = True
1333    return _filterwarnings(filters, quiet)
1334
1335
1336@contextlib.contextmanager
1337def check_no_warnings(testcase, message='', category=Warning, force_gc=False):
1338    """Context manager to check that no warnings are emitted.
1339
1340    This context manager enables a given warning within its scope
1341    and checks that no warnings are emitted even with that warning
1342    enabled.
1343
1344    If force_gc is True, a garbage collection is attempted before checking
1345    for warnings. This may help to catch warnings emitted when objects
1346    are deleted, such as ResourceWarning.
1347
1348    Other keyword arguments are passed to warnings.filterwarnings().
1349    """
1350    with warnings.catch_warnings(record=True) as warns:
1351        warnings.filterwarnings('always',
1352                                message=message,
1353                                category=category)
1354        yield
1355        if force_gc:
1356            gc_collect()
1357    testcase.assertEqual(warns, [])
1358
1359
1360@contextlib.contextmanager
1361def check_no_resource_warning(testcase):
1362    """Context manager to check that no ResourceWarning is emitted.
1363
1364    Usage:
1365
1366        with check_no_resource_warning(self):
1367            f = open(...)
1368            ...
1369            del f
1370
1371    You must remove the object which may emit ResourceWarning before
1372    the end of the context manager.
1373    """
1374    with check_no_warnings(testcase, category=ResourceWarning, force_gc=True):
1375        yield
1376
1377
1378class CleanImport(object):
1379    """Context manager to force import to return a new module reference.
1380
1381    This is useful for testing module-level behaviours, such as
1382    the emission of a DeprecationWarning on import.
1383
1384    Use like this:
1385
1386        with CleanImport("foo"):
1387            importlib.import_module("foo") # new reference
1388    """
1389
1390    def __init__(self, *module_names):
1391        self.original_modules = sys.modules.copy()
1392        for module_name in module_names:
1393            if module_name in sys.modules:
1394                module = sys.modules[module_name]
1395                # It is possible that module_name is just an alias for
1396                # another module (e.g. stub for modules renamed in 3.x).
1397                # In that case, we also need delete the real module to clear
1398                # the import cache.
1399                if module.__name__ != module_name:
1400                    del sys.modules[module.__name__]
1401                del sys.modules[module_name]
1402
1403    def __enter__(self):
1404        return self
1405
1406    def __exit__(self, *ignore_exc):
1407        sys.modules.update(self.original_modules)
1408
1409
1410class EnvironmentVarGuard(collections.abc.MutableMapping):
1411
1412    """Class to help protect the environment variable properly.  Can be used as
1413    a context manager."""
1414
1415    def __init__(self):
1416        self._environ = os.environ
1417        self._changed = {}
1418
1419    def __getitem__(self, envvar):
1420        return self._environ[envvar]
1421
1422    def __setitem__(self, envvar, value):
1423        # Remember the initial value on the first access
1424        if envvar not in self._changed:
1425            self._changed[envvar] = self._environ.get(envvar)
1426        self._environ[envvar] = value
1427
1428    def __delitem__(self, envvar):
1429        # Remember the initial value on the first access
1430        if envvar not in self._changed:
1431            self._changed[envvar] = self._environ.get(envvar)
1432        if envvar in self._environ:
1433            del self._environ[envvar]
1434
1435    def keys(self):
1436        return self._environ.keys()
1437
1438    def __iter__(self):
1439        return iter(self._environ)
1440
1441    def __len__(self):
1442        return len(self._environ)
1443
1444    def set(self, envvar, value):
1445        self[envvar] = value
1446
1447    def unset(self, envvar):
1448        del self[envvar]
1449
1450    def __enter__(self):
1451        return self
1452
1453    def __exit__(self, *ignore_exc):
1454        for (k, v) in self._changed.items():
1455            if v is None:
1456                if k in self._environ:
1457                    del self._environ[k]
1458            else:
1459                self._environ[k] = v
1460        os.environ = self._environ
1461
1462
1463class DirsOnSysPath(object):
1464    """Context manager to temporarily add directories to sys.path.
1465
1466    This makes a copy of sys.path, appends any directories given
1467    as positional arguments, then reverts sys.path to the copied
1468    settings when the context ends.
1469
1470    Note that *all* sys.path modifications in the body of the
1471    context manager, including replacement of the object,
1472    will be reverted at the end of the block.
1473    """
1474
1475    def __init__(self, *paths):
1476        self.original_value = sys.path[:]
1477        self.original_object = sys.path
1478        sys.path.extend(paths)
1479
1480    def __enter__(self):
1481        return self
1482
1483    def __exit__(self, *ignore_exc):
1484        sys.path = self.original_object
1485        sys.path[:] = self.original_value
1486
1487
1488class TransientResource(object):
1489
1490    """Raise ResourceDenied if an exception is raised while the context manager
1491    is in effect that matches the specified exception and attributes."""
1492
1493    def __init__(self, exc, **kwargs):
1494        self.exc = exc
1495        self.attrs = kwargs
1496
1497    def __enter__(self):
1498        return self
1499
1500    def __exit__(self, type_=None, value=None, traceback=None):
1501        """If type_ is a subclass of self.exc and value has attributes matching
1502        self.attrs, raise ResourceDenied.  Otherwise let the exception
1503        propagate (if any)."""
1504        if type_ is not None and issubclass(self.exc, type_):
1505            for attr, attr_value in self.attrs.items():
1506                if not hasattr(value, attr):
1507                    break
1508                if getattr(value, attr) != attr_value:
1509                    break
1510            else:
1511                raise ResourceDenied("an optional resource is not available")
1512
1513# Context managers that raise ResourceDenied when various issues
1514# with the Internet connection manifest themselves as exceptions.
1515# XXX deprecate these and use transient_internet() instead
1516time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
1517socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1518ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1519
1520
1521def get_socket_conn_refused_errs():
1522    """
1523    Get the different socket error numbers ('errno') which can be received
1524    when a connection is refused.
1525    """
1526    errors = [errno.ECONNREFUSED]
1527    if hasattr(errno, 'ENETUNREACH'):
1528        # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
1529        errors.append(errno.ENETUNREACH)
1530    if hasattr(errno, 'EADDRNOTAVAIL'):
1531        # bpo-31910: socket.create_connection() fails randomly
1532        # with EADDRNOTAVAIL on Travis CI
1533        errors.append(errno.EADDRNOTAVAIL)
1534    if hasattr(errno, 'EHOSTUNREACH'):
1535        # bpo-37583: The destination host cannot be reached
1536        errors.append(errno.EHOSTUNREACH)
1537    if not IPV6_ENABLED:
1538        errors.append(errno.EAFNOSUPPORT)
1539    return errors
1540
1541
1542@contextlib.contextmanager
1543def transient_internet(resource_name, *, timeout=30.0, errnos=()):
1544    """Return a context manager that raises ResourceDenied when various issues
1545    with the Internet connection manifest themselves as exceptions."""
1546    default_errnos = [
1547        ('ECONNREFUSED', 111),
1548        ('ECONNRESET', 104),
1549        ('EHOSTUNREACH', 113),
1550        ('ENETUNREACH', 101),
1551        ('ETIMEDOUT', 110),
1552        # socket.create_connection() fails randomly with
1553        # EADDRNOTAVAIL on Travis CI.
1554        ('EADDRNOTAVAIL', 99),
1555    ]
1556    default_gai_errnos = [
1557        ('EAI_AGAIN', -3),
1558        ('EAI_FAIL', -4),
1559        ('EAI_NONAME', -2),
1560        ('EAI_NODATA', -5),
1561        # Encountered when trying to resolve IPv6-only hostnames
1562        ('WSANO_DATA', 11004),
1563    ]
1564
1565    denied = ResourceDenied("Resource %r is not available" % resource_name)
1566    captured_errnos = errnos
1567    gai_errnos = []
1568    if not captured_errnos:
1569        captured_errnos = [getattr(errno, name, num)
1570                           for (name, num) in default_errnos]
1571        gai_errnos = [getattr(socket, name, num)
1572                      for (name, num) in default_gai_errnos]
1573
1574    def filter_error(err):
1575        n = getattr(err, 'errno', None)
1576        if (isinstance(err, socket.timeout) or
1577            (isinstance(err, socket.gaierror) and n in gai_errnos) or
1578            (isinstance(err, urllib.error.HTTPError) and
1579             500 <= err.code <= 599) or
1580            (isinstance(err, urllib.error.URLError) and
1581                 (("ConnectionRefusedError" in err.reason) or
1582                  ("TimeoutError" in err.reason) or
1583                  ("EOFError" in err.reason))) or
1584            n in captured_errnos):
1585            if not verbose:
1586                sys.stderr.write(denied.args[0] + "\n")
1587            raise denied from err
1588
1589    old_timeout = socket.getdefaulttimeout()
1590    try:
1591        if timeout is not None:
1592            socket.setdefaulttimeout(timeout)
1593        yield
1594    except nntplib.NNTPTemporaryError as err:
1595        if verbose:
1596            sys.stderr.write(denied.args[0] + "\n")
1597        raise denied from err
1598    except OSError as err:
1599        # urllib can wrap original socket errors multiple times (!), we must
1600        # unwrap to get at the original error.
1601        while True:
1602            a = err.args
1603            if len(a) >= 1 and isinstance(a[0], OSError):
1604                err = a[0]
1605            # The error can also be wrapped as args[1]:
1606            #    except socket.error as msg:
1607            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
1608            elif len(a) >= 2 and isinstance(a[1], OSError):
1609                err = a[1]
1610            else:
1611                break
1612        filter_error(err)
1613        raise
1614    # XXX should we catch generic exceptions and look for their
1615    # __cause__ or __context__?
1616    finally:
1617        socket.setdefaulttimeout(old_timeout)
1618
1619
1620@contextlib.contextmanager
1621def captured_output(stream_name):
1622    """Return a context manager used by captured_stdout/stdin/stderr
1623    that temporarily replaces the sys stream *stream_name* with a StringIO."""
1624    import io
1625    orig_stdout = getattr(sys, stream_name)
1626    setattr(sys, stream_name, io.StringIO())
1627    try:
1628        yield getattr(sys, stream_name)
1629    finally:
1630        setattr(sys, stream_name, orig_stdout)
1631
1632def captured_stdout():
1633    """Capture the output of sys.stdout:
1634
1635       with captured_stdout() as stdout:
1636           print("hello")
1637       self.assertEqual(stdout.getvalue(), "hello\\n")
1638    """
1639    return captured_output("stdout")
1640
1641def captured_stderr():
1642    """Capture the output of sys.stderr:
1643
1644       with captured_stderr() as stderr:
1645           print("hello", file=sys.stderr)
1646       self.assertEqual(stderr.getvalue(), "hello\\n")
1647    """
1648    return captured_output("stderr")
1649
1650def captured_stdin():
1651    """Capture the input to sys.stdin:
1652
1653       with captured_stdin() as stdin:
1654           stdin.write('hello\\n')
1655           stdin.seek(0)
1656           # call test code that consumes from sys.stdin
1657           captured = input()
1658       self.assertEqual(captured, "hello")
1659    """
1660    return captured_output("stdin")
1661
1662
1663def gc_collect():
1664    """Force as many objects as possible to be collected.
1665
1666    In non-CPython implementations of Python, this is needed because timely
1667    deallocation is not guaranteed by the garbage collector.  (Even in CPython
1668    this can be the case in case of reference cycles.)  This means that __del__
1669    methods may be called later than expected and weakrefs may remain alive for
1670    longer than expected.  This function tries its best to force all garbage
1671    objects to disappear.
1672    """
1673    gc.collect()
1674    if is_jython:
1675        time.sleep(0.1)
1676    gc.collect()
1677    gc.collect()
1678
1679@contextlib.contextmanager
1680def disable_gc():
1681    have_gc = gc.isenabled()
1682    gc.disable()
1683    try:
1684        yield
1685    finally:
1686        if have_gc:
1687            gc.enable()
1688
1689
1690def python_is_optimized():
1691    """Find if Python was built with optimizations."""
1692    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
1693    final_opt = ""
1694    for opt in cflags.split():
1695        if opt.startswith('-O'):
1696            final_opt = opt
1697    return final_opt not in ('', '-O0', '-Og')
1698
1699
1700_header = 'nP'
1701_align = '0n'
1702if hasattr(sys, "getobjects"):
1703    _header = '2P' + _header
1704    _align = '0P'
1705_vheader = _header + 'n'
1706
1707def calcobjsize(fmt):
1708    return struct.calcsize(_header + fmt + _align)
1709
1710def calcvobjsize(fmt):
1711    return struct.calcsize(_vheader + fmt + _align)
1712
1713
1714_TPFLAGS_HAVE_GC = 1<<14
1715_TPFLAGS_HEAPTYPE = 1<<9
1716
1717def check_sizeof(test, o, size):
1718    import _testcapi
1719    result = sys.getsizeof(o)
1720    # add GC header size
1721    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1722        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1723        size += _testcapi.SIZEOF_PYGC_HEAD
1724    msg = 'wrong size for %s: got %d, expected %d' \
1725            % (type(o), result, size)
1726    test.assertEqual(result, size, msg)
1727
1728#=======================================================================
1729# Decorator for running a function in a different locale, correctly resetting
1730# it afterwards.
1731
1732def run_with_locale(catstr, *locales):
1733    def decorator(func):
1734        def inner(*args, **kwds):
1735            try:
1736                import locale
1737                category = getattr(locale, catstr)
1738                orig_locale = locale.setlocale(category)
1739            except AttributeError:
1740                # if the test author gives us an invalid category string
1741                raise
1742            except:
1743                # cannot retrieve original locale, so do nothing
1744                locale = orig_locale = None
1745            else:
1746                for loc in locales:
1747                    try:
1748                        locale.setlocale(category, loc)
1749                        break
1750                    except:
1751                        pass
1752
1753            # now run the function, resetting the locale on exceptions
1754            try:
1755                return func(*args, **kwds)
1756            finally:
1757                if locale and orig_locale:
1758                    locale.setlocale(category, orig_locale)
1759        inner.__name__ = func.__name__
1760        inner.__doc__ = func.__doc__
1761        return inner
1762    return decorator
1763
1764#=======================================================================
1765# Decorator for running a function in a specific timezone, correctly
1766# resetting it afterwards.
1767
1768def run_with_tz(tz):
1769    def decorator(func):
1770        def inner(*args, **kwds):
1771            try:
1772                tzset = time.tzset
1773            except AttributeError:
1774                raise unittest.SkipTest("tzset required")
1775            if 'TZ' in os.environ:
1776                orig_tz = os.environ['TZ']
1777            else:
1778                orig_tz = None
1779            os.environ['TZ'] = tz
1780            tzset()
1781
1782            # now run the function, resetting the tz on exceptions
1783            try:
1784                return func(*args, **kwds)
1785            finally:
1786                if orig_tz is None:
1787                    del os.environ['TZ']
1788                else:
1789                    os.environ['TZ'] = orig_tz
1790                time.tzset()
1791
1792        inner.__name__ = func.__name__
1793        inner.__doc__ = func.__doc__
1794        return inner
1795    return decorator
1796
1797#=======================================================================
1798# Big-memory-test support. Separate from 'resources' because memory use
1799# should be configurable.
1800
1801# Some handy shorthands. Note that these are used for byte-limits as well
1802# as size-limits, in the various bigmem tests
1803_1M = 1024*1024
1804_1G = 1024 * _1M
1805_2G = 2 * _1G
1806_4G = 4 * _1G
1807
1808MAX_Py_ssize_t = sys.maxsize
1809
1810def set_memlimit(limit):
1811    global max_memuse
1812    global real_max_memuse
1813    sizes = {
1814        'k': 1024,
1815        'm': _1M,
1816        'g': _1G,
1817        't': 1024*_1G,
1818    }
1819    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1820                 re.IGNORECASE | re.VERBOSE)
1821    if m is None:
1822        raise ValueError('Invalid memory limit %r' % (limit,))
1823    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1824    real_max_memuse = memlimit
1825    if memlimit > MAX_Py_ssize_t:
1826        memlimit = MAX_Py_ssize_t
1827    if memlimit < _2G - 1:
1828        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1829    max_memuse = memlimit
1830
1831class _MemoryWatchdog:
1832    """An object which periodically watches the process' memory consumption
1833    and prints it out.
1834    """
1835
1836    def __init__(self):
1837        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
1838        self.started = False
1839
1840    def start(self):
1841        try:
1842            f = open(self.procfile, 'r')
1843        except OSError as e:
1844            warnings.warn('/proc not available for stats: {}'.format(e),
1845                          RuntimeWarning)
1846            sys.stderr.flush()
1847            return
1848
1849        with f:
1850            watchdog_script = findfile("memory_watchdog.py")
1851            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
1852                                                 stdin=f,
1853                                                 stderr=subprocess.DEVNULL)
1854        self.started = True
1855
1856    def stop(self):
1857        if self.started:
1858            self.mem_watchdog.terminate()
1859            self.mem_watchdog.wait()
1860
1861
1862def bigmemtest(size, memuse, dry_run=True):
1863    """Decorator for bigmem tests.
1864
1865    'size' is a requested size for the test (in arbitrary, test-interpreted
1866    units.) 'memuse' is the number of bytes per unit for the test, or a good
1867    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
1868    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
1869
1870    The 'size' argument is normally passed to the decorated test method as an
1871    extra argument. If 'dry_run' is true, the value passed to the test method
1872    may be less than the requested value. If 'dry_run' is false, it means the
1873    test doesn't support dummy runs when -M is not specified.
1874    """
1875    def decorator(f):
1876        def wrapper(self):
1877            size = wrapper.size
1878            memuse = wrapper.memuse
1879            if not real_max_memuse:
1880                maxsize = 5147
1881            else:
1882                maxsize = size
1883
1884            if ((real_max_memuse or not dry_run)
1885                and real_max_memuse < maxsize * memuse):
1886                raise unittest.SkipTest(
1887                    "not enough memory: %.1fG minimum needed"
1888                    % (size * memuse / (1024 ** 3)))
1889
1890            if real_max_memuse and verbose:
1891                print()
1892                print(" ... expected peak memory use: {peak:.1f}G"
1893                      .format(peak=size * memuse / (1024 ** 3)))
1894                watchdog = _MemoryWatchdog()
1895                watchdog.start()
1896            else:
1897                watchdog = None
1898
1899            try:
1900                return f(self, maxsize)
1901            finally:
1902                if watchdog:
1903                    watchdog.stop()
1904
1905        wrapper.size = size
1906        wrapper.memuse = memuse
1907        return wrapper
1908    return decorator
1909
1910def bigaddrspacetest(f):
1911    """Decorator for tests that fill the address space."""
1912    def wrapper(self):
1913        if max_memuse < MAX_Py_ssize_t:
1914            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
1915                raise unittest.SkipTest(
1916                    "not enough memory: try a 32-bit build instead")
1917            else:
1918                raise unittest.SkipTest(
1919                    "not enough memory: %.1fG minimum needed"
1920                    % (MAX_Py_ssize_t / (1024 ** 3)))
1921        else:
1922            return f(self)
1923    return wrapper
1924
1925#=======================================================================
1926# unittest integration.
1927
1928class BasicTestRunner:
1929    def run(self, test):
1930        result = unittest.TestResult()
1931        test(result)
1932        return result
1933
1934def _id(obj):
1935    return obj
1936
1937def requires_resource(resource):
1938    if resource == 'gui' and not _is_gui_available():
1939        return unittest.skip(_is_gui_available.reason)
1940    if is_resource_enabled(resource):
1941        return _id
1942    else:
1943        return unittest.skip("resource {0!r} is not enabled".format(resource))
1944
1945def cpython_only(test):
1946    """
1947    Decorator for tests only applicable on CPython.
1948    """
1949    return impl_detail(cpython=True)(test)
1950
1951def impl_detail(msg=None, **guards):
1952    if check_impl_detail(**guards):
1953        return _id
1954    if msg is None:
1955        guardnames, default = _parse_guards(guards)
1956        if default:
1957            msg = "implementation detail not available on {0}"
1958        else:
1959            msg = "implementation detail specific to {0}"
1960        guardnames = sorted(guardnames.keys())
1961        msg = msg.format(' or '.join(guardnames))
1962    return unittest.skip(msg)
1963
1964def _parse_guards(guards):
1965    # Returns a tuple ({platform_name: run_me}, default_value)
1966    if not guards:
1967        return ({'cpython': True}, False)
1968    is_true = list(guards.values())[0]
1969    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
1970    return (guards, not is_true)
1971
1972# Use the following check to guard CPython's implementation-specific tests --
1973# or to run them only on the implementation(s) guarded by the arguments.
1974def check_impl_detail(**guards):
1975    """This function returns True or False depending on the host platform.
1976       Examples:
1977          if check_impl_detail():               # only on CPython (default)
1978          if check_impl_detail(jython=True):    # only on Jython
1979          if check_impl_detail(cpython=False):  # everywhere except on CPython
1980    """
1981    guards, default = _parse_guards(guards)
1982    return guards.get(platform.python_implementation().lower(), default)
1983
1984
1985def no_tracing(func):
1986    """Decorator to temporarily turn off tracing for the duration of a test."""
1987    if not hasattr(sys, 'gettrace'):
1988        return func
1989    else:
1990        @functools.wraps(func)
1991        def wrapper(*args, **kwargs):
1992            original_trace = sys.gettrace()
1993            try:
1994                sys.settrace(None)
1995                return func(*args, **kwargs)
1996            finally:
1997                sys.settrace(original_trace)
1998        return wrapper
1999
2000
2001def refcount_test(test):
2002    """Decorator for tests which involve reference counting.
2003
2004    To start, the decorator does not run the test if is not run by CPython.
2005    After that, any trace function is unset during the test to prevent
2006    unexpected refcounts caused by the trace function.
2007
2008    """
2009    return no_tracing(cpython_only(test))
2010
2011
2012def _filter_suite(suite, pred):
2013    """Recursively filter test cases in a suite based on a predicate."""
2014    newtests = []
2015    for test in suite._tests:
2016        if isinstance(test, unittest.TestSuite):
2017            _filter_suite(test, pred)
2018            newtests.append(test)
2019        else:
2020            if pred(test):
2021                newtests.append(test)
2022    suite._tests = newtests
2023
2024def _run_suite(suite):
2025    """Run tests from a unittest.TestSuite-derived class."""
2026    runner = get_test_runner(sys.stdout,
2027                             verbosity=verbose,
2028                             capture_output=(junit_xml_list is not None))
2029
2030    result = runner.run(suite)
2031
2032    if junit_xml_list is not None:
2033        junit_xml_list.append(result.get_xml_element())
2034
2035    if not result.testsRun and not result.skipped:
2036        raise TestDidNotRun
2037    if not result.wasSuccessful():
2038        if len(result.errors) == 1 and not result.failures:
2039            err = result.errors[0][1]
2040        elif len(result.failures) == 1 and not result.errors:
2041            err = result.failures[0][1]
2042        else:
2043            err = "multiple errors occurred"
2044            if not verbose: err += "; run in verbose mode for details"
2045        raise TestFailed(err)
2046
2047
2048# By default, don't filter tests
2049_match_test_func = None
2050_match_test_patterns = None
2051
2052
2053def match_test(test):
2054    # Function used by support.run_unittest() and regrtest --list-cases
2055    if _match_test_func is None:
2056        return True
2057    else:
2058        return _match_test_func(test.id())
2059
2060
2061def _is_full_match_test(pattern):
2062    # If a pattern contains at least one dot, it's considered
2063    # as a full test identifier.
2064    # Example: 'test.test_os.FileTests.test_access'.
2065    #
2066    # Reject patterns which contain fnmatch patterns: '*', '?', '[...]'
2067    # or '[!...]'. For example, reject 'test_access*'.
2068    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
2069
2070
2071def set_match_tests(patterns):
2072    global _match_test_func, _match_test_patterns
2073
2074    if patterns == _match_test_patterns:
2075        # No change: no need to recompile patterns.
2076        return
2077
2078    if not patterns:
2079        func = None
2080        # set_match_tests(None) behaves as set_match_tests(())
2081        patterns = ()
2082    elif all(map(_is_full_match_test, patterns)):
2083        # Simple case: all patterns are full test identifier.
2084        # The test.bisect_cmd utility only uses such full test identifiers.
2085        func = set(patterns).__contains__
2086    else:
2087        regex = '|'.join(map(fnmatch.translate, patterns))
2088        # The search *is* case sensitive on purpose:
2089        # don't use flags=re.IGNORECASE
2090        regex_match = re.compile(regex).match
2091
2092        def match_test_regex(test_id):
2093            if regex_match(test_id):
2094                # The regex matches the whole identifier, for example
2095                # 'test.test_os.FileTests.test_access'.
2096                return True
2097            else:
2098                # Try to match parts of the test identifier.
2099                # For example, split 'test.test_os.FileTests.test_access'
2100                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
2101                return any(map(regex_match, test_id.split(".")))
2102
2103        func = match_test_regex
2104
2105    # Create a copy since patterns can be mutable and so modified later
2106    _match_test_patterns = tuple(patterns)
2107    _match_test_func = func
2108
2109
2110
2111def run_unittest(*classes):
2112    """Run tests from unittest.TestCase-derived classes."""
2113    valid_types = (unittest.TestSuite, unittest.TestCase)
2114    suite = unittest.TestSuite()
2115    for cls in classes:
2116        if isinstance(cls, str):
2117            if cls in sys.modules:
2118                suite.addTest(unittest.findTestCases(sys.modules[cls]))
2119            else:
2120                raise ValueError("str arguments must be keys in sys.modules")
2121        elif isinstance(cls, valid_types):
2122            suite.addTest(cls)
2123        else:
2124            suite.addTest(unittest.makeSuite(cls))
2125    _filter_suite(suite, match_test)
2126    _run_suite(suite)
2127
2128#=======================================================================
2129# Check for the presence of docstrings.
2130
2131# Rather than trying to enumerate all the cases where docstrings may be
2132# disabled, we just check for that directly
2133
2134def _check_docstrings():
2135    """Just used to check if docstrings are enabled"""
2136
2137MISSING_C_DOCSTRINGS = (check_impl_detail() and
2138                        sys.platform != 'win32' and
2139                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
2140
2141HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
2142                   not MISSING_C_DOCSTRINGS)
2143
2144requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
2145                                          "test requires docstrings")
2146
2147
2148#=======================================================================
2149# doctest driver.
2150
2151def run_doctest(module, verbosity=None, optionflags=0):
2152    """Run doctest on the given module.  Return (#failures, #tests).
2153
2154    If optional argument verbosity is not specified (or is None), pass
2155    support's belief about verbosity on to doctest.  Else doctest's
2156    usual behavior is used (it searches sys.argv for -v).
2157    """
2158
2159    import doctest
2160
2161    if verbosity is None:
2162        verbosity = verbose
2163    else:
2164        verbosity = None
2165
2166    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
2167    if f:
2168        raise TestFailed("%d of %d doctests failed" % (f, t))
2169    if verbose:
2170        print('doctest (%s) ... %d tests with zero failures' %
2171              (module.__name__, t))
2172    return f, t
2173
2174
2175#=======================================================================
2176# Support for saving and restoring the imported modules.
2177
2178def modules_setup():
2179    return sys.modules.copy(),
2180
2181def modules_cleanup(oldmodules):
2182    # Encoders/decoders are registered permanently within the internal
2183    # codec cache. If we destroy the corresponding modules their
2184    # globals will be set to None which will trip up the cached functions.
2185    encodings = [(k, v) for k, v in sys.modules.items()
2186                 if k.startswith('encodings.')]
2187    sys.modules.clear()
2188    sys.modules.update(encodings)
2189    # XXX: This kind of problem can affect more than just encodings. In particular
2190    # extension modules (such as _ssl) don't cope with reloading properly.
2191    # Really, test modules should be cleaning out the test specific modules they
2192    # know they added (ala test_runpy) rather than relying on this function (as
2193    # test_importhooks and test_pkg do currently).
2194    # Implicitly imported *real* modules should be left alone (see issue 10556).
2195    sys.modules.update(oldmodules)
2196
2197#=======================================================================
2198# Threading support to prevent reporting refleaks when running regrtest.py -R
2199
2200# Flag used by saved_test_environment of test.libregrtest.save_env,
2201# to check if a test modified the environment. The flag should be set to False
2202# before running a new test.
2203#
2204# For example, threading_cleanup() sets the flag is the function fails
2205# to cleanup threads.
2206environment_altered = False
2207
2208# NOTE: we use thread._count() rather than threading.enumerate() (or the
2209# moral equivalent thereof) because a threading.Thread object is still alive
2210# until its __bootstrap() method has returned, even after it has been
2211# unregistered from the threading module.
2212# thread._count(), on the other hand, only gets decremented *after* the
2213# __bootstrap() method has returned, which gives us reliable reference counts
2214# at the end of a test run.
2215
2216def threading_setup():
2217    return _thread._count(), threading._dangling.copy()
2218
2219def threading_cleanup(*original_values):
2220    global environment_altered
2221
2222    _MAX_COUNT = 100
2223
2224    for count in range(_MAX_COUNT):
2225        values = _thread._count(), threading._dangling
2226        if values == original_values:
2227            break
2228
2229        if not count:
2230            # Display a warning at the first iteration
2231            environment_altered = True
2232            dangling_threads = values[1]
2233            print("Warning -- threading_cleanup() failed to cleanup "
2234                  "%s threads (count: %s, dangling: %s)"
2235                  % (values[0] - original_values[0],
2236                     values[0], len(dangling_threads)),
2237                  file=sys.stderr)
2238            for thread in dangling_threads:
2239                print(f"Dangling thread: {thread!r}", file=sys.stderr)
2240            sys.stderr.flush()
2241
2242            # Don't hold references to threads
2243            dangling_threads = None
2244        values = None
2245
2246        time.sleep(0.01)
2247        gc_collect()
2248
2249
2250def reap_threads(func):
2251    """Use this function when threads are being used.  This will
2252    ensure that the threads are cleaned up even when the test fails.
2253    """
2254    @functools.wraps(func)
2255    def decorator(*args):
2256        key = threading_setup()
2257        try:
2258            return func(*args)
2259        finally:
2260            threading_cleanup(*key)
2261    return decorator
2262
2263
2264@contextlib.contextmanager
2265def wait_threads_exit(timeout=60.0):
2266    """
2267    bpo-31234: Context manager to wait until all threads created in the with
2268    statement exit.
2269
2270    Use _thread.count() to check if threads exited. Indirectly, wait until
2271    threads exit the internal t_bootstrap() C function of the _thread module.
2272
2273    threading_setup() and threading_cleanup() are designed to emit a warning
2274    if a test leaves running threads in the background. This context manager
2275    is designed to cleanup threads started by the _thread.start_new_thread()
2276    which doesn't allow to wait for thread exit, whereas thread.Thread has a
2277    join() method.
2278    """
2279    old_count = _thread._count()
2280    try:
2281        yield
2282    finally:
2283        start_time = time.monotonic()
2284        deadline = start_time + timeout
2285        while True:
2286            count = _thread._count()
2287            if count <= old_count:
2288                break
2289            if time.monotonic() > deadline:
2290                dt = time.monotonic() - start_time
2291                msg = (f"wait_threads() failed to cleanup {count - old_count} "
2292                       f"threads after {dt:.1f} seconds "
2293                       f"(count: {count}, old count: {old_count})")
2294                raise AssertionError(msg)
2295            time.sleep(0.010)
2296            gc_collect()
2297
2298
2299def join_thread(thread, timeout=30.0):
2300    """Join a thread. Raise an AssertionError if the thread is still alive
2301    after timeout seconds.
2302    """
2303    thread.join(timeout)
2304    if thread.is_alive():
2305        msg = f"failed to join the thread in {timeout:.1f} seconds"
2306        raise AssertionError(msg)
2307
2308
2309def reap_children():
2310    """Use this function at the end of test_main() whenever sub-processes
2311    are started.  This will help ensure that no extra children (zombies)
2312    stick around to hog resources and create problems when looking
2313    for refleaks.
2314    """
2315    global environment_altered
2316
2317    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
2318    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
2319        return
2320
2321    # Reap all our dead child processes so we don't leave zombies around.
2322    # These hog resources and might be causing some of the buildbots to die.
2323    while True:
2324        try:
2325            # Read the exit status of any child process which already completed
2326            pid, status = os.waitpid(-1, os.WNOHANG)
2327        except OSError:
2328            break
2329
2330        if pid == 0:
2331            break
2332
2333        print("Warning -- reap_children() reaped child process %s"
2334              % pid, file=sys.stderr)
2335        environment_altered = True
2336
2337
2338@contextlib.contextmanager
2339def start_threads(threads, unlock=None):
2340    threads = list(threads)
2341    started = []
2342    try:
2343        try:
2344            for t in threads:
2345                t.start()
2346                started.append(t)
2347        except:
2348            if verbose:
2349                print("Can't start %d threads, only %d threads started" %
2350                      (len(threads), len(started)))
2351            raise
2352        yield
2353    finally:
2354        try:
2355            if unlock:
2356                unlock()
2357            endtime = starttime = time.monotonic()
2358            for timeout in range(1, 16):
2359                endtime += 60
2360                for t in started:
2361                    t.join(max(endtime - time.monotonic(), 0.01))
2362                started = [t for t in started if t.is_alive()]
2363                if not started:
2364                    break
2365                if verbose:
2366                    print('Unable to join %d threads during a period of '
2367                          '%d minutes' % (len(started), timeout))
2368        finally:
2369            started = [t for t in started if t.is_alive()]
2370            if started:
2371                faulthandler.dump_traceback(sys.stdout)
2372                raise AssertionError('Unable to join %d threads' % len(started))
2373
2374@contextlib.contextmanager
2375def swap_attr(obj, attr, new_val):
2376    """Temporary swap out an attribute with a new object.
2377
2378    Usage:
2379        with swap_attr(obj, "attr", 5):
2380            ...
2381
2382        This will set obj.attr to 5 for the duration of the with: block,
2383        restoring the old value at the end of the block. If `attr` doesn't
2384        exist on `obj`, it will be created and then deleted at the end of the
2385        block.
2386
2387        The old value (or None if it doesn't exist) will be assigned to the
2388        target of the "as" clause, if there is one.
2389    """
2390    if hasattr(obj, attr):
2391        real_val = getattr(obj, attr)
2392        setattr(obj, attr, new_val)
2393        try:
2394            yield real_val
2395        finally:
2396            setattr(obj, attr, real_val)
2397    else:
2398        setattr(obj, attr, new_val)
2399        try:
2400            yield
2401        finally:
2402            if hasattr(obj, attr):
2403                delattr(obj, attr)
2404
2405@contextlib.contextmanager
2406def swap_item(obj, item, new_val):
2407    """Temporary swap out an item with a new object.
2408
2409    Usage:
2410        with swap_item(obj, "item", 5):
2411            ...
2412
2413        This will set obj["item"] to 5 for the duration of the with: block,
2414        restoring the old value at the end of the block. If `item` doesn't
2415        exist on `obj`, it will be created and then deleted at the end of the
2416        block.
2417
2418        The old value (or None if it doesn't exist) will be assigned to the
2419        target of the "as" clause, if there is one.
2420    """
2421    if item in obj:
2422        real_val = obj[item]
2423        obj[item] = new_val
2424        try:
2425            yield real_val
2426        finally:
2427            obj[item] = real_val
2428    else:
2429        obj[item] = new_val
2430        try:
2431            yield
2432        finally:
2433            if item in obj:
2434                del obj[item]
2435
2436def strip_python_stderr(stderr):
2437    """Strip the stderr of a Python process from potential debug output
2438    emitted by the interpreter.
2439
2440    This will typically be run on the result of the communicate() method
2441    of a subprocess.Popen object.
2442    """
2443    stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
2444    return stderr
2445
2446requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
2447                        'types are immortal if COUNT_ALLOCS is defined')
2448
2449def args_from_interpreter_flags():
2450    """Return a list of command-line arguments reproducing the current
2451    settings in sys.flags and sys.warnoptions."""
2452    return subprocess._args_from_interpreter_flags()
2453
2454def optim_args_from_interpreter_flags():
2455    """Return a list of command-line arguments reproducing the current
2456    optimization settings in sys.flags."""
2457    return subprocess._optim_args_from_interpreter_flags()
2458
2459#============================================================
2460# Support for assertions about logging.
2461#============================================================
2462
2463class TestHandler(logging.handlers.BufferingHandler):
2464    def __init__(self, matcher):
2465        # BufferingHandler takes a "capacity" argument
2466        # so as to know when to flush. As we're overriding
2467        # shouldFlush anyway, we can set a capacity of zero.
2468        # You can call flush() manually to clear out the
2469        # buffer.
2470        logging.handlers.BufferingHandler.__init__(self, 0)
2471        self.matcher = matcher
2472
2473    def shouldFlush(self):
2474        return False
2475
2476    def emit(self, record):
2477        self.format(record)
2478        self.buffer.append(record.__dict__)
2479
2480    def matches(self, **kwargs):
2481        """
2482        Look for a saved dict whose keys/values match the supplied arguments.
2483        """
2484        result = False
2485        for d in self.buffer:
2486            if self.matcher.matches(d, **kwargs):
2487                result = True
2488                break
2489        return result
2490
2491class Matcher(object):
2492
2493    _partial_matches = ('msg', 'message')
2494
2495    def matches(self, d, **kwargs):
2496        """
2497        Try to match a single dict with the supplied arguments.
2498
2499        Keys whose values are strings and which are in self._partial_matches
2500        will be checked for partial (i.e. substring) matches. You can extend
2501        this scheme to (for example) do regular expression matching, etc.
2502        """
2503        result = True
2504        for k in kwargs:
2505            v = kwargs[k]
2506            dv = d.get(k)
2507            if not self.match_value(k, dv, v):
2508                result = False
2509                break
2510        return result
2511
2512    def match_value(self, k, dv, v):
2513        """
2514        Try to match a single stored value (dv) with a supplied value (v).
2515        """
2516        if type(v) != type(dv):
2517            result = False
2518        elif type(dv) is not str or k not in self._partial_matches:
2519            result = (v == dv)
2520        else:
2521            result = dv.find(v) >= 0
2522        return result
2523
2524
2525_can_symlink = None
2526def can_symlink():
2527    global _can_symlink
2528    if _can_symlink is not None:
2529        return _can_symlink
2530    symlink_path = TESTFN + "can_symlink"
2531    try:
2532        os.symlink(TESTFN, symlink_path)
2533        can = True
2534    except (OSError, NotImplementedError, AttributeError):
2535        can = False
2536    else:
2537        os.remove(symlink_path)
2538    _can_symlink = can
2539    return can
2540
2541def skip_unless_symlink(test):
2542    """Skip decorator for tests that require functional symlink"""
2543    ok = can_symlink()
2544    msg = "Requires functional symlink implementation"
2545    return test if ok else unittest.skip(msg)(test)
2546
2547_buggy_ucrt = None
2548def skip_if_buggy_ucrt_strfptime(test):
2549    """
2550    Skip decorator for tests that use buggy strptime/strftime
2551
2552    If the UCRT bugs are present time.localtime().tm_zone will be
2553    an empty string, otherwise we assume the UCRT bugs are fixed
2554
2555    See bpo-37552 [Windows] strptime/strftime return invalid
2556    results with UCRT version 17763.615
2557    """
2558    global _buggy_ucrt
2559    if _buggy_ucrt is None:
2560        if(sys.platform == 'win32' and
2561                locale.getdefaultlocale()[1]  == 'cp65001' and
2562                time.localtime().tm_zone == ''):
2563            _buggy_ucrt = True
2564        else:
2565            _buggy_ucrt = False
2566    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
2567
2568class PythonSymlink:
2569    """Creates a symlink for the current Python executable"""
2570    def __init__(self, link=None):
2571        self.link = link or os.path.abspath(TESTFN)
2572        self._linked = []
2573        self.real = os.path.realpath(sys.executable)
2574        self._also_link = []
2575
2576        self._env = None
2577
2578        self._platform_specific()
2579
2580    def _platform_specific(self):
2581        pass
2582
2583    if sys.platform == "win32":
2584        def _platform_specific(self):
2585            import _winapi
2586
2587            if os.path.lexists(self.real) and not os.path.exists(self.real):
2588                # App symlink appears to not exist, but we want the
2589                # real executable here anyway
2590                self.real = _winapi.GetModuleFileName(0)
2591
2592            dll = _winapi.GetModuleFileName(sys.dllhandle)
2593            src_dir = os.path.dirname(dll)
2594            dest_dir = os.path.dirname(self.link)
2595            self._also_link.append((
2596                dll,
2597                os.path.join(dest_dir, os.path.basename(dll))
2598            ))
2599            for runtime in glob.glob(os.path.join(src_dir, "vcruntime*.dll")):
2600                self._also_link.append((
2601                    runtime,
2602                    os.path.join(dest_dir, os.path.basename(runtime))
2603                ))
2604
2605            self._env = {k.upper(): os.getenv(k) for k in os.environ}
2606            self._env["PYTHONHOME"] = os.path.dirname(self.real)
2607            if sysconfig.is_python_build(True):
2608                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
2609
2610    def __enter__(self):
2611        os.symlink(self.real, self.link)
2612        self._linked.append(self.link)
2613        for real, link in self._also_link:
2614            os.symlink(real, link)
2615            self._linked.append(link)
2616        return self
2617
2618    def __exit__(self, exc_type, exc_value, exc_tb):
2619        for link in self._linked:
2620            try:
2621                os.remove(link)
2622            except IOError as ex:
2623                if verbose:
2624                    print("failed to clean up {}: {}".format(link, ex))
2625
2626    def _call(self, python, args, env, returncode):
2627        cmd = [python, *args]
2628        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
2629                             stderr=subprocess.PIPE, env=env)
2630        r = p.communicate()
2631        if p.returncode != returncode:
2632            if verbose:
2633                print(repr(r[0]))
2634                print(repr(r[1]), file=sys.stderr)
2635            raise RuntimeError(
2636                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
2637        return r
2638
2639    def call_real(self, *args, returncode=0):
2640        return self._call(self.real, args, None, returncode)
2641
2642    def call_link(self, *args, returncode=0):
2643        return self._call(self.link, args, self._env, returncode)
2644
2645
2646_can_xattr = None
2647def can_xattr():
2648    global _can_xattr
2649    if _can_xattr is not None:
2650        return _can_xattr
2651    if not hasattr(os, "setxattr"):
2652        can = False
2653    else:
2654        tmp_dir = tempfile.mkdtemp()
2655        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
2656        try:
2657            with open(TESTFN, "wb") as fp:
2658                try:
2659                    # TESTFN & tempfile may use different file systems with
2660                    # different capabilities
2661                    os.setxattr(tmp_fp, b"user.test", b"")
2662                    os.setxattr(tmp_name, b"trusted.foo", b"42")
2663                    os.setxattr(fp.fileno(), b"user.test", b"")
2664                    # Kernels < 2.6.39 don't respect setxattr flags.
2665                    kernel_version = platform.release()
2666                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
2667                    can = m is None or int(m.group(1)) >= 39
2668                except OSError:
2669                    can = False
2670        finally:
2671            unlink(TESTFN)
2672            unlink(tmp_name)
2673            rmdir(tmp_dir)
2674    _can_xattr = can
2675    return can
2676
2677def skip_unless_xattr(test):
2678    """Skip decorator for tests that require functional extended attributes"""
2679    ok = can_xattr()
2680    msg = "no non-broken extended attribute support"
2681    return test if ok else unittest.skip(msg)(test)
2682
2683def skip_if_pgo_task(test):
2684    """Skip decorator for tests not run in (non-extended) PGO task"""
2685    ok = not PGO or PGO_EXTENDED
2686    msg = "Not run for (non-extended) PGO task"
2687    return test if ok else unittest.skip(msg)(test)
2688
2689_bind_nix_socket_error = None
2690def skip_unless_bind_unix_socket(test):
2691    """Decorator for tests requiring a functional bind() for unix sockets."""
2692    if not hasattr(socket, 'AF_UNIX'):
2693        return unittest.skip('No UNIX Sockets')(test)
2694    global _bind_nix_socket_error
2695    if _bind_nix_socket_error is None:
2696        path = TESTFN + "can_bind_unix_socket"
2697        with socket.socket(socket.AF_UNIX) as sock:
2698            try:
2699                sock.bind(path)
2700                _bind_nix_socket_error = False
2701            except OSError as e:
2702                _bind_nix_socket_error = e
2703            finally:
2704                unlink(path)
2705    if _bind_nix_socket_error:
2706        msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
2707        return unittest.skip(msg)(test)
2708    else:
2709        return test
2710
2711
2712def fs_is_case_insensitive(directory):
2713    """Detects if the file system for the specified directory is case-insensitive."""
2714    with tempfile.NamedTemporaryFile(dir=directory) as base:
2715        base_path = base.name
2716        case_path = base_path.upper()
2717        if case_path == base_path:
2718            case_path = base_path.lower()
2719        try:
2720            return os.path.samefile(base_path, case_path)
2721        except FileNotFoundError:
2722            return False
2723
2724
2725def detect_api_mismatch(ref_api, other_api, *, ignore=()):
2726    """Returns the set of items in ref_api not in other_api, except for a
2727    defined list of items to be ignored in this check.
2728
2729    By default this skips private attributes beginning with '_' but
2730    includes all magic methods, i.e. those starting and ending in '__'.
2731    """
2732    missing_items = set(dir(ref_api)) - set(dir(other_api))
2733    if ignore:
2734        missing_items -= set(ignore)
2735    missing_items = set(m for m in missing_items
2736                        if not m.startswith('_') or m.endswith('__'))
2737    return missing_items
2738
2739
2740def check__all__(test_case, module, name_of_module=None, extra=(),
2741                 blacklist=()):
2742    """Assert that the __all__ variable of 'module' contains all public names.
2743
2744    The module's public names (its API) are detected automatically based on
2745    whether they match the public name convention and were defined in
2746    'module'.
2747
2748    The 'name_of_module' argument can specify (as a string or tuple thereof)
2749    what module(s) an API could be defined in in order to be detected as a
2750    public API. One case for this is when 'module' imports part of its public
2751    API from other modules, possibly a C backend (like 'csv' and its '_csv').
2752
2753    The 'extra' argument can be a set of names that wouldn't otherwise be
2754    automatically detected as "public", like objects without a proper
2755    '__module__' attribute. If provided, it will be added to the
2756    automatically detected ones.
2757
2758    The 'blacklist' argument can be a set of names that must not be treated
2759    as part of the public API even though their names indicate otherwise.
2760
2761    Usage:
2762        import bar
2763        import foo
2764        import unittest
2765        from test import support
2766
2767        class MiscTestCase(unittest.TestCase):
2768            def test__all__(self):
2769                support.check__all__(self, foo)
2770
2771        class OtherTestCase(unittest.TestCase):
2772            def test__all__(self):
2773                extra = {'BAR_CONST', 'FOO_CONST'}
2774                blacklist = {'baz'}  # Undocumented name.
2775                # bar imports part of its API from _bar.
2776                support.check__all__(self, bar, ('bar', '_bar'),
2777                                     extra=extra, blacklist=blacklist)
2778
2779    """
2780
2781    if name_of_module is None:
2782        name_of_module = (module.__name__, )
2783    elif isinstance(name_of_module, str):
2784        name_of_module = (name_of_module, )
2785
2786    expected = set(extra)
2787
2788    for name in dir(module):
2789        if name.startswith('_') or name in blacklist:
2790            continue
2791        obj = getattr(module, name)
2792        if (getattr(obj, '__module__', None) in name_of_module or
2793                (not hasattr(obj, '__module__') and
2794                 not isinstance(obj, types.ModuleType))):
2795            expected.add(name)
2796    test_case.assertCountEqual(module.__all__, expected)
2797
2798
2799class SuppressCrashReport:
2800    """Try to prevent a crash report from popping up.
2801
2802    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
2803    disable the creation of coredump file.
2804    """
2805    old_value = None
2806    old_modes = None
2807
2808    def __enter__(self):
2809        """On Windows, disable Windows Error Reporting dialogs using
2810        SetErrorMode.
2811
2812        On UNIX, try to save the previous core file size limit, then set
2813        soft limit to 0.
2814        """
2815        if sys.platform.startswith('win'):
2816            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
2817            # GetErrorMode is not available on Windows XP and Windows Server 2003,
2818            # but SetErrorMode returns the previous value, so we can use that
2819            import ctypes
2820            self._k32 = ctypes.windll.kernel32
2821            SEM_NOGPFAULTERRORBOX = 0x02
2822            self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
2823            self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)
2824
2825            # Suppress assert dialogs in debug builds
2826            # (see http://bugs.python.org/issue23314)
2827            try:
2828                import msvcrt
2829                msvcrt.CrtSetReportMode
2830            except (AttributeError, ImportError):
2831                # no msvcrt or a release build
2832                pass
2833            else:
2834                self.old_modes = {}
2835                for report_type in [msvcrt.CRT_WARN,
2836                                    msvcrt.CRT_ERROR,
2837                                    msvcrt.CRT_ASSERT]:
2838                    old_mode = msvcrt.CrtSetReportMode(report_type,
2839                            msvcrt.CRTDBG_MODE_FILE)
2840                    old_file = msvcrt.CrtSetReportFile(report_type,
2841                            msvcrt.CRTDBG_FILE_STDERR)
2842                    self.old_modes[report_type] = old_mode, old_file
2843
2844        else:
2845            if resource is not None:
2846                try:
2847                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
2848                    resource.setrlimit(resource.RLIMIT_CORE,
2849                                       (0, self.old_value[1]))
2850                except (ValueError, OSError):
2851                    pass
2852
2853            if sys.platform == 'darwin':
2854                # Check if the 'Crash Reporter' on OSX was configured
2855                # in 'Developer' mode and warn that it will get triggered
2856                # when it is.
2857                #
2858                # This assumes that this context manager is used in tests
2859                # that might trigger the next manager.
2860                cmd = ['/usr/bin/defaults', 'read',
2861                       'com.apple.CrashReporter', 'DialogType']
2862                proc = subprocess.Popen(cmd,
2863                                        stdout=subprocess.PIPE,
2864                                        stderr=subprocess.PIPE)
2865                with proc:
2866                    stdout = proc.communicate()[0]
2867                if stdout.strip() == b'developer':
2868                    print("this test triggers the Crash Reporter, "
2869                          "that is intentional", end='', flush=True)
2870
2871        return self
2872
2873    def __exit__(self, *ignore_exc):
2874        """Restore Windows ErrorMode or core file behavior to initial value."""
2875        if self.old_value is None:
2876            return
2877
2878        if sys.platform.startswith('win'):
2879            self._k32.SetErrorMode(self.old_value)
2880
2881            if self.old_modes:
2882                import msvcrt
2883                for report_type, (old_mode, old_file) in self.old_modes.items():
2884                    msvcrt.CrtSetReportMode(report_type, old_mode)
2885                    msvcrt.CrtSetReportFile(report_type, old_file)
2886        else:
2887            if resource is not None:
2888                try:
2889                    resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
2890                except (ValueError, OSError):
2891                    pass
2892
2893
2894def patch(test_instance, object_to_patch, attr_name, new_value):
2895    """Override 'object_to_patch'.'attr_name' with 'new_value'.
2896
2897    Also, add a cleanup procedure to 'test_instance' to restore
2898    'object_to_patch' value for 'attr_name'.
2899    The 'attr_name' should be a valid attribute for 'object_to_patch'.
2900
2901    """
2902    # check that 'attr_name' is a real attribute for 'object_to_patch'
2903    # will raise AttributeError if it does not exist
2904    getattr(object_to_patch, attr_name)
2905
2906    # keep a copy of the old value
2907    attr_is_local = False
2908    try:
2909        old_value = object_to_patch.__dict__[attr_name]
2910    except (AttributeError, KeyError):
2911        old_value = getattr(object_to_patch, attr_name, None)
2912    else:
2913        attr_is_local = True
2914
2915    # restore the value when the test is done
2916    def cleanup():
2917        if attr_is_local:
2918            setattr(object_to_patch, attr_name, old_value)
2919        else:
2920            delattr(object_to_patch, attr_name)
2921
2922    test_instance.addCleanup(cleanup)
2923
2924    # actually override the attribute
2925    setattr(object_to_patch, attr_name, new_value)
2926
2927
2928def run_in_subinterp(code):
2929    """
2930    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
2931    module is enabled.
2932    """
2933    # Issue #10915, #15751: PyGILState_*() functions don't work with
2934    # sub-interpreters, the tracemalloc module uses these functions internally
2935    try:
2936        import tracemalloc
2937    except ImportError:
2938        pass
2939    else:
2940        if tracemalloc.is_tracing():
2941            raise unittest.SkipTest("run_in_subinterp() cannot be used "
2942                                     "if tracemalloc module is tracing "
2943                                     "memory allocations")
2944    import _testcapi
2945    return _testcapi.run_in_subinterp(code)
2946
2947
2948def check_free_after_iterating(test, iter, cls, args=()):
2949    class A(cls):
2950        def __del__(self):
2951            nonlocal done
2952            done = True
2953            try:
2954                next(it)
2955            except StopIteration:
2956                pass
2957
2958    done = False
2959    it = iter(A(*args))
2960    # Issue 26494: Shouldn't crash
2961    test.assertRaises(StopIteration, next, it)
2962    # The sequence should be deallocated just after the end of iterating
2963    gc_collect()
2964    test.assertTrue(done)
2965
2966
2967def missing_compiler_executable(cmd_names=[]):
2968    """Check if the compiler components used to build the interpreter exist.
2969
2970    Check for the existence of the compiler executables whose names are listed
2971    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
2972    and return the first missing executable or None when none is found
2973    missing.
2974
2975    """
2976    from distutils import ccompiler, sysconfig, spawn
2977    compiler = ccompiler.new_compiler()
2978    sysconfig.customize_compiler(compiler)
2979    for name in compiler.executables:
2980        if cmd_names and name not in cmd_names:
2981            continue
2982        cmd = getattr(compiler, name)
2983        if cmd_names:
2984            assert cmd is not None, \
2985                    "the '%s' executable is not configured" % name
2986        elif not cmd:
2987            continue
2988        if spawn.find_executable(cmd[0]) is None:
2989            return cmd[0]
2990
2991
2992_is_android_emulator = None
2993def setswitchinterval(interval):
2994    # Setting a very low gil interval on the Android emulator causes python
2995    # to hang (issue #26939).
2996    minimum_interval = 1e-5
2997    if is_android and interval < minimum_interval:
2998        global _is_android_emulator
2999        if _is_android_emulator is None:
3000            _is_android_emulator = (subprocess.check_output(
3001                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
3002        if _is_android_emulator:
3003            interval = minimum_interval
3004    return sys.setswitchinterval(interval)
3005
3006
3007@contextlib.contextmanager
3008def disable_faulthandler():
3009    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
3010    # sys.stderr with a StringIO which has no file descriptor when a test
3011    # is run with -W/--verbose3.
3012    fd = sys.__stderr__.fileno()
3013
3014    is_enabled = faulthandler.is_enabled()
3015    try:
3016        faulthandler.disable()
3017        yield
3018    finally:
3019        if is_enabled:
3020            faulthandler.enable(file=fd, all_threads=True)
3021
3022
3023def fd_count():
3024    """Count the number of open file descriptors.
3025    """
3026    if sys.platform.startswith(('linux', 'freebsd')):
3027        try:
3028            names = os.listdir("/proc/self/fd")
3029            # Subtract one because listdir() internally opens a file
3030            # descriptor to list the content of the /proc/self/fd/ directory.
3031            return len(names) - 1
3032        except FileNotFoundError:
3033            pass
3034
3035    MAXFD = 256
3036    if hasattr(os, 'sysconf'):
3037        try:
3038            MAXFD = os.sysconf("SC_OPEN_MAX")
3039        except OSError:
3040            pass
3041
3042    old_modes = None
3043    if sys.platform == 'win32':
3044        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
3045        # on invalid file descriptor if Python is compiled in debug mode
3046        try:
3047            import msvcrt
3048            msvcrt.CrtSetReportMode
3049        except (AttributeError, ImportError):
3050            # no msvcrt or a release build
3051            pass
3052        else:
3053            old_modes = {}
3054            for report_type in (msvcrt.CRT_WARN,
3055                                msvcrt.CRT_ERROR,
3056                                msvcrt.CRT_ASSERT):
3057                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)
3058
3059    try:
3060        count = 0
3061        for fd in range(MAXFD):
3062            try:
3063                # Prefer dup() over fstat(). fstat() can require input/output
3064                # whereas dup() doesn't.
3065                fd2 = os.dup(fd)
3066            except OSError as e:
3067                if e.errno != errno.EBADF:
3068                    raise
3069            else:
3070                os.close(fd2)
3071                count += 1
3072    finally:
3073        if old_modes is not None:
3074            for report_type in (msvcrt.CRT_WARN,
3075                                msvcrt.CRT_ERROR,
3076                                msvcrt.CRT_ASSERT):
3077                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
3078
3079    return count
3080
3081
3082class SaveSignals:
3083    """
3084    Save and restore signal handlers.
3085
3086    This class is only able to save/restore signal handlers registered
3087    by the Python signal module: see bpo-13285 for "external" signal
3088    handlers.
3089    """
3090
3091    def __init__(self):
3092        import signal
3093        self.signal = signal
3094        self.signals = signal.valid_signals()
3095        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
3096        for signame in ('SIGKILL', 'SIGSTOP'):
3097            try:
3098                signum = getattr(signal, signame)
3099            except AttributeError:
3100                continue
3101            self.signals.remove(signum)
3102        self.handlers = {}
3103
3104    def save(self):
3105        for signum in self.signals:
3106            handler = self.signal.getsignal(signum)
3107            if handler is None:
3108                # getsignal() returns None if a signal handler was not
3109                # registered by the Python signal module,
3110                # and the handler is not SIG_DFL nor SIG_IGN.
3111                #
3112                # Ignore the signal: we cannot restore the handler.
3113                continue
3114            self.handlers[signum] = handler
3115
3116    def restore(self):
3117        for signum, handler in self.handlers.items():
3118            self.signal.signal(signum, handler)
3119
3120
3121def with_pymalloc():
3122    import _testcapi
3123    return _testcapi.WITH_PYMALLOC
3124
3125
3126class FakePath:
3127    """Simple implementing of the path protocol.
3128    """
3129    def __init__(self, path):
3130        self.path = path
3131
3132    def __repr__(self):
3133        return f'<FakePath {self.path!r}>'
3134
3135    def __fspath__(self):
3136        if (isinstance(self.path, BaseException) or
3137            isinstance(self.path, type) and
3138                issubclass(self.path, BaseException)):
3139            raise self.path
3140        else:
3141            return self.path
3142
3143
3144class _ALWAYS_EQ:
3145    """
3146    Object that is equal to anything.
3147    """
3148    def __eq__(self, other):
3149        return True
3150    def __ne__(self, other):
3151        return False
3152
3153ALWAYS_EQ = _ALWAYS_EQ()
3154
3155@functools.total_ordering
3156class _LARGEST:
3157    """
3158    Object that is greater than anything (except itself).
3159    """
3160    def __eq__(self, other):
3161        return isinstance(other, _LARGEST)
3162    def __lt__(self, other):
3163        return False
3164
3165LARGEST = _LARGEST()
3166
3167@functools.total_ordering
3168class _SMALLEST:
3169    """
3170    Object that is less than anything (except itself).
3171    """
3172    def __eq__(self, other):
3173        return isinstance(other, _SMALLEST)
3174    def __gt__(self, other):
3175        return False
3176
3177SMALLEST = _SMALLEST()
3178
3179def maybe_get_event_loop_policy():
3180    """Return the global event loop policy if one is set, else return None."""
3181    return asyncio.events._event_loop_policy
3182
3183# Helpers for testing hashing.
3184NHASHBITS = sys.hash_info.width # number of bits in hash() result
3185assert NHASHBITS in (32, 64)
3186
3187# Return mean and sdev of number of collisions when tossing nballs balls
3188# uniformly at random into nbins bins.  By definition, the number of
3189# collisions is the number of balls minus the number of occupied bins at
3190# the end.
3191def collision_stats(nbins, nballs):
3192    n, k = nbins, nballs
3193    # prob a bin empty after k trials = (1 - 1/n)**k
3194    # mean # empty is then n * (1 - 1/n)**k
3195    # so mean # occupied is n - n * (1 - 1/n)**k
3196    # so collisions = k - (n - n*(1 - 1/n)**k)
3197    #
3198    # For the variance:
3199    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
3200    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
3201    #
3202    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
3203    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
3204    # error-prone) efforts to rework the naive formulas to avoid those,
3205    # we use the `decimal` module to get plenty of extra precision.
3206    #
3207    # Note:  the exact values are straightforward to compute with
3208    # rationals, but in context that's unbearably slow, requiring
3209    # multi-million bit arithmetic.
3210    import decimal
3211    with decimal.localcontext() as ctx:
3212        bits = n.bit_length() * 2  # bits in n**2
3213        # At least that many bits will likely cancel out.
3214        # Use that many decimal digits instead.
3215        ctx.prec = max(bits, 30)
3216        dn = decimal.Decimal(n)
3217        p1empty = ((dn - 1) / dn) ** k
3218        meanempty = n * p1empty
3219        occupied = n - meanempty
3220        collisions = k - occupied
3221        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
3222        return float(collisions), float(var.sqrt())
3223
3224
3225class catch_unraisable_exception:
3226    """
3227    Context manager catching unraisable exception using sys.unraisablehook.
3228
3229    Storing the exception value (cm.unraisable.exc_value) creates a reference
3230    cycle. The reference cycle is broken explicitly when the context manager
3231    exits.
3232
3233    Storing the object (cm.unraisable.object) can resurrect it if it is set to
3234    an object which is being finalized. Exiting the context manager clears the
3235    stored object.
3236
3237    Usage:
3238
3239        with support.catch_unraisable_exception() as cm:
3240            # code creating an "unraisable exception"
3241            ...
3242
3243            # check the unraisable exception: use cm.unraisable
3244            ...
3245
3246        # cm.unraisable attribute no longer exists at this point
3247        # (to break a reference cycle)
3248    """
3249
3250    def __init__(self):
3251        self.unraisable = None
3252        self._old_hook = None
3253
3254    def _hook(self, unraisable):
3255        # Storing unraisable.object can resurrect an object which is being
3256        # finalized. Storing unraisable.exc_value creates a reference cycle.
3257        self.unraisable = unraisable
3258
3259    def __enter__(self):
3260        self._old_hook = sys.unraisablehook
3261        sys.unraisablehook = self._hook
3262        return self
3263
3264    def __exit__(self, *exc_info):
3265        sys.unraisablehook = self._old_hook
3266        del self.unraisable
3267
3268
3269class catch_threading_exception:
3270    """
3271    Context manager catching threading.Thread exception using
3272    threading.excepthook.
3273
3274    Attributes set when an exception is catched:
3275
3276    * exc_type
3277    * exc_value
3278    * exc_traceback
3279    * thread
3280
3281    See threading.excepthook() documentation for these attributes.
3282
3283    These attributes are deleted at the context manager exit.
3284
3285    Usage:
3286
3287        with support.catch_threading_exception() as cm:
3288            # code spawning a thread which raises an exception
3289            ...
3290
3291            # check the thread exception, use cm attributes:
3292            # exc_type, exc_value, exc_traceback, thread
3293            ...
3294
3295        # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
3296        # exists at this point
3297        # (to avoid reference cycles)
3298    """
3299
3300    def __init__(self):
3301        self.exc_type = None
3302        self.exc_value = None
3303        self.exc_traceback = None
3304        self.thread = None
3305        self._old_hook = None
3306
3307    def _hook(self, args):
3308        self.exc_type = args.exc_type
3309        self.exc_value = args.exc_value
3310        self.exc_traceback = args.exc_traceback
3311        self.thread = args.thread
3312
3313    def __enter__(self):
3314        self._old_hook = threading.excepthook
3315        threading.excepthook = self._hook
3316        return self
3317
3318    def __exit__(self, *exc_info):
3319        threading.excepthook = self._old_hook
3320        del self.exc_type
3321        del self.exc_value
3322        del self.exc_traceback
3323        del self.thread
3324