• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import collections.abc
7import contextlib
8import errno
9import faulthandler
10import fnmatch
11import functools
12import gc
13import importlib
14import importlib.util
15import logging.handlers
16import nntplib
17import os
18import platform
19import re
20import shutil
21import socket
22import stat
23import struct
24import subprocess
25import sys
26import sysconfig
27import tempfile
28import time
29import types
30import unittest
31import urllib.error
32import warnings
33
34try:
35    import _thread, threading
36except ImportError:
37    _thread = None
38    threading = None
39try:
40    import multiprocessing.process
41except ImportError:
42    multiprocessing = None
43
44try:
45    import zlib
46except ImportError:
47    zlib = None
48
49try:
50    import gzip
51except ImportError:
52    gzip = None
53
54try:
55    import bz2
56except ImportError:
57    bz2 = None
58
59try:
60    import lzma
61except ImportError:
62    lzma = None
63
64try:
65    import resource
66except ImportError:
67    resource = None
68
69__all__ = [
70    # globals
71    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
72    # exceptions
73    "Error", "TestFailed", "ResourceDenied",
74    # imports
75    "import_module", "import_fresh_module", "CleanImport",
76    # modules
77    "unload", "forget",
78    # io
79    "record_original_stdout", "get_original_stdout", "captured_stdout",
80    "captured_stdin", "captured_stderr",
81    # filesystem
82    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
83    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
84    # unittest
85    "is_resource_enabled", "requires", "requires_freebsd_version",
86    "requires_linux_version", "requires_mac_ver", "check_syntax_error",
87    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
88    "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
89    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
90    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
91    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
92    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
93    "check__all__", "requires_android_level", "requires_multiprocessing_queue",
94    # sys
95    "is_jython", "is_android", "check_impl_detail", "unix_shell",
96    "setswitchinterval", "android_not_root",
97    # network
98    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
99    "bind_unix_socket",
100    # processes
101    'temp_umask', "reap_children",
102    # logging
103    "TestHandler",
104    # threads
105    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
106    # miscellaneous
107    "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
108    "run_with_locale", "swap_item",
109    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
110    "run_with_tz", "PGO", "missing_compiler_executable",
111    ]
112
class Error(Exception):
    """Common ancestor of the exceptions defined for the regression tests."""
115
class TestFailed(Error):
    """Raised to signal that a regression test has failed."""
118
class ResourceDenied(unittest.SkipTest):
    """Skip raised when a test asks for a resource that is not allowed.

    requires() raises this for a resource that has not been enabled.
    Having a dedicated type makes it possible to tell expected skips
    apart from unexpected ones.
    """
126
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager that silences module/package deprecation warnings.

    While the block runs, DeprecationWarnings whose message matches
    ".+ (module|package)" are ignored.  When *ignore* is false the context
    manager does nothing at all.
    """
    if not ignore:
        yield
        return
    # catch_warnings() restores the previous filter list on exit, so the
    # extra "ignore" filter only lives for the duration of the block.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
141
142
def import_module(name, deprecated=False, *, required_on=()):
    """Import the module under test and return it, or skip the test.

    An ImportError is turned into unittest.SkipTest unless sys.platform
    starts with one of the prefixes in *required_on*; on those platforms
    the module is mandatory and the ImportError is re-raised unchanged.

    When *deprecated* is true, module and package deprecation warnings
    emitted during the import are suppressed.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            mandatory_here = sys.platform.startswith(tuple(required_on))
            if not mandatory_here:
                raise unittest.SkipTest(str(exc))
            raise
        return module
159
160
def _save_and_remove_module(name, orig_modules):
    """Stash *name* and its submodules in *orig_modules*, then evict them.

    Every sys.modules entry that is *name* itself or starts with
    ``name + '.'`` is copied into the *orig_modules* dict and removed from
    sys.modules.

    Raise ImportError if the module can't be imported.
    """
    if name not in sys.modules:
        # Import once purely to verify the module is importable, then
        # drop the entry so the caller starts from a clean cache.
        __import__(name)
        del sys.modules[name]
    child_prefix = name + '.'
    # Snapshot the keys first: sys.modules is mutated while we walk it.
    for cached_name in list(sys.modules):
        if cached_name != name and not cached_name.startswith(child_prefix):
            continue
        orig_modules[cached_name] = sys.modules[cached_name]
        del sys.modules[cached_name]
174
def _save_and_block_module(name, orig_modules):
    """Remember the cached module *name* (if any) and block its import.

    The sys.modules entry is replaced with None.  If a module was cached
    under *name* it is first stored in the *orig_modules* dict.

    Return True if the module was in sys.modules, False otherwise.
    """
    was_cached = name in sys.modules
    if was_cached:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_cached
187
188
def anticipate_failure(condition):
    """Return a decorator for a test known to be broken in some cases.

       When *condition* is true the decorator is unittest.expectedFailure;
       otherwise it hands the test back untouched.

       Any use of this decorator should have a comment identifying the
       associated tracker issue.
    """
    def leave_unchanged(test):
        return test

    return unittest.expectedFailure if condition else leave_unchanged
198
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discovers tests under *pkg_dir* with *loader* (using "test*" when
    *pattern* is None), adds them to *standard_tests* and returns that
    suite.  Most packages can implement load_tests as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    effective_pattern = "test*" if pattern is None else pattern
    # Climb three levels from this file: support -> test -> Lib.
    top_dir = __file__
    for _ in range(3):
        top_dir = os.path.dirname(top_dir)
    discovered = loader.discover(start_dir=pkg_dir,
                                 top_level_dir=top_dir,
                                 pattern=effective_pattern)
    standard_tests.addTests(discovered)
    return standard_tests
217
218
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a new copy of a module, bypassing sys.modules.

    The named module is taken out of sys.modules before it is imported
    again, so the module object that was cached beforehand is left
    untouched (unlike with reload).

    *fresh* is an iterable of additional module names that are also
    removed from the sys.modules cache before doing the import.

    *blocked* is an iterable of module names whose sys.modules entry is
    set to None for the duration of the import, so that attempts to
    import them raise ImportError.

    The named module and every module named in *fresh* and *blocked* are
    saved first and put back into sys.modules once the import is over;
    blocking entries for modules that were not cached are deleted again.

    Module and package deprecation messages are suppressed during this
    import if *deprecated* is True.

    ImportError propagates if the named module cannot be imported while
    it is being saved.  An ImportError raised after that point (while
    handling *fresh*/*blocked* or during the fresh import itself) is
    swallowed and None is returned instead.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        saved_modules = {}      # entries to reinstate afterwards
        placeholder_names = []  # blocked names that had no cached module
        _save_and_remove_module(name, saved_modules)
        try:
            for extra_name in fresh:
                _save_and_remove_module(extra_name, saved_modules)
            for barred_name in blocked:
                if not _save_and_block_module(barred_name, saved_modules):
                    placeholder_names.append(barred_name)
            module = importlib.import_module(name)
        except ImportError:
            module = None
        finally:
            # Reinstate the original cache entries, then drop the None
            # placeholders that only existed to block imports.
            sys.modules.update(saved_modules)
            for barred_name in placeholder_names:
                del sys.modules[barred_name]
        return module
267
268
def get_attribute(obj, name):
    """Return getattr(obj, name), skipping the test if it is missing.

    An AttributeError is converted into unittest.SkipTest.
    """
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
277
# Flag set to 0 by regrtest.py
verbose = 1
# Flag set to [] by regrtest.py
use_resources = None
# Disable bigmem tests (they will still be run with small sizes, to make
# sure they work.)
max_memuse = 0
real_max_memuse = 0
failfast = False
match_tests = None
285
# _original_stdout holds the stdout that was in place when regrtest began:
# possibly "the real" stdout, possibly IDLE's emulation of it, or anything
# else.  What matters is that it is a stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout
293
def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
296
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not cached."""
    sys.modules.pop(name, None)
302
def _force_run(path, func, *args):
    """Call func(*args), retrying once after making *path* accessible.

    If the first call raises OSError, *path* is chmod'ed to S_IRWXU and
    the call is repeated; the result (or exception) of the second call is
    what the caller sees.  With verbose >= 2 the failure and the retry
    are printed.
    """
    try:
        return func(*args)
    except OSError as exc:
        if verbose >= 2:
            print('%s: %s' % (type(exc).__name__, exc))
            print('re-run %s%r' % (func.__name__, args))
        # Grant the owner read/write/execute, then try exactly once more.
        os.chmod(path, stat.S_IRWXU)
        return func(*args)
312
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Run func(pathname), then wait until the deletion is visible.

        With *waitall* the wait ends once the directory *pathname* lists
        no entries; otherwise it ends once the last path component is no
        longer listed in its parent directory.  A RuntimeWarning is
        issued if that has not happened when the backoff is exhausted.
        """
        # Perform the operation itself.
        func(pathname)
        # Work out which directory listing has to be polled.
        if waitall:
            watched_dir, leaf = pathname, None
        else:
            watched_dir, leaf = os.path.split(pathname)
            if not watched_dir:
                watched_dir = '.'
        # Poll for `pathname` to vanish from the filesystem.  The delay
        # doubles each round and adds up to ~1 second in total, after
        # which the deletion is probably an error anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        delay = 0.001
        while delay < 1.0:
            # Only the presence of the name(s) in the directory listing is
            # tested, regardless of any security or access rights.  Having
            # got this far, we have sufficient permissions to do that much
            # using Python's equivalent of the Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            entries = os.listdir(watched_dir)
            still_pending = entries if waitall else leaf in entries
            if not still_pending:
                return
            # Back off and look again.
            time.sleep(delay)
            delay *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Unlink *filename* and wait for the deletion to show up."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove the directory *dirname* and wait for it to disappear."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Delete the tree rooted at *path*, waiting out pending deletes."""
        def _rmtree_inner(path):
            # Empty the directory `path` without removing `path` itself.
            for entry in _force_run(path, os.listdir, path):
                child = os.path.join(path, entry)
                try:
                    mode = os.lstat(child).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s" % (child, exc),
                          file=sys.__stderr__)
                    mode = 0
                if not stat.S_ISDIR(mode):
                    _force_run(child, os.unlink, child)
                    continue
                _waitfor(_rmtree_inner, child, waitall=True)
                _force_run(child, os.rmdir, child)

        def _forced_rmdir(target):
            return _force_run(target, os.rmdir, target)

        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(_forced_rmdir, path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Delete the tree rooted at *path*.

        shutil.rmtree() is tried first; if it raises OSError the tree is
        walked by hand, using _force_run() for each listing and removal.
        """
        try:
            shutil.rmtree(path)
        except OSError:
            pass
        else:
            return

        def _rmtree_inner(path):
            # Empty the directory `path` without removing `path` itself.
            for entry in _force_run(path, os.listdir, path):
                child = os.path.join(path, entry)
                try:
                    mode = os.lstat(child).st_mode
                except OSError:
                    mode = 0
                if not stat.S_ISDIR(mode):
                    _force_run(path, os.unlink, child)
                    continue
                _rmtree_inner(child)
                _force_run(path, os.rmdir, child)

        _rmtree_inner(path)
        os.rmdir(path)
395
def unlink(filename):
    """Delete *filename*, ignoring a path that does not exist.

    FileNotFoundError and NotADirectoryError are swallowed; any other
    error propagates.
    """
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
401
def rmdir(dirname):
    """Remove the directory *dirname*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
407
def rmtree(path):
    """Remove the tree rooted at *path*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
413
def make_legacy_pyc(source):
    """Move a PEP 3147/488 pyc file to its legacy pyc location.

    The legacy location is the source path with 'c' appended, i.e. the
    pyc ends up next to the source file.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147/488 pyc file must exist.
    :return: The absolute file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    # Split the absolute path so that only the file name is appended to the
    # directory.  Re-joining the whole of `source` duplicated the directory
    # part for relative paths such as "pkg/mod.py".
    source_dir, source_name = os.path.split(os.path.abspath(source))
    legacy_pyc = os.path.join(source_dir, source_name + 'c')
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
426
def forget(modname):
    """'Forget' a module was ever imported.

    The module is removed from sys.modules and, for every directory on
    sys.path, its legacy .pyc file and its PEP 3147/488 .pyc files are
    deleted.
    """
    unload(modname)
    source_name = modname + '.py'
    for path_entry in sys.path:
        source = os.path.join(path_entry, source_name)
        # Whether or not the files exist is irrelevant: just unlink every
        # possible legacy and PEP 3147/488 pyc name.
        unlink(source + 'c')
        for level in ('', 1, 2):
            cached = importlib.util.cache_from_source(source,
                                                      optimization=level)
            unlink(cached)
441
442# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can actually be used from this process.

    The answer is computed once and cached on the function object as
    ``_is_gui_available.result``; the explanation for a negative answer
    (or None) is stored as ``_is_gui_available.reason``.
    """
    try:
        return _is_gui_available.result
    except AttributeError:
        pass  # first call: nothing cached yet

    reason = None
    if sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001

        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]

        user32 = ctypes.windll.user32
        station = user32.GetProcessWindowStation()
        if not station:
            raise ctypes.WinError()
        flags = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        succeeded = user32.GetUserObjectInformationW(
            station,
            UOI_FLAGS,
            ctypes.byref(flags),
            ctypes.sizeof(flags),
            ctypes.byref(needed))
        if not succeeded:
            raise ctypes.WinError()
        if not flags.dwFlags & WSF_VISIBLE:
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, probe for the window manager connection
        # here before Tk is ever touched.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]

            psn_p = pointer(ProcessSerialNumber())
            if app_services.GetCurrentProcess(psn_p) < 0:
                reason = "cannot run without OS X gui process"
            elif app_services.SetFrontProcess(psn_p) < 0:
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as exc:
            detail = str(exc)
            # Keep the skip message short.
            if len(detail) > 50:
                detail = detail[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(exc).__name__,
                                                           detail)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
516
def is_resource_enabled(resource):
    """Tell whether *resource* may be used by the tests.

    Known resources are set by regrtest.py.  If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    if use_resources is None:
        return True
    return resource in use_resources
524
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    *msg* replaces the default denial message for a resource that is not
    enabled.  The 'gui' resource is additionally denied, with the reason
    recorded by _is_gui_available(), when no GUI is usable.
    """
    if is_resource_enabled(resource):
        if resource == 'gui' and not _is_gui_available():
            raise ResourceDenied(_is_gui_available.reason)
        return
    if msg is None:
        msg = "Use of the %r resource not enabled" % resource
    raise ResourceDenied(msg)
533
def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    The running version is the part of platform.release() before the first
    '-', parsed as dot-separated integers; if it cannot be parsed the test
    runs without any version check.  The check is made each time the
    decorated function is called, and `min_version` is exposed on the
    wrapper as its ``min_version`` attribute.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            running = None
            if platform.system() == sysname:
                version_txt = platform.release().split('-', 1)[0]
                try:
                    running = tuple(int(part)
                                    for part in version_txt.split('.'))
                except ValueError:
                    pass  # unparsable release string: skip the check
            if running is not None and running < min_version:
                min_version_txt = '.'.join(str(part) for part in min_version)
                raise unittest.SkipTest(
                    "%s version %s or higher required, not %s"
                    % (sysname, min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
560
def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest on FreeBSD older than `min_version`.

    The version components are passed as separate positional arguments.
    For example, @requires_freebsd_version(7, 2) raises SkipTest if the
    FreeBSD version is less than 7.2.
    """
    minimum = min_version
    return _requires_unix_version('FreeBSD', minimum)
569
def requires_linux_version(*min_version):
    """Decorator raising SkipTest on Linux older than `min_version`.

    The version components are passed as separate positional arguments.
    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the
    Linux version is less than 2.6.32.
    """
    minimum = min_version
    return _requires_unix_version('Linux', minimum)
578
def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    The running version comes from platform.mac_ver(); if it cannot be
    parsed as dot-separated integers the test runs without a version
    check.  The check is made each time the decorated function is called,
    and min_version is exposed on the wrapper as ``min_version``.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is less than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            running = None
            if sys.platform == 'darwin':
                version_txt = platform.mac_ver()[0]
                try:
                    running = tuple(int(part)
                                    for part in version_txt.split('.'))
                except ValueError:
                    pass  # unparsable version string: skip the check
            if running is not None and running < min_version:
                min_version_txt = '.'.join(str(part) for part in min_version)
                raise unittest.SkipTest(
                    "Mac OS X %s or higher required, not %s"
                    % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
605
606
# Literal loopback addresses are used instead of "localhost", because
# resolving that name uses the DNS under recent Windows versions (see
# issue #18792).
HOST = "127.0.0.1"
HOSTv6 = "::1"
611
612
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given family and type
    (default is AF_INET, SOCK_STREAM), and binding it with bind_port() to
    that function's default host (HOST) with the port set to 0, eliciting
    an unused ephemeral port from the OS.  The temporary socket is then
    closed, and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    OSError will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it.
    """
    # The with-statement closes the temporary socket even when bind_port()
    # raises, so a failed probe does not leak a file descriptor.
    with socket.socket(family, socktype) as tempsock:
        port = bind_port(tempsock)
    return port
674
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    The port is an ephemeral one picked by the OS (port 0 is requested),
    which guarantees it is unbound.  This is important as many tests may be
    running simultaneously, especially in a buildbot environment.

    TestFailed is raised if the socket is an AF_INET/SOCK_STREAM socket
    *and* has SO_REUSEADDR or SO_REUSEPORT set on it.  Tests should *never*
    set these socket options for TCP/IP sockets.  The only case for setting
    these options is testing multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on such a socket.  This will prevent anyone
    else from bind()'ing to our host/port for the duration of the test.
    """
    is_tcp_over_ipv4 = (sock.family == socket.AF_INET and
                        sock.type == socket.SOCK_STREAM)
    if is_tcp_over_ipv4:
        if hasattr(socket, 'SO_REUSEADDR'):
            reuse_addr = sock.getsockopt(socket.SOL_SOCKET,
                                         socket.SO_REUSEADDR)
            if reuse_addr == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
            except OSError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                reuse_port = 0
            if reuse_port == 1:
                raise TestFailed("tests should never set the SO_REUSEPORT "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
711
def bind_unix_socket(sock, addr):
    """Bind the AF_UNIX socket *sock* to *addr*.

    If binding fails with PermissionError the socket is closed and
    unittest.SkipTest is raised.
    """
    assert sock.family == socket.AF_UNIX
    try:
        sock.bind(addr)
    except PermissionError:
        # The socket is unusable from here on; release it before skipping.
        sock.close()
        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
720
def _is_ipv6_enabled():
    """Check whether IPv6 is enabled on this host.

    True is returned only if socket.has_ipv6 is set and an AF_INET6 stream
    socket can be created and bound to HOSTv6 on an ephemeral port.
    """
    if not socket.has_ipv6:
        return False
    probe = None
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        probe.bind((HOSTv6, 0))
    except OSError:
        return False
    else:
        return True
    finally:
        # The socket may never have been created if socket() itself failed.
        if probe:
            probe.close()

IPV6_ENABLED = _is_ipv6_enabled()
737
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The decorated callable is invoked with the arguments it was given and
    its return value is passed through.  An OSError whose message contains
    "CERTIFICATE_VERIFY_FAILED" is converted into unittest.SkipTest; any
    other OSError is re-raised unchanged.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Hand the result back so wrapping a helper does not lose it.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec
750
# A size that is likely larger than the underlying OS pipe buffer, so that
# writes of this size block.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (See issue #17835 for a discussion of this number.)
PIPE_MAX_SIZE = 4 * 1024 ** 2 + 1

# A size that is likely larger than the underlying OS socket buffer, so
# that writes of this size block.
# The socket buffer sizes can usually be tuned system-wide (e.g. through
# sysctl on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).
# (See issue #18643 for a discussion of this number.)
SOCK_MAX_SIZE = 16 * 1024 ** 2 + 1
764
# Skip decorator for tests that need IEEE 754 doubles.
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles",
)

# Skip decorators for the optional compression modules; each name below is
# None when the corresponding import failed at the top of the file.
requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
777
# True when sys.platform reports a Java platform.
is_jython = sys.platform.startswith('java')

# A positive ANDROID_API_LEVEL build variable marks an Android build.
_ANDROID_API_LEVEL = sysconfig.get_config_var('ANDROID_API_LEVEL')
is_android = (_ANDROID_API_LEVEL is not None and _ANDROID_API_LEVEL > 0)
# os.geteuid() is only consulted when is_android is true.
android_not_root = (is_android and os.geteuid() != 0)

# Path of the shell to use, or None on win32.
if sys.platform == 'win32':
    unix_shell = None
elif is_android:
    unix_shell = '/system/bin/sh'
else:
    unix_shell = '/bin/sh'
788
# Filename used for testing.  Jython disallows '@' in module names, so a
# '$' prefix is used there instead.
TESTFN = '$test' if os.name == 'java' else '@test'

# Disambiguate TESTFN for parallel testing (by embedding the process id),
# while letting it remain a valid module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
799
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or None if there is no such character.
FS_NONASCII = None
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the encoding list is just an example of encodings
    # able to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # Not raising is not enough: when the filesystem error handler
        # substitutes unencodable characters instead of failing (e.g.
        # 'replace'), os.fsencode() succeeds but the character is lost.
        # Require an exact round trip before accepting the candidate.
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        pass
    else:
        FS_NONASCII = character
        break
843
# TESTFN_UNICODE is a non-ASCII filename: TESTFN followed by a dash and a
# few accented / non-Latin-1 characters.
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    # Normalize to NFD so the name matches what the filesystem reports.
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# Name of the encoding reported by sys.getfilesystemencoding().
TESTFN_ENCODING = sys.getfilesystemencoding()
853
# TESTFN_UNENCODABLE is a str filename that the filesystem encoding must
# *not* be able to encode in strict mode.  It stays None when no such name
# can be produced on this platform.
TESTFN_UNENCODABLE = None
if os.name != 'nt':
    # Mac OS X denies unencodable filenames (invalid utf-8), so it is left
    # at None there.
    if sys.platform != 'darwin':
        try:
            # ascii and utf-8 cannot decode the byte 0xff
            b'\xff'.decode(TESTFN_ENCODING)
        except UnicodeDecodeError:
            # 0xff is mapped to the surrogate character U+DCFF
            TESTFN_UNENCODABLE = TESTFN \
                + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
        # Otherwise the file system encoding (eg. ISO-8859-* encodings) can
        # decode the byte 0xff: leave None so that some unicode filename
        # tests are skipped.
# skip win32s (0) or Windows 9x/ME (1)
elif sys.getwindowsversion().platform >= 2:
    # Mix characters from several scripts to minimize the probability that
    # the whole name is encodable to MBCS (issue #9819)
    TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
    try:
        TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
    except UnicodeEncodeError:
        pass
    else:
        print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
              'Unicode filename tests may not be effective'
              % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
        TESTFN_UNENCODABLE = None
886
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first: it is
    # not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is implicitly
    # concatenated with the next one and neither is tried on its own.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # First candidate that fails strict decoding wins.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
        break
916
# TESTFN with a non-ASCII suffix, or None when FS_NONASCII is unavailable.
TESTFN_NONASCII = (TESTFN + '-' + FS_NONASCII) if FS_NONASCII else None

# Working directory at the time this module is imported
SAVEDCWD = os.getcwd()

# Flag flipped by libregrtest/main.py so that tests which are not useful
# for PGO can be skipped
PGO = False
928
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Context manager yielding the path of a temporary directory.

    Arguments:

      path: directory to create for the duration of the block.  When
        omitted or None, tempfile.mkdtemp() picks the location and the
        yielded path is passed through os.path.realpath().

      quiet: when False (the default) a failure to create *path* raises.
        When true, the failure is reported as a RuntimeWarning and the
        (uncreated) path is yielded anyway.

    The directory is removed on exit only if this call created it.
    """
    if path is None:
        path = os.path.realpath(tempfile.mkdtemp())
        dir_created = True
    else:
        try:
            os.mkdir(path)
        except OSError:
            if not quiet:
                raise
            warnings.warn('tests may fail, unable to create temp dir: ' + path,
                          RuntimeWarning, stacklevel=3)
            dir_created = False
        else:
            dir_created = True
    try:
        yield path
    finally:
        # Never delete a directory that was not created here.
        if dir_created:
            rmtree(path)
962
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Context manager that switches the current working directory.

    Arguments:

      path: directory to make current for the duration of the block.

      quiet: when False (the default) a failure to change directory
        raises.  When true, a RuntimeWarning is issued instead and the
        working directory is left as it was.

    Yields the working directory in effect inside the block and restores
    the previous one on exit.
    """
    original_cwd = os.getcwd()
    try:
        os.chdir(path)
    except OSError:
        if quiet:
            warnings.warn('tests may fail, unable to change CWD to: ' + path,
                          RuntimeWarning, stacklevel=3)
        else:
            raise
    try:
        yield os.getcwd()
    finally:
        os.chdir(original_cwd)
988
989
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    The directory is created through temp_dir() with *name* as its path
    (so a relative *name* lands in the current directory); passing None
    lets tempfile.mkdtemp choose the location.  The working directory is
    then switched through change_cwd().

    With *quiet* False (default) any failure to create the directory or to
    change into it raises.  With *quiet* True only a warning is issued and
    the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
1008
# Only defined on platforms where os.umask exists.
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask.

        The mask in effect on entry is put back on exit, even if the
        block raises.
        """
        previous_mask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous_mask)
1018
# TEST_SUPPORT_DIR is the directory containing this file.
# TEST_HOME_DIR is its parent and refers to the top level directory of the
# "test" package that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
1026
def findfile(filename, subdir=None):
    """Locate *filename* in the test directory or on sys.path.

    An absolute *filename* is returned untouched.  Otherwise the test home
    directory is searched first, then each sys.path entry in order, and
    the first existing path is returned.  If nothing is found the
    (possibly subdir-prefixed) relative name is returned; this does not
    necessarily signal failure, as it could still be a legitimate path.

    When *subdir* is given, *filename* is looked up relative to that
    sub-directory of each searched directory.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    candidates = (os.path.join(root, filename)
                  for root in [TEST_HOME_DIR] + sys.path)
    return next((fn for fn in candidates if os.path.exists(fn)), filename)
1044
def create_empty_file(filename):
    """Create *filename* as an empty file, truncating it if it exists."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, flags))
1049
def sortdict(dict):
    "Like repr(dict), but with the items in sorted order."
    rendered = ", ".join("%r: %r" % pair for pair in sorted(dict.items()))
    return "{%s}" % rendered
1056
def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.

    The file is opened at TESTFN; its descriptor number is captured as the
    return value, then the file is closed and TESTFN is removed.
    """
    file = open(TESTFN, "wb")
    try:
        # The value is computed here, but the finally clause runs before the
        # caller receives it, so the descriptor is already closed by then.
        return file.fileno()
    finally:
        # Close first, then remove the file.
        file.close()
        unlink(TESTFN)
1068
def check_syntax_error(testcase, statement, *, lineno=None, offset=None):
    """Assert that compiling *statement* raises SyntaxError.

    The raised error must carry a non-None lineno and offset.  When
    *lineno* and/or *offset* are given, the corresponding attribute of the
    error must also equal the expected value.  All checks are made through
    the assertion methods of *testcase*.
    """
    with testcase.assertRaises(SyntaxError) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    # lineno is checked before offset.
    for attr, expected in (('lineno', lineno), ('offset', offset)):
        actual = getattr(err, attr)
        testcase.assertIsNotNone(actual)
        if expected is not None:
            testcase.assertEqual(actual, expected)
1079
def open_urlresource(url, *args, **kw):
    """Return an open file for the resource at *url*, downloading if needed.

    The resource is cached in TEST_DATA_DIR under the last path component
    of the URL.  Extra positional and keyword arguments are forwarded to
    open() when the cached file is opened, except for the keyword *check*:
    if given, it is called with the open file and must return a true value
    for the file to be accepted (the file is then rewound to offset 0).

    A cached file that fails *check* is deleted and fetched again.  Before
    any download, requires('urlfetch') is called.  Raises TestFailed if the
    freshly downloaded file is still rejected by *check*.
    """
    import urllib.request, urllib.parse

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return an open file if it is acceptable, else None (file closed).
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            if check(f):
                f.seek(0)
                return f
        except BaseException:
            # Do not leak the handle when the validator itself fails.
            f.close()
            raise
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    response = opener.open(url, timeout=15)
    try:
        f = response
        if gzip and response.headers.get('Content-Encoding') == 'gzip':
            f = gzip.GzipFile(fileobj=response)
        try:
            with open(fn, "wb") as out:
                s = f.read()
                while s:
                    out.write(s)
                    s = f.read()
        finally:
            f.close()
    finally:
        # GzipFile.close() does not close the file object it wraps, so the
        # HTTP response must be closed explicitly.
        response.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
1128
1129
class WarningsRecorder(object):
    """Thin wrapper around the list produced by
       warnings.catch_warnings(record=True).

       Attribute access is forwarded to the most recent recorded warning;
       reset() marks everything recorded so far as already seen.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        # Index of the first warning not yet hidden by reset().
        self._last = 0

    def __getattr__(self, attr):
        # Something was recorded since the last reset: delegate to the
        # newest warning.
        if self._last < len(self._warnings):
            return getattr(self._warnings[-1], attr)
        # Nothing new: the standard warning detail names read as None,
        # anything else is an error.
        if attr not in warnings.WarningMessage._WARNING_DETAILS:
            raise AttributeError("%r has no attribute %r" % (self, attr))
        return None

    @property
    def warnings(self):
        """Warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Hide every warning recorded so far."""
        self._last = len(self._warnings)
1151
1152
def _filterwarnings(filters, quiet=False):
    """Generator behind check_warnings().

    Records every warning raised while suspended at the yield, then
    verifies the recording against *filters*, an iterable of
    (message regexp, category) pairs:

    - a recorded warning matched by no filter raises AssertionError;
    - unless *quiet* is true, a filter that matched nothing raises
      AssertionError as well.
    """
    # Empty the warning registry found in the globals of the frame two
    # levels up (the calling module), so its warnings are raised again.
    caller_globals = sys._getframe(2).f_globals
    registry = caller_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as recorded:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swaps the module, look it up in sys.modules.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(recorded)
    # Whittle the recording down to the warnings no filter accounts for.
    unmatched = list(recorded)
    missing = []
    for msg, cat in filters:
        seen = False
        leftover = []
        for entry in unmatched:
            warning = entry.message
            if (re.match(msg, str(warning), re.I) and
                    issubclass(warning.__class__, cat)):
                seen = True
            else:
                leftover.append(entry)
        unmatched = leftover
        if not (seen or quiet):
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if unmatched:
        raise AssertionError("unhandled warning %s" % unmatched[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
1190
1191
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Optional keyword argument:
     - 'quiet': when true, a filter that catches nothing is not an error
        (defaults to True when no filter is given,
         and to False when some filters are defined)

    Called without argument, it behaves like:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # No filter given: match every warning and, to preserve backward
    # compatibility, be quiet unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
1214
1215
@contextlib.contextmanager
def check_no_resource_warning(testcase):
    """Context manager asserting that the block emits no ResourceWarning.

    Usage:

        with check_no_resource_warning(self):
            f = open(...)
            ...
            del f

    The object that could emit the ResourceWarning has to be dropped
    before the block ends: a collection is forced right after the block,
    and the recorded warnings are then compared with the empty list using
    testcase.assertEqual().
    """
    with warnings.catch_warnings(record=True) as recorded:
        warnings.filterwarnings('always', category=ResourceWarning)
        yield
        gc_collect()
    testcase.assertEqual(recorded, [])
1235
1236
class CleanImport(object):
    """Context manager forcing import to hand out a fresh module object.

    Handy for testing module-level behaviours, such as the emission of a
    DeprecationWarning on import.

    Example:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        # Snapshot taken before anything is removed; __exit__ puts these
        # entries back.
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            try:
                module = sys.modules[module_name]
            except KeyError:
                continue
            # module_name may just be an alias for another module (e.g.
            # a stub for modules renamed in 3.x).  The real module must
            # then be dropped too in order to clear the import cache.
            real_name = module.__name__
            if real_name != module_name:
                del sys.modules[real_name]
            del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
1267
1268
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping view of os.environ that undoes its own modifications.

    Every variable set or deleted through the guard has its initial value
    remembered; leaving the context restores those values.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps variable name -> value before the first change (None when
        # the variable was not set).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Only the first access records the initial value
        self._changed.setdefault(envvar, self._environ.get(envvar))
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Only the first access records the initial value
        self._changed.setdefault(envvar, self._environ.get(envvar))
        # Deleting a variable that is not set is not an error.
        self._environ.pop(envvar, None)

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        """Set *envvar* to *value* (same as item assignment)."""
        self[envvar] = value

    def unset(self, envvar):
        """Remove *envvar* (same as item deletion)."""
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                del self._environ[name]
        os.environ = self._environ
1320
1321
class DirsOnSysPath(object):
    """Context manager that temporarily appends directories to sys.path.

    On construction the current sys.path list object and a copy of its
    contents are remembered, then the positional arguments are appended.
    On exit both the object and its contents are put back.

    As a consequence *every* change made to sys.path inside the block,
    including rebinding sys.path to another list, is undone when the
    block ends.
    """

    def __init__(self, *paths):
        saved_object = sys.path
        self.original_value = list(saved_object)
        self.original_object = saved_object
        saved_object.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Rebind the original list first, then restore its contents in place.
        restored = self.original_object
        sys.path = restored
        restored[:] = self.original_value
1345
1346
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class to match (subclasses match too); the
    keyword arguments name attributes that the exception instance must have,
    with exactly the given values."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must derive from the configured class, not the
        # other way round: TransientResource(OSError) has to match a
        # ConnectionResetError and must not match a plain Exception.
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            # A missing or different attribute means "not our error".
            if not hasattr(value, attr) or getattr(value, attr) != attr_value:
                return
        raise ResourceDenied("an optional resource is not available")
1371
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# Each is configured for OSError with a specific errno value; note that
# socket_peer_reset and ioerror_peer_reset are configured identically.
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1378
1379
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    resource_name: name used in the ResourceDenied message.
    timeout: default socket timeout installed for the duration of the block
        (None leaves the current default untouched); the previous default
        is always restored on exit.
    errnos: errno values to treat as transient.  When empty, a built-in
        list of connection errnos and getaddrinfo error codes is used.

    An OSError that is not recognised as transient is re-raised unchanged.
    """
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        # Prefer the platform's own constant, falling back to the number
        # listed above when the name is not defined.
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise ResourceDenied if *err* looks like a transient network
        # failure; otherwise return so the caller re-raises the original.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            (isinstance(err, urllib.error.HTTPError) and
             500 <= err.code <= 599) or
            # URLError.reason may be a message string or an exception
            # instance; convert to text so the substring tests cannot
            # raise TypeError on a non-string reason.
            (isinstance(err, urllib.error.URLError) and
                 any(marker in str(err.reason)
                     for marker in ("ConnectionRefusedError",
                                    "TimeoutError",
                                    "EOFError"))) or
            n in captured_errnos):
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except nntplib.NNTPTemporaryError as err:
        if verbose:
            sys.stderr.write(denied.args[0] + "\n")
        raise denied from err
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not transient: re-raise the exception originally caught.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
1453
1454
@contextlib.contextmanager
def captured_output(stream_name):
    """Context manager behind captured_stdout/stdin/stderr.

    Swaps the attribute *stream_name* of the sys module for a fresh
    io.StringIO, yields that replacement, and puts the previous stream
    back on exit."""
    import io
    previous_stream = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, previous_stream)
1466
def captured_stdout():
    """Return a context manager that captures the output of sys.stdout.

    The context manager yields the StringIO that replaces sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")
1475
def captured_stderr():
    """Return a context manager that captures the output of sys.stderr.

    The context manager yields the StringIO that replaces sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")
1484
def captured_stdin():
    """Return a context manager that replaces sys.stdin.

    The context manager yields the StringIO that replaces sys.stdin; write
    the desired input to it and rewind before running the code under test:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")
1496
1497
def gc_collect():
    """Force as many objects as possible to be collected.

    Timely deallocation is not guaranteed by the garbage collector in
    non-CPython implementations of Python (and, with reference cycles, not
    in CPython either).  As a result __del__ methods may run later than
    expected and weakrefs may stay alive longer than expected.  This
    function does its best to make all garbage objects disappear by running
    three collections, with a short pause after the first one on Jython.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()
1513
@contextlib.contextmanager
def disable_gc():
    """Context manager that turns the garbage collector off for the block.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
1523
1524
def python_is_optimized():
    """Find if Python was built with optimizations.

    Looks at the last -O* option in the PY_CFLAGS build variable: the build
    counts as optimized unless there is no such option or it is -O0 or -Og.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [opt for opt in cflags.split() if opt.startswith('-O')]
    # The last optimization flag on the command line is the effective one.
    final_opt = opt_flags[-1] if opt_flags else ""
    return final_opt not in ('', '-O0', '-Og')
1533
1534
# struct format pieces describing an object header.  When
# sys.gettotalrefcount exists, two extra pointers are prepended and the
# trailing alignment code is '0P' instead of '0n'.
if hasattr(sys, "gettotalrefcount"):
    _header, _align = '2PnP', '0P'
else:
    _header, _align = 'nP', '0n'
# Header for variable-size objects: one more 'n' field.
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*."""
    return struct.calcsize(''.join((_header, fmt, _align)))

def calcvobjsize(fmt):
    """Return struct.calcsize() of the var-object header followed by *fmt*."""
    return struct.calcsize(''.join((_vheader, fmt, _align)))
1547
1548
1549_TPFLAGS_HAVE_GC = 1<<14
1550_TPFLAGS_HEAPTYPE = 1<<9
1551
1552def check_sizeof(test, o, size):
1553    import _testcapi
1554    result = sys.getsizeof(o)
1555    # add GC header size
1556    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1557        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1558        size += _testcapi.SIZEOF_PYGC_HEAD
1559    msg = 'wrong size for %s: got %d, expected %d' \
1560            % (type(o), result, size)
1561    test.assertEqual(result, size, msg)
1562
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator: run the wrapped function under the first usable locale.

    catstr: name of a locale category attribute, e.g. 'LC_ALL'.
    locales: candidate locale names, tried in order; the first one accepted
        by locale.setlocale() is used.  If none is accepted, or the current
        locale cannot be determined, the function runs under the current
        locale.

    The original locale is restored after the call, even if it raises.
    Calling the wrapped function raises AttributeError if *catstr* is not
    an attribute of the locale module.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing.
                # (Exception rather than a bare except, so that
                # KeyboardInterrupt and SystemExit are not swallowed.)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # Candidate not available: try the next one.
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator
1598
#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Decorator: run the wrapped function with the TZ variable set to *tz*.

    The TZ environment variable is set and time.tzset() is called before
    the function runs; afterwards TZ is put back to its previous value (or
    removed if it was unset) and tzset() is called again, even if the
    function raises.

    Calling the wrapped function raises unittest.SkipTest when time.tzset
    is not available.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            # None means TZ was not set on entry.
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # The function may already have removed TZ itself;
                    # a plain del would then raise KeyError and hide the
                    # function's own result or exception.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator
1631
#=======================================================================
# Big-memory-test support. Kept apart from 'resources' because memory use
# should be configurable.

# Handy shorthands, used by the various bigmem tests both as byte limits
# and as size limits.
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* and store it in the module-level memory limits.

    *limit* is a number (optionally with a fractional part) followed by a
    unit K, M, G or T and an optional 'b', case-insensitively, e.g. '4g' or
    '2.5Gb'.  real_max_memuse receives the parsed byte count; max_memuse
    receives that count capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or if the capped value is
    below 2 GiB - 1.  In the latter case real_max_memuse has already been
    updated.
    """
    global max_memuse
    global real_max_memuse
    match = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                     re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    multipliers = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    number, unit = match.group(1), match.group(3)
    requested = int(float(number) * multipliers[unit.lower()])
    real_max_memuse = requested
    capped = min(requested, MAX_Py_ssize_t)
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = capped
1665
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The work is delegated to the "memory_watchdog.py" helper script, which
    is run as a child process with this process' /proc/<pid>/statm file as
    its standard input.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Launch the watchdog child process.

        If the statm file cannot be opened, a RuntimeWarning is emitted and
        the watchdog stays in the "not started" state.
        """
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        # Close our handle on every path, including the ones where
        # findfile() or Popen() raises; previously those paths leaked it.
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the child process and wait for it, if it was started."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
1694
1695
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the size requested by the test, in whatever unit the test
    itself uses; 'memuse' is the (estimated) number of bytes consumed per
    unit.  A test working on two byte buffers of 4 GiB each could therefore
    be written as @bigmemtest(size=_4G, memuse=2).

    The decorated method is called with one extra argument, the size to
    use.  When no memory limit has been configured that argument is a small
    token value (5147) instead of 'size', unless 'dry_run' is false: such a
    test does not support dummy runs and is skipped when -M is not given.
    The test is also skipped when the configured limit is smaller than
    size * memuse.
    """
    gib = 1024 ** 3

    def decorator(f):
        def wrapper(self):
            requested = wrapper.size
            bytes_per_unit = wrapper.memuse
            # No limit configured: fall back to a tiny dummy size.
            effective = requested if real_max_memuse else 5147

            must_check = real_max_memuse or not dry_run
            if must_check and real_max_memuse < effective * bytes_per_unit:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (requested * bytes_per_unit / gib))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * bytes_per_unit / gib))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, effective)
            finally:
                if watchdog:
                    watchdog.stop()

        # Kept as attributes (and re-read on every call) so that they can
        # be inspected or adjusted on the decorated function.
        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
1743
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when 'max_memuse' is at least MAX_Py_ssize_t;
    otherwise it is skipped with a message that depends on the build size.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            reason = "not enough memory: try a 32-bit build instead"
        else:
            reason = ("not enough memory: %.1fG minimum needed"
                      % (MAX_Py_ssize_t / (1024 ** 3)))
        raise unittest.SkipTest(reason)
    return wrapper
1758
1759#=======================================================================
1760# unittest integration.
1761
1762class BasicTestRunner:
1763    def run(self, test):
1764        result = unittest.TestResult()
1765        test(result)
1766        return result
1767
def _id(obj):
    """Identity decorator: hand 'obj' back untouched."""
    return obj
1770
def requires_resource(resource):
    """Return a decorator that skips a test unless 'resource' is usable.

    The 'gui' resource is checked for availability first; every resource
    must additionally be enabled.
    """
    gui_unavailable = resource == 'gui' and not _is_gui_available()
    if gui_unavailable:
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1778
def requires_android_level(level, reason):
    """Return a decorator skipping a test on Android API levels below 'level'.

    On other platforms, or at a sufficient API level, the test is returned
    unchanged.
    """
    too_old = is_android and _ANDROID_API_LEVEL < level
    if not too_old:
        return _id
    return unittest.skip('%s at Android API level %d' %
                         (reason, _ANDROID_API_LEVEL))
1785
def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    guard = impl_detail(cpython=True)
    return guard(test)
1791
def impl_detail(msg=None, **guards):
    """Return a decorator for implementation-specific tests.

    The guards are interpreted by check_impl_detail().  When they allow the
    test on the running implementation the test is left untouched; otherwise
    it is skipped with 'msg', or with a message built from the guard names
    when 'msg' is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            template = "implementation detail not available on {0}"
        else:
            template = "implementation detail specific to {0}"
        msg = template.format(' or '.join(sorted(guardnames)))
    return unittest.skip(msg)
1804
# Result of the Queue probe below; None until the first call.
_have_mp_queue = None
def requires_multiprocessing_queue(test):
    """Skip decorator for tests that use multiprocessing.Queue."""
    global _have_mp_queue
    if _have_mp_queue is None:
        import multiprocessing
        # Without a functioning shared semaphore implementation attempts to
        # instantiate a Queue will result in an ImportError (issue #3770).
        try:
            multiprocessing.Queue()
        except ImportError:
            _have_mp_queue = False
        else:
            _have_mp_queue = True
    if _have_mp_queue:
        return test
    msg = "requires a functioning shared semaphore implementation"
    return unittest.skip(msg)(test)
1820
def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    With no guards the result selects CPython only.  Otherwise all guard
    values must agree, and the default is the negation of that value.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    is_true = flags[0]
    assert flags == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)
1828
1829# Use the following check to guard CPython's implementation-specific tests --
1830# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """Tell whether implementation-specific code applies to this interpreter.

       The lower-cased platform.python_implementation() name is looked up
       among the guards; when it is absent, the default derived from the
       guards is returned.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return guards.get(implementation, default)
1840
1841
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    The trace function that was active before the call is reinstalled
    afterwards, whether the test returns or raises.  On interpreters
    without sys.gettrace the function is returned undecorated.
    """
    if hasattr(sys, 'gettrace'):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            saved_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                sys.settrace(saved_trace)
        return wrapper
    return func
1856
1857
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    The test is first restricted to CPython.  The result is then wrapped so
    that any trace function is unset while the test runs, which prevents
    unexpected refcounts caused by the trace function.

    """
    cpython_guarded = cpython_only(test)
    return no_tracing(cpython_guarded)
1867
1868
def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate.

    Nested suites are always kept (and filtered in place); individual tests
    are kept only when pred(test) is true.
    """
    kept = []
    for item in suite._tests:
        if isinstance(item, unittest.TestSuite):
            _filter_suite(item, pred)
        elif not pred(item):
            continue
        kept.append(item)
    suite._tests = kept
1880
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    A verbose text runner is used when 'verbose' is set, a silent
    BasicTestRunner otherwise.  TestFailed is raised when the run was not
    successful; its message is the traceback text of the single error or
    failure when there is exactly one, or a generic message otherwise.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2,
                                         failfast=failfast)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
1899
1900
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be:
      * a string naming a module present in sys.modules, whose tests are
        loaded (ValueError is raised for an unknown name);
      * a TestSuite or TestCase instance, which is added as is;
      * a TestCase class, whose test methods are loaded.

    The collected tests are filtered with 'match_tests' (when it is not
    None) and then run with _run_suite().
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    # unittest.makeSuite() and unittest.findTestCases() are deprecated
    # wrappers around a default TestLoader; call the loader directly.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    def case_pred(test):
        # A test is selected when any dotted component of its id matches.
        if match_tests is None:
            return True
        for name in test.id().split("."):
            if fnmatch.fnmatchcase(name, match_tests):
                return True
        return False
    _filter_suite(suite, case_pred)
    _run_suite(suite)
1924
1925#=======================================================================
1926# Check for the presence of docstrings.
1927
1928# Rather than trying to enumerate all the cases where docstrings may be
1929# disabled, we just check for that directly
1930
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when the check_impl_detail() guard holds, the platform is not win32
# and the WITH_DOC_STRINGS build variable is unset or false.
MISSING_C_DOCSTRINGS = (
    check_impl_detail()
    and sys.platform != 'win32'
    and not sysconfig.get_config_var('WITH_DOC_STRINGS')
)

# Docstrings are usable only if the probe function above kept its own
# docstring and the C-level ones are not missing.
HAVE_DOCSTRINGS = not (_check_docstrings.__doc__ is None or
                       MISSING_C_DOCSTRINGS)

requires_docstrings = unittest.skipUnless(
    HAVE_DOCSTRINGS, "test requires docstrings")
1943
1944
1945#=======================================================================
1946# doctest driver.
1947
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    TestFailed is raised when at least one doctest fails.
    """

    import doctest

    # Any explicit value is deliberately replaced by None, which is what
    # makes doctest fall back to its own sys.argv inspection.
    verbosity = verbose if verbosity is None else None

    failed, attempted = doctest.testmod(module, verbose=verbosity,
                                        optionflags=optionflags)
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted
1970
1971
1972#=======================================================================
1973# Support for saving and restoring the imported modules.
1974
def modules_setup():
    """Return a 1-tuple holding a shallow copy of sys.modules."""
    snapshot = sys.modules.copy()
    return (snapshot,)
1977
def modules_cleanup(oldmodules):
    """Reset sys.modules to 'oldmodules', keeping loaded 'encodings.*' modules.

    sys.modules is updated in place: it is emptied, then refilled with the
    currently loaded encodings submodules followed by 'oldmodules'.
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    preserved = {name: mod for name, mod in sys.modules.items()
                 if name.startswith('encodings.')}
    sys.modules.clear()
    sys.modules.update(preserved)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)
1993
1994#=======================================================================
1995# Threading support to prevent reporting refleaks when running regrtest.py -R
1996
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# _thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
2004
def threading_setup():
    """Return the thread state to hand to threading_cleanup() later.

    The result is (_thread._count(), copy of threading._dangling), or the
    placeholder (1, ()) when the _thread module is unavailable.
    """
    if not _thread:
        return 1, ()
    return _thread._count(), threading._dangling.copy()
2010
def threading_cleanup(*original_values):
    """Wait for the thread state to return to 'original_values'.

    'original_values' is what threading_setup() returned.  The current
    state is polled up to 100 times, sleeping 10 ms and collecting garbage
    between attempts; the function returns silently either way.
    """
    if not _thread:
        return
    _MAX_COUNT = 100
    for _attempt in range(_MAX_COUNT):
        current = _thread._count(), threading._dangling
        if current == original_values:
            break
        time.sleep(0.01)
        gc_collect()
    # XXX print a warning in case of failure?
2022
def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The returned wrapper forwards positional and keyword arguments to
    'func' and returns its result.
    """
    if not _thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        # Snapshot the thread state, run the test, then wait for the
        # state to come back to the snapshot whatever the outcome.
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
2039
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Nothing is done on platforms lacking os.waitpid or os.WNOHANG.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    any_process = -1
    while True:
        try:
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except OSError:
            # No child left to wait for (or waiting is not possible).
            # Only OSError is swallowed so that KeyboardInterrupt and
            # SystemExit still propagate.
            break
        if pid == 0:
            # Children exist but none of them has exited yet.
            break
2059
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager that starts 'threads' and joins them on exit.

    All threads are started before the body of the with block runs.  If a
    thread cannot be started the error is re-raised, after the threads
    already started have been joined.

    On exit, 'unlock' (if given) is called first, then the started threads
    are joined for up to 15 periods of one minute.  If some threads are
    still alive after that, a traceback dump is written to sys.stdout and
    AssertionError is raised.
    """
    threads = list(threads)
    started = []
    try:
        try:
            for t in threads:
                t.start()
                started.append(t)
        except:
            if verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(threads), len(started)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            endtime = time.time()
            for timeout in range(1, 16):
                endtime += 60
                for t in started:
                    # Share the remaining time of this period between
                    # the threads, with a small minimum per join.
                    t.join(max(endtime - time.time(), 0.01))
                # Thread.isAlive() no longer exists on recent Python
                # versions; is_alive() is the supported spelling.
                started = [t for t in started if t.is_alive()]
                if not started:
                    break
                if verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(started), timeout))
        finally:
            started = [t for t in started if t.is_alive()]
            if started:
                faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(started))
2095
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        Inside the with: block obj.attr is 5.  When the block ends the
        previous value is put back; if `obj` had no such attribute before,
        the one created here is deleted instead.
    """
    existed = hasattr(obj, attr)
    if existed:
        saved = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield
    finally:
        if existed:
            setattr(obj, attr, saved)
        else:
            delattr(obj, attr)
2122
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily replace an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        Inside the with: block obj["item"] is 5.  When the block ends the
        previous value is put back; if `item` was not in `obj` before, the
        entry created here is deleted instead.
    """
    existed = item in obj
    if existed:
        saved = obj[item]
    obj[item] = new_val
    try:
        yield
    finally:
        if existed:
            obj[item] = saved
        else:
            del obj[item]
2149
def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    Every "[N refs, M blocks]" marker (with its line ending) is removed
    from the bytes object, and the result is stripped of surrounding
    whitespace.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    refs_marker = br"\[\d+ refs, \d+ blocks\]\r?\n?"
    cleaned = re.sub(refs_marker, b"", stderr)
    return cleaned.strip()
2159
# Skip decorator applied whenever the interpreter exposes sys.getcounts.
requires_type_collecting = unittest.skipIf(
    hasattr(sys, 'getcounts'),
    'types are immortal if COUNT_ALLOCS is defined')
2162
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions.

    This is a public alias for the private helper of the same name in the
    subprocess module."""
    flags = subprocess._args_from_interpreter_flags()
    return flags
2167
def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags.

    This is a public alias for the private helper of the same name in the
    subprocess module."""
    flags = subprocess._optim_args_from_interpreter_flags()
    return flags
2172
2173#============================================================
2174# Support for assertions about logging.
2175#============================================================
2176
class TestHandler(logging.handlers.BufferingHandler):
    """Logging handler that stores every record for later inspection.

    Records are kept (as their attribute dicts) in self.buffer and can be
    queried through matches(), which relies on the 'matcher' object given
    at construction time.
    """

    def __init__(self, matcher):
        # The base class wants a "capacity" to decide when to flush.
        # shouldFlush() is overridden below and never asks for a flush, so
        # zero will do; call flush() by hand to empty the buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self):
        """Never flush automatically."""
        return False

    def emit(self, record):
        """Format 'record', then store its __dict__ in the buffer."""
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Look for a saved dict whose keys/values match the supplied arguments.
        """
        return any(self.matcher.matches(saved, **kwargs)
                   for saved in self.buffer)
2204
class Matcher(object):
    """Compare stored record dicts against expected key/value pairs."""

    # Keys for which a substring of the stored string is enough.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Every keyword must match the value stored under the same key in
        'd' (a missing key counts as None).  Keys whose values are strings
        and which are in self._partial_matches will be checked for partial
        (i.e. substring) matches. You can extend this scheme to (for
        example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), expected)
                   for key, expected in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
2237
2238
# Cached answer of can_symlink(); None until the first call.
_can_symlink = None
def can_symlink():
    """Tell whether os.symlink() can be used, caching the answer.

    The probe creates, then removes, a link named TESTFN + "can_symlink"
    that points to TESTFN.
    """
    global _can_symlink
    if _can_symlink is None:
        symlink_path = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, symlink_path)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            os.remove(symlink_path)
            _can_symlink = True
    return _can_symlink
2254
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    msg = "Requires functional symlink implementation"
    return unittest.skip(msg)(test)
2260
# Cached answer of can_xattr(); None until the first call.
_can_xattr = None
def can_xattr():
    """Tell whether extended attributes are usable, caching the answer.

    The answer is False when os.setxattr does not exist, when setting a
    "user.test" attribute fails with OSError on either a tempfile-created
    file or TESTFN, or when platform.release() reports a 2.6 kernel older
    than 2.6.39.  Both probe files are removed before returning.
    """
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_fp, tmp_name = tempfile.mkstemp()
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so that only a literal "2.6."
                    # prefix is recognized.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an open OS-level descriptor that the caller
            # owns; close it so that the probe does not leak it.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
    _can_xattr = can
    return can
2288
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    msg = "no non-broken extended attribute support"
    return unittest.skip(msg)(test)
2294
2295
def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory is case-insensitive.

    A temporary file is created in 'directory' and looked up again under a
    name whose case has been changed; the file system is case-insensitive
    when both names designate the same file.
    """
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        original = base.name
        recased = original.upper()
        if recased == original:
            # The name had nothing to upper-case; try the other direction.
            recased = original.lower()
        try:
            same = os.path.samefile(original, recased)
        except FileNotFoundError:
            same = False
    return same
2307
2308
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Return the set of names found in dir(ref_api) but not in
    dir(other_api), minus the names listed in 'ignore'.

    Private names, i.e. those beginning with '_', are left out of the
    result, except for magic methods (names that also end in '__'), which
    are always reported.
    """
    missing = set(dir(ref_api)).difference(dir(other_api))
    if ignore:
        missing.difference_update(ignore)
    return {name for name in missing
            if not name.startswith('_') or name.endswith('__')}
2322
2323
def check__all__(test_case, module, name_of_module=None, extra=(),
                 blacklist=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The public names of 'module' (its API) are detected automatically: a
    name qualifies when it does not start with an underscore and the object
    it is bound to was defined in 'module', according to the object's
    '__module__' attribute.  Objects without a '__module__' attribute that
    are not modules themselves qualify as well.

    The 'name_of_module' argument can specify (as a string or tuple thereof)
    what module(s) an API could be defined in in order to be detected as a
    public API. One case for this is when 'module' imports part of its public
    API from other modules, possibly a C backend (like 'csv' and its '_csv').

    The 'extra' argument can be a set of names that wouldn't otherwise be
    automatically detected as "public", like objects without a proper
    '__module__' attribute. If provided, it will be added to the
    automatically detected ones.

    The 'blacklist' argument can be a set of names that must not be treated
    as part of the public API even though their names indicate otherwise.

    The comparison is made with test_case.assertCountEqual().

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                blacklist = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, blacklist=blacklist)

    """

    # Normalize 'name_of_module' to a tuple of module names.
    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    expected = set(extra)

    for name in dir(module):
        if name.startswith('_') or name in blacklist:
            continue
        obj = getattr(module, name)
        owner = getattr(obj, '__module__', None)
        if owner in name_of_module:
            expected.add(name)
        elif (not hasattr(obj, '__module__') and
                not isinstance(obj, types.ModuleType)):
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)
2381
2382
class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # State saved by __enter__ for __exit__; None means "nothing to undo".
    old_value = None
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode.

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            # GetErrorMode is not available on Windows XP and Windows Server 2003,
            # but SetErrorMode returns the previous value, so we can use that
            import ctypes
            self._k32 = ctypes.windll.kernel32
            SEM_NOGPFAULTERRORBOX = 0x02
            self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
            self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)

            # Suppress assert dialogs in debug builds
            # (see http://bugs.python.org/issue23314)
            try:
                import msvcrt
                msvcrt.CrtSetReportMode
            except (AttributeError, ImportError):
                # no msvcrt or a release build
                pass
            else:
                self.old_modes = {}
                report_types = [msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT]
                for report_type in report_types:
                    previous_mode = msvcrt.CrtSetReportMode(
                        report_type, msvcrt.CRTDBG_MODE_FILE)
                    previous_file = msvcrt.CrtSetReportFile(
                        report_type, msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = previous_mode, previous_file
            return self

        if resource is not None:
            try:
                self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
                hard_limit = self.old_value[1]
                resource.setrlimit(resource.RLIMIT_CORE, (0, hard_limit))
            except (ValueError, OSError):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            command = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
            proc = subprocess.Popen(command, stdout=subprocess.PIPE)
            value = proc.communicate()[0]
            if value.strip() == b'developer':
                print("this test triggers the Crash Reporter, "
                      "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            return

        if not sys.platform.startswith('win'):
            if resource is not None:
                try:
                    resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
                except (ValueError, OSError):
                    pass
            return

        self._k32.SetErrorMode(self.old_value)

        if self.old_modes:
            import msvcrt
            for report_type, (old_mode, old_file) in self.old_modes.items():
                msvcrt.CrtSetReportMode(report_type, old_mode)
                msvcrt.CrtSetReportFile(report_type, old_file)
2471
2472
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    A cleanup is registered on 'test_instance' (through addCleanup) to undo
    the override: an attribute that was stored in the object's own __dict__
    gets its old value back, while one that was only reachable through
    normal attribute lookup is simply deleted from the object again.

    'attr_name' must be an existing attribute of 'object_to_patch';
    AttributeError is raised otherwise.

    """
    # Probe for existence only; the value is fetched below.
    getattr(object_to_patch, attr_name)

    # Remember the previous value and where it lived.
    try:
        old_value = object_to_patch.__dict__[attr_name]
    except (AttributeError, KeyError):
        old_value = getattr(object_to_patch, attr_name, None)
        attr_is_local = False
    else:
        attr_is_local = True

    def cleanup():
        # Undo the override when the test is done.
        if not attr_is_local:
            delattr(object_to_patch, attr_name)
        else:
            setattr(object_to_patch, attr_name, old_value)

    test_instance.addCleanup(cleanup)

    # Finally install the new value.
    setattr(object_to_patch, attr_name, new_value)
2505
2506
def run_in_subinterp(code):
    """
    Execute 'code' in a subinterpreter via _testcapi and return the result.

    Raise unittest.SkipTest when tracemalloc is currently tracing memory
    allocations.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        # No tracemalloc module available, so it cannot be tracing.
        tracing = False
    else:
        tracing = tracemalloc.is_tracing()

    if tracing:
        raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                 "if tracemalloc module is tracing "
                                 "memory allocations")

    import _testcapi
    return _testcapi.run_in_subinterp(code)
2525
2526
def check_free_after_iterating(test, iter, cls, args=()):
    """Check that an iterated object is finalized once iteration ends.

    A subclass of 'cls' with a __del__ method is instantiated with 'args'
    and wrapped with 'iter'.  The resulting iterator must raise
    StopIteration on the first next() call, and after a garbage collection
    pass the instance's __del__ must have run.  Failures are reported
    through the assertion methods of 'test'.
    """
    finalized = False

    class A(cls):
        def __del__(self):
            nonlocal finalized
            finalized = True
            # Advancing the iterator again while its source is being
            # finalized must still end with StopIteration.
            try:
                next(iterator)
            except StopIteration:
                pass

    iterator = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, iterator)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(finalized)
2544
2545
def missing_compiler_executable(cmd_names=[]):
    """Check if the compiler components used to build the interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    When 'cmd_names' is empty, executables that are not configured (None or
    an empty command) are skipped.  When 'cmd_names' is given, each listed
    executable is asserted to be configured.

    """
    # 'cmd_names' is only read, never modified, so the shared default list
    # cannot leak state between calls.
    from distutils import ccompiler, sysconfig, spawn
    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif not cmd:
            # Unset or empty command: there is no program name to look up,
            # and indexing cmd[0] below would fail.
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]
    return None
2569
2570
# Cached result of the Android emulator probe; None until first needed.
_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping tiny values on the Android emulator.

    On Android, an interval below 1e-5 is raised to 1e-5 when the
    'ro.kernel.qemu' property reads '1'.  The property is queried with
    'getprop' at most once and the answer is cached.  Returns whatever
    sys.setswitchinterval() returns.
    """
    global _is_android_emulator

    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    minimum_interval = 1e-5

    if not (is_android and interval < minimum_interval):
        return sys.setswitchinterval(interval)

    if _is_android_emulator is None:
        qemu_property = subprocess.check_output(['getprop', 'ro.kernel.qemu'])
        _is_android_emulator = (qemu_property.strip() == b'1')

    if _is_android_emulator:
        interval = minimum_interval
    return sys.setswitchinterval(interval)
2584