• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
# Refuse to load under any other module name (for example when executed as a
# script, where __name__ would be '__main__').
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')
5
6import collections.abc
7import contextlib
8import datetime
9import errno
10import faulthandler
11import fnmatch
12import functools
13import gc
14import importlib
15import importlib.util
16import io
17import logging.handlers
18import nntplib
19import os
20import platform
21import re
22import shutil
23import socket
24import stat
25import struct
26import subprocess
27import sys
28import sysconfig
29import tempfile
30import _thread
31import threading
32import time
33import types
34import unittest
35import urllib.error
36import warnings
37
38from .testresult import get_test_runner
39
40try:
41    import multiprocessing.process
42except ImportError:
43    multiprocessing = None
44
45try:
46    import zlib
47except ImportError:
48    zlib = None
49
50try:
51    import gzip
52except ImportError:
53    gzip = None
54
55try:
56    import bz2
57except ImportError:
58    bz2 = None
59
60try:
61    import lzma
62except ImportError:
63    lzma = None
64
65try:
66    import resource
67except ImportError:
68    resource = None
69
# Names exported by a star-import of this module, grouped by area.
__all__ = [
    # globals
    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
    # exceptions
    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
    # imports
    "import_module", "import_fresh_module", "CleanImport",
    # modules
    "unload", "forget",
    # io
    "record_original_stdout", "get_original_stdout", "captured_stdout",
    "captured_stdin", "captured_stderr",
    # filesystem
    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
    # unittest
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver", "check_syntax_error",
    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
    "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest",
    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
    "check__all__", "skip_unless_bind_unix_socket",
    # sys
    "is_jython", "is_android", "check_impl_detail", "unix_shell",
    "setswitchinterval",
    # network
    "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource",
    "bind_unix_socket",
    # processes
    'temp_umask', "reap_children",
    # logging
    "TestHandler",
    # threads
    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
    # miscellaneous
    "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard",
    "run_with_locale", "swap_item",
    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
    ]
113
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""
116
class TestFailed(Error):
    """Raised to signal that a test failed."""
119
class TestDidNotRun(Error):
    """Raised to signal that a test did not run any subtests."""
122
class ResourceDenied(unittest.SkipTest):
    """Skip raised when a test asks for a resource that is not allowed.

    requires() raises this for a resource that has not been enabled.
    Having a dedicated type lets callers tell expected skips apart from
    unexpected ones.
    """
130
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Silence module/package DeprecationWarnings raised inside the block.

    When *ignore* is false the context manager does nothing at all.
    """
    if not ignore:
        yield
        return
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", ".+ (module|package)",
                                DeprecationWarning)
        yield
145
146
def import_module(name, deprecated=False, *, required_on=()):
    """Import module *name* and return it, skipping the test if it is missing.

    An ImportError is converted into unittest.SkipTest unless sys.platform
    starts with one of the prefixes in *required_on* (an iterable of platform
    prefixes), in which case the ImportError propagates: the module is
    mandatory there.

    With *deprecated* true, module and package deprecation warnings emitted
    by the import are suppressed.
    """
    with _ignore_deprecated_imports(deprecated):
        try:
            module = importlib.import_module(name)
        except ImportError as exc:
            mandatory = sys.platform.startswith(tuple(required_on))
            if not mandatory:
                raise unittest.SkipTest(str(exc))
            raise
        return module
163
164
def _save_and_remove_module(name, orig_modules):
    """Move *name* and all of its submodules out of sys.modules.

    The removed entries are stored in the *orig_modules* dict.  ImportError
    is raised if the module cannot be imported.
    """
    # Importing first makes a missing module surface as ImportError; the
    # entry created by that probe import is dropped again right away.
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    prefix = name + '.'
    matching = [loaded for loaded in list(sys.modules)
                if loaded == name or loaded.startswith(prefix)]
    for loaded in matching:
        orig_modules[loaded] = sys.modules.pop(loaded)
178
def _save_and_block_module(name, orig_modules):
    """Replace the sys.modules entry for *name* with None, saving the old one.

    An existing entry is stored in *orig_modules*.  Returns True when the
    module was present in sys.modules and False when it was not.
    """
    was_loaded = name in sys.modules
    if was_loaded:
        orig_modules[name] = sys.modules[name]
    sys.modules[name] = None
    return was_loaded
191
192
def anticipate_failure(condition):
    """Return a decorator for a test known to be broken when *condition* holds.

    If *condition* is true the test is marked as an expected failure;
    otherwise it is returned unchanged.  Every use should carry a comment
    naming the associated tracker issue.
    """
    return unittest.expectedFailure if condition else (lambda f: f)
202
def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Shared load_tests implementation for simple test packages.

    A package typically wires it up like this:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)

    Tests discovered under *pkg_dir* are added to *standard_tests*, which is
    then returned.  A *pattern* of None means "test*".
    """
    # Climb three levels up from this file: support -> test -> Lib.
    top_dir = __file__
    for _ in range(3):
        top_dir = os.path.dirname(top_dir)
    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=top_dir,
        pattern="test*" if pattern is None else pattern)
    standard_tests.addTests(discovered)
    return standard_tests
221
222
def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Import and return a fresh copy of module *name*, bypassing sys.modules.

    The named module is taken out of sys.modules before it is imported
    again, so, unlike reload, the original module object is left untouched.

    *fresh* is an iterable of further module names that are removed from the
    sys.modules cache before the import.

    *blocked* is an iterable of module names whose sys.modules entries are
    set to None for the duration of the import, so that importing them
    raises ImportError.

    Everything removed or blocked is saved first and put back into
    sys.modules once the fresh import has finished.

    If *deprecated* is True, module and package deprecation messages are
    suppressed during the import.

    ImportError is raised if the named module cannot be imported at all.
    If an ImportError occurs while the fresh copy is being set up or
    imported, None is returned instead.
    """
    # NOTE: test_heapq, test_json and test_warnings include extra sanity checks
    # to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # *saved* holds modules to restore afterwards; *placeholders* lists
        # blocked names that had no entry and only need their None removed.
        saved = {}
        placeholders = []
        _save_and_remove_module(name, saved)
        try:
            for extra in fresh:
                _save_and_remove_module(extra, saved)
            for banned in blocked:
                was_loaded = _save_and_block_module(banned, saved)
                if not was_loaded:
                    placeholders.append(banned)
            result = importlib.import_module(name)
        except ImportError:
            result = None
        finally:
            sys.modules.update(saved)
            for banned in placeholders:
                del sys.modules[banned]
        return result
271
272
def get_attribute(obj, name):
    """Return attribute *name* of *obj*; raise SkipTest if it does not exist."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
281
# Run-wide settings; per the notes below, some are overwritten by regrtest.py.
verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0      # NOTE(review): presumably the unclamped counterpart
                         # of max_memuse -- confirm against set_memlimit()
junit_xml_list = None    # list of testsuite XML elements
failfast = False         # NOTE(review): presumably set by the test runner -- confirm
289
# _original_stdout is meant to hold stdout as it was when regrtest began.
# It may be "the real" stdout, IDLE's emulation of stdout, or anything else:
# what matters is having some flavor of stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Remember *stdout* as the stream in use when the test run started."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stdout, or sys.stdout if none is recorded."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout
300
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not there."""
    sys.modules.pop(name, None)
306
def _force_run(path, func, *args):
    """Call func(*args), retrying once after chmod-ing *path* on OSError.

    On failure *path* is given mode stat.S_IRWXU and the call is repeated;
    an error from the second attempt propagates.  With verbose >= 2 the
    first error and the retry are printed.
    """
    try:
        result = func(*args)
    except OSError as exc:
        if verbose >= 2:
            print('%s: %s' % (exc.__class__.__name__, exc))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)
    return result
316
317if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the deletion to become visible.

        With *waitall* false, wait until the last component of *pathname* no
        longer appears in the listing of its parent directory.  With
        *waitall* true, wait until the directory *pathname* itself lists as
        empty.  If the condition is still unmet when the backoff loop ends,
        a RuntimeWarning is issued instead of raising.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        # NOTE(review): stacklevel=4 presumably attributes the warning to the
        # caller of the public helper -- confirm.
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)
350
351    def _unlink(filename):
352        _waitfor(os.unlink, filename)
353
354    def _rmdir(dirname):
355        _waitfor(os.rmdir, dirname)
356
357    def _rmtree(path):
358        def _rmtree_inner(path):
359            for name in _force_run(path, os.listdir, path):
360                fullname = os.path.join(path, name)
361                try:
362                    mode = os.lstat(fullname).st_mode
363                except OSError as exc:
364                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
365                          file=sys.__stderr__)
366                    mode = 0
367                if stat.S_ISDIR(mode):
368                    _waitfor(_rmtree_inner, fullname, waitall=True)
369                    _force_run(fullname, os.rmdir, fullname)
370                else:
371                    _force_run(fullname, os.unlink, fullname)
372        _waitfor(_rmtree_inner, path, waitall=True)
373        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
374
375    def _longpath(path):
376        try:
377            import ctypes
378        except ImportError:
379            # No ctypes means we can't expands paths.
380            pass
381        else:
382            buffer = ctypes.create_unicode_buffer(len(path) * 2)
383            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
384                                                             len(buffer))
385            if length:
386                return buffer[:length]
387        return path
388else:
    # On this branch (non-Windows) the os functions are used directly,
    # without the wait-for-deletion wrapper.
    _unlink = os.unlink
    _rmdir = os.rmdir
391
392    def _rmtree(path):
393        try:
394            shutil.rmtree(path)
395            return
396        except OSError:
397            pass
398
399        def _rmtree_inner(path):
400            for name in _force_run(path, os.listdir, path):
401                fullname = os.path.join(path, name)
402                try:
403                    mode = os.lstat(fullname).st_mode
404                except OSError:
405                    mode = 0
406                if stat.S_ISDIR(mode):
407                    _rmtree_inner(fullname)
408                    _force_run(path, os.rmdir, fullname)
409                else:
410                    _force_run(path, os.unlink, fullname)
411        _rmtree_inner(path)
412        os.rmdir(path)
413
414    def _longpath(path):
415        return path
416
def unlink(filename):
    """Delete *filename*, ignoring a missing file or non-directory parent."""
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
422
def rmdir(dirname):
    """Remove the directory *dirname*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
428
def rmtree(path):
    """Delete the directory tree at *path*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
434
def make_legacy_pyc(source):
    """Move a PEP 3147/488 pyc file to its legacy pyc location.

    :param source: The file system path to the source file.  The source file
        does not need to exist, however the PEP 3147/488 pyc file must exist.
    :return: The file system path to the legacy pyc file.
    """
    pyc_file = importlib.util.cache_from_source(source)
    source_dir = os.path.dirname(os.path.abspath(source))
    # Join only the file name.  Joining all of a relative *source* such as
    # 'pkg/mod.py' onto its own directory would repeat the 'pkg' component
    # and point at a directory that does not exist.
    legacy_pyc = os.path.join(source_dir, os.path.basename(source) + 'c')
    os.rename(pyc_file, legacy_pyc)
    return legacy_pyc
447
def forget(modname):
    """Make it look as if module *modname* had never been imported.

    The module is dropped from sys.modules, and for every sys.path entry the
    legacy .pyc file and the PEP 3147/488 cache files are deleted.
    """
    unload(modname)
    filename = modname + '.py'
    for entry in sys.path:
        source = os.path.join(entry, filename)
        # Whether or not they exist, unlink every possible combination of
        # legacy and PEP 3147/488 pyc files.
        unlink(source + 'c')
        for level in ('', 1, 2):
            cached = importlib.util.cache_from_source(source,
                                                      optimization=level)
            unlink(cached)
462
# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can be used, False otherwise.

    The outcome is computed once and cached on the function object as
    ``_is_gui_available.result``; the explanation for a negative outcome (or
    None) is stored as ``_is_gui_available.reason``.  Platform-specific
    checks run first on Windows and macOS, then a Tk root window is created
    and destroyed as a final check on every platform.
    """
    # Reuse the cached answer from an earlier call.
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    reason = None
    if sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, raise an exception if the window manager
        # connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            # Keep the stored reason short: long messages are cut at 50 chars.
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    # Cache both the verdict and the explanation on the function itself.
    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result
537
def is_resource_enabled(resource):
    """Report whether *resource* is enabled.

    regrtest.py sets the known resources.  When not running under
    regrtest.py, every resource counts as enabled unless use_resources has
    been set.
    """
    if use_resources is None:
        return True
    return resource in use_resources
545
def requires(resource, msg=None):
    """Raise ResourceDenied unless *resource* is available.

    *msg* replaces the default message used when the resource is not
    enabled.  The 'gui' resource must also pass the GUI availability check.
    """
    if not is_resource_enabled(resource):
        if msg is not None:
            raise ResourceDenied(msg)
        raise ResourceDenied("Use of the %r resource not enabled" % resource)
    if resource != 'gui':
        return
    if not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)
554
def _requires_unix_version(sysname, min_version):
    """Return a decorator that skips a test on too-old `sysname` systems.

    SkipTest is raised when the OS is `sysname` and its release is lower
    than the `min_version` tuple.  For example,
    @_requires_unix_version('FreeBSD', (7, 2)) skips the test on FreeBSD
    releases below 7.2.  A release string that cannot be parsed as
    dot-separated integers never causes a skip.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if platform.system() == sysname:
                # Only the part before the first '-' is compared.
                release = platform.release().split('-', 1)[0]
                try:
                    version = tuple(int(part) for part in release.split('.'))
                except ValueError:
                    version = None
                if version is not None and version < min_version:
                    required = '.'.join(str(part) for part in min_version)
                    raise unittest.SkipTest(
                        "%s version %s or higher required, not %s"
                        % (sysname, required, release))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
581
def requires_freebsd_version(*min_version):
    """Return a decorator that skips a test on too-old FreeBSD systems.

    SkipTest is raised when the OS is FreeBSD and its version is lower than
    `min_version`; @requires_freebsd_version(7, 2), for instance, skips the
    test on FreeBSD versions below 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)
590
def requires_linux_version(*min_version):
    """Return a decorator that skips a test on too-old Linux systems.

    SkipTest is raised when the OS is Linux and its version is lower than
    `min_version`; @requires_linux_version(2, 6, 32), for instance, skips
    the test on Linux versions below 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)
599
def requires_mac_ver(*min_version):
    """Return a decorator that skips a test on too-old Mac OS X systems.

    SkipTest is raised when the OS is Mac OS X and its version is lower than
    min_version; @requires_mac_ver(10, 5), for instance, skips the test on
    OS X versions below 10.5.  A version string that cannot be parsed as
    dot-separated integers never causes a skip.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                reported = platform.mac_ver()[0]
                try:
                    version = tuple(int(part) for part in reported.split('.'))
                except ValueError:
                    version = None
                if version is not None and version < min_version:
                    required = '.'.join(str(part) for part in min_version)
                    raise unittest.SkipTest(
                        "Mac OS X %s or higher required, not %s"
                        % (required, reported))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator
626
627
# Host name and IPv4/IPv6 loopback addresses used by the networking helpers.
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
631
632
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    This is achieved by creating a temporary socket of the requested
    *family* and *socktype* (default is AF_INET, SOCK_STREAM) and passing it
    to bind_port(), which binds it to bind_port()'s default host with the
    port set to 0, eliciting an unused ephemeral port from the OS.  The
    temporary socket is then closed, and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    OSError will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it.
    """
    # The with block closes the temporary socket even if bind_port() raises,
    # so a failed bind no longer leaks the socket.
    with socket.socket(family, socktype) as tempsock:
        port = bind_port(tempsock)
    return port
694
def bind_port(sock, host=HOST):
    """Bind *sock* to a free port on *host* and return the port number.

    An ephemeral port (port 0) is requested so that the port is guaranteed
    to be unbound; this matters because many tests may run at the same
    time, especially in a buildbot environment.

    If sock.family is AF_INET and sock.type is SOCK_STREAM, *and* the socket
    has SO_REUSEADDR or SO_REUSEPORT set, TestFailed is raised.  Tests
    should *never* set these options on TCP/IP sockets; the only case for
    setting them is testing multicasting via multiple UDP sockets.

    For such TCP/IP sockets, SO_EXCLUSIVEADDRUSE is additionally set when
    the option is available (i.e. on Windows), which prevents anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    is_tcp_v4 = (sock.family == socket.AF_INET and
                 sock.type == socket.SOCK_STREAM)
    if is_tcp_v4:
        if hasattr(socket, 'SO_REUSEADDR'):
            reuse_addr = sock.getsockopt(socket.SOL_SOCKET,
                                         socket.SO_REUSEADDR)
            if reuse_addr == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR "
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                reuse_port = sock.getsockopt(socket.SOL_SOCKET,
                                             socket.SO_REUSEPORT)
            except OSError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                pass
            else:
                if reuse_port == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT "
                                     "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    return sock.getsockname()[1]
731
def bind_unix_socket(sock, addr):
    """Bind the AF_UNIX socket *sock* to *addr*.

    If binding fails with PermissionError, the socket is closed and
    unittest.SkipTest is raised instead.
    """
    assert sock.family == socket.AF_UNIX
    skip_reason = 'cannot bind AF_UNIX sockets'
    try:
        sock.bind(addr)
    except PermissionError:
        sock.close()
        raise unittest.SkipTest(skip_reason)
740
def _is_ipv6_enabled():
    """Return True if an IPv6 stream socket can be bound to HOSTv6."""
    if not socket.has_ipv6:
        return False
    try:
        probe = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    except OSError:
        return False
    try:
        probe.bind((HOSTv6, 0))
    except OSError:
        return False
    else:
        return True
    finally:
        probe.close()

IPV6_ENABLED = _is_ipv6_enabled()
757
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The decorated callable is invoked unchanged and its result is returned.
    An OSError whose text contains "CERTIFICATE_VERIFY_FAILED" is turned
    into unittest.SkipTest; any other OSError propagates.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            # Return the result so the decorator is transparent to callers.
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec
770
# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (See issue #17835 for a discussion of this number.)
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  (See issue #18643
# for a discussion of this number.)
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles")

# Skip decorators for tests that need an optional compression module.  Each
# module name is bound to None when its import failed at the top of this file.
requires_zlib = unittest.skipUnless(zlib, 'requires zlib')

requires_gzip = unittest.skipUnless(gzip, 'requires gzip')

requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')

requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
797
798is_jython = sys.platform.startswith('java')
799
800is_android = hasattr(sys, 'getandroidapilevel')
801
802if sys.platform != 'win32':
803    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
804else:
805    unix_shell = None
806
807# Filename used for testing
808if os.name == 'java':
809    # Jython disallows @ in module names
810    TESTFN = '$test'
811else:
812    TESTFN = '@test'
813
814# Disambiguate TESTFN for parallel testing, while letting it remain a valid
815# module name.
816TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
817
818# Define the URL of a dedicated HTTP server for the network tests.
819# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
820TEST_HTTP_URL = "http://www.pythontest.net"
821
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or None if there is no such character.
FS_NONASCII = None
for character in (
    # First try printable and common characters to have a readable filename.
    # For each character, the listed encodings are just examples of encodings
    # able to encode the character (the list is not exhaustive).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then try more "special" characters. "special" because they may be
    # interpreted or displayed differently depending on the exact locale
    # encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage
        if os.fsdecode(os.fsencode(character)) != character:
            raise UnicodeError
    except UnicodeError:
        # The character does not round-trip through the filesystem
        # encoding: try the next candidate.
        pass
    else:
        # Keep the first candidate that survives the round trip.
        FS_NONASCII = character
        break
869
# TESTFN_UNICODE is a non-ASCII filename built from TESTFN.
TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# Name of the filesystem encoding as reported by sys.getfilesystemencoding().
TESTFN_ENCODING = sys.getfilesystemencoding()
879
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such a filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            # Expected outcome: the name really is unencodable.
            pass
        else:
            # The name turned out to be encodable: warn and give up on it.
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass
912
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such a filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, but may refuse to enter
    # such a directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (the trailing comma matters: without it this literal is silently
    # concatenated with the next one and neither is tried on its own)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # First candidate the filesystem encoding cannot decode: use it.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
        break
942
# TESTFN_NONASCII: TESTFN with the FS_NONASCII character appended, or None
# when no such character was found.
if FS_NONASCII:
    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
else:
    TESTFN_NONASCII = None

# Save the initial cwd
SAVEDCWD = os.getcwd()

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False
954
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Context manager yielding the path of a temporary directory.

    Arguments:

      path: directory to create for the duration of the block.  When it is
        None (the default), tempfile.mkdtemp picks the location and the
        resolved real path is yielded.

      quiet: when False (the default), failure to create *path* raises.
        When True, the failure is reported as a RuntimeWarning and *path*
        is yielded anyway.

    The directory is removed on exit only if this call created it.
    """
    creator_pid = None
    if path is None:
        path = os.path.realpath(tempfile.mkdtemp())
        creator_pid = os.getpid()
    else:
        try:
            os.mkdir(path)
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            creator_pid = os.getpid()
    try:
        yield path
    finally:
        # If the process forked inside the block, only the process that
        # created the directory removes it: a child has a different
        # process id. (bpo-30028)
        if creator_pid is not None and creator_pid == os.getpid():
            rmtree(path)
993
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Context manager that switches the current working directory.

    Arguments:

      path: directory to make current for the duration of the block.

      quiet: when False (the default), failure to enter *path* raises.
        When True, the failure is reported as a RuntimeWarning and the
        working directory is left as it was.

    Yields the working directory in effect inside the block and restores
    the previous one on exit.
    """
    previous = os.getcwd()
    try:
        os.chdir(path)
    except OSError as exc:
        if quiet:
            warnings.warn(f'tests may fail, unable to change the current working '
                          f'directory to {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            raise
    try:
        yield os.getcwd()
    finally:
        os.chdir(previous)
1020
1021
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and makes it the CWD.

    A directory called *name* is created through temp_dir() (relative to
    the current directory; with *name* None, tempfile.mkdtemp chooses it)
    and entered through change_cwd().  The working directory in effect
    inside the block is yielded.

    With *quiet* False (default) a failure to create or enter the
    directory raises an error.  With *quiet* True only a warning is
    issued and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
1040
if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Apply *umask* to the process for the block, then put the old one back."""
        previous = os.umask(umask)
        try:
            yield
        finally:
            # os.umask returned the mask it replaced; reinstate it.
            os.umask(previous)
1050
# TEST_SUPPORT_DIR is the directory containing this file.
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
1058
def findfile(filename, subdir=None):
    """Locate *filename* in the test directory or on sys.path.

    An absolute *filename* is returned untouched.  Otherwise *subdir*, if
    given, is prepended and the result is looked up first under
    TEST_HOME_DIR and then under each sys.path entry; the first existing
    path wins.  If nothing exists, the (possibly subdir-prefixed) relative
    name is returned, which does not necessarily signal failure.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    candidates = (os.path.join(base, filename)
                  for base in [TEST_HOME_DIR] + sys.path)
    return next((found for found in candidates if os.path.exists(found)),
                filename)
1076
def create_empty_file(filename):
    """Make *filename* an empty file, truncating it if it already exists."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, flags))
1081
def sortdict(dict):
    "Render *dict* like repr(dict), with the items in sorted order."
    return "{%s}" % ", ".join("%r: %r" % pair
                              for pair in sorted(dict.items()))
1088
def make_bad_fd():
    """
    Return a file descriptor number that is no longer valid.

    TESTFN is opened for binary writing, its descriptor number is read,
    and the file is then closed and unlinked before the number is
    returned.
    """
    stream = open(TESTFN, "wb")
    try:
        stale_fd = stream.fileno()
    finally:
        stream.close()
        unlink(TESTFN)
    return stale_fd
1100
def check_syntax_error(testcase, statement, *, lineno=None, offset=None):
    """Assert through *testcase* that compiling *statement* is a SyntaxError.

    The raised error must carry a non-None lineno and offset.  When
    *lineno* and/or *offset* are given, the corresponding attribute of the
    error must also equal the expected value.
    """
    with testcase.assertRaises(SyntaxError) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    # Same order as the attributes are listed: lineno first, then offset.
    for field, expected in (('lineno', lineno), ('offset', offset)):
        actual = getattr(err, field)
        testcase.assertIsNotNone(actual)
        if expected is not None:
            testcase.assertEqual(actual, expected)
1111
def open_urlresource(url, *args, **kw):
    """Open a locally cached copy of *url*, downloading it first if needed.

    The cached file lives in TEST_DATA_DIR and is named after the last
    path component of the URL.  *args* and the remaining *kw* are passed
    to open() when opening the cached file.  The optional keyword
    argument *check* is a callable that receives the opened file and
    returns true if its content is acceptable.

    Returns the open file object.  requires('urlfetch') is called before
    any download, and TestFailed is raised if the freshly downloaded file
    is rejected by *check*.
    """
    import urllib.request, urllib.parse

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return the opened file when no check was given or the check
        # accepts it (rewound to the start); otherwise close it and fall
        # through, returning None.
        f = open(fn, *args, **kw)
        if check is None:
            return f
        elif check(f):
            f.seek(0)
            return f
        f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        # The cached copy was rejected: drop it and download again.
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    f = opener.open(url, timeout=15)
    if gzip and f.headers.get('Content-Encoding') == 'gzip':
        # Transparently decompress a gzip-encoded response.
        f = gzip.GzipFile(fileobj=f)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)
1160
1161
class WarningsRecorder(object):
    """View over the list handed out by warnings.catch_warnings(record=True).

    Unknown attributes are forwarded to the most recently recorded warning,
    provided at least one warning has arrived since the last reset().
    With nothing new recorded, the standard warning detail attributes read
    as None and anything else raises AttributeError.
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        self._last = 0

    def __getattr__(self, attr):
        recorded = self._warnings
        if len(recorded) > self._last:
            return getattr(recorded[-1], attr)
        if attr not in warnings.WarningMessage._WARNING_DETAILS:
            raise AttributeError("%r has no attribute %r" % (self, attr))
        return None

    @property
    def warnings(self):
        """The warnings recorded since the last reset()."""
        return self._warnings[self._last:]

    def reset(self):
        """Treat every warning recorded so far as already seen."""
        self._last = len(self._warnings)
1183
1184
def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.

    This is a generator: it yields a WarningsRecorder once, and the checks
    run when the generator is resumed.  *filters* is an iterable of
    (message regexp, warning category) pairs.  AssertionError is raised
    for the first recorded warning that no filter matched, or (unless
    *quiet*) for the first filter that matched nothing.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # NOTE(review): the frame two levels up is presumably the code that
    # entered the check_warnings() block -- this depends on the call depth.
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings.  Because
        # test_warnings swaps the module, we need to look it up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = list(w)
    missing = []
    for msg, cat in filters:
        seen = False
        # Iterate over a copy because matches are removed from reraise.
        for w in reraise[:]:
            warning = w.message
            # Filter out the matching messages
            if (re.match(msg, str(warning), re.I) and
                issubclass(warning.__class__, cat)):
                seen = True
                reraise.remove(w)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if reraise:
        raise AssertionError("unhandled warning %s" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])
1222
1223
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager that records warnings and checks them on exit.

    Positional arguments are 2-tuples:
        ("message regexp", WarningCategory)

    Optional keyword argument:
     - 'quiet': when true, a filter that catches nothing is not a failure
        (defaults to True when no filter is given,
         and to a false value when some filters are given)

    Called without any argument it behaves like:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if filters:
        return _filterwarnings(filters, quiet)
    # No filter given: accept every warning and, for backward
    # compatibility, be quiet unless the caller said otherwise.
    if quiet is None:
        quiet = True
    return _filterwarnings((("", Warning),), quiet)
1246
1247
@contextlib.contextmanager
def check_no_resource_warning(testcase):
    """Context manager asserting that the block emits no ResourceWarning.

    Usage:

        with check_no_resource_warning(self):
            f = open(...)
            ...
            del f

    The object that could emit a ResourceWarning must be dropped before
    the block ends; gc_collect() is then run, and *testcase* asserts that
    nothing was recorded.
    """
    with warnings.catch_warnings(record=True) as recorded:
        warnings.filterwarnings('always', category=ResourceWarning)
        yield
        gc_collect()
    testcase.assertEqual(recorded, [])
1267
1268
class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference

    The named modules are removed from sys.modules when the object is
    constructed.  On exit, sys.modules is updated from the snapshot taken
    at construction time, which puts the removed entries back.
    """

    def __init__(self, *module_names):
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need to delete the real module to
                # clear the import cache.
                if module.__name__ != module_name:
                    # The real module may already be gone (for example it
                    # was named earlier in module_names), so tolerate a
                    # missing key instead of raising KeyError.
                    sys.modules.pop(module.__name__, None)
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)
1299
1300
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping view of os.environ that undoes its own modifications.

    Every variable set or deleted through the guard has its initial value
    remembered; leaving the guard (it can be used as a context manager)
    restores those variables and rebinds os.environ to the guarded
    mapping."""

    def __init__(self):
        self._environ = os.environ
        self._changed = {}

    def _remember(self, envvar):
        # Record the value seen on first access only (None means "unset").
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        # Deleting a variable that is not set is silently accepted.
        self._environ.pop(envvar, None)

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                del self._environ[name]
        os.environ = self._environ
1352
1353
class DirsOnSysPath(object):
    """Context manager that temporarily appends directories to sys.path.

    At construction time the current sys.path object and a copy of its
    contents are remembered, then the directories given as positional
    arguments are appended.  When the context ends, sys.path is rebound
    to the remembered object and its contents are reset to the copy.

    As a consequence *all* sys.path modifications made in the body of the
    context manager, including replacement of the object, are reverted
    at the end of the block.
    """

    def __init__(self, *paths):
        self.original_object = sys.path
        self.original_value = list(sys.path)
        sys.path.extend(paths)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore the contents first, then make sure the name sys.path
        # points at the original list object again.
        self.original_object[:] = self.original_value
        sys.path = self.original_object
1377
1378
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes.

    *exc* is the exception class to match; every keyword argument names an
    attribute the raised exception must have, with exactly that value."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or one of its subclasses; the
        # arguments were previously reversed, so subclasses of self.exc
        # were missed and unrelated base classes were accepted.
        if type_ is None or not issubclass(type_, self.exc):
            return
        for attr, attr_value in self.attrs.items():
            if not hasattr(value, attr):
                break
            if getattr(value, attr) != attr_value:
                break
        else:
            # Every requested attribute matched (or none was requested).
            raise ResourceDenied("an optional resource is not available")
1403
# Context managers that raise ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
# Note: socket_peer_reset and ioerror_peer_reset are configured identically
# (OSError with errno ECONNRESET).
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
1410
1411
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions.

    Arguments:

      resource_name: name used in the ResourceDenied message.

      timeout: socket default timeout installed for the duration of the
        block (left untouched when None).  The previous default timeout
        is restored on exit.

      errnos: errno values to treat as transient.  When empty, a built-in
        list of connection errnos and getaddrinfo error codes is used.

    Exceptions that are not recognised as transient propagate unchanged.
    """
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
        # socket.create_connection() fails randomly with
        # EADDRNOTAVAIL on Travis CI.
        ('EADDRNOTAVAIL', 99),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        # Prefer the platform's own constants, falling back to the
        # numeric values listed above when a name is missing.
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Raise ResourceDenied (chained to err) if err looks transient;
        # otherwise return so the caller can re-raise the original.
        n = getattr(err, 'errno', None)
        # URLError.reason can be a message string or an exception
        # instance; match on its text so that the substring tests below
        # cannot raise TypeError and mask the original error.
        reason = str(getattr(err, 'reason', ''))
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            (isinstance(err, urllib.error.HTTPError) and
             500 <= err.code <= 599) or
            (isinstance(err, urllib.error.URLError) and
                 (("ConnectionRefusedError" in reason) or
                  ("TimeoutError" in reason) or
                  ("EOFError" in reason))) or
            n in captured_errnos):
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except nntplib.NNTPTemporaryError as err:
        if verbose:
            sys.stderr.write(denied.args[0] + "\n")
        raise denied from err
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
1488
1489
@contextlib.contextmanager
def captured_output(stream_name):
    """Context manager behind captured_stdout/stdin/stderr.

    The attribute *stream_name* of the sys module is replaced with a fresh
    io.StringIO, which is yielded; the previous stream is put back on exit.
    """
    saved_stream = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, saved_stream)
1501
def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")

    Thin wrapper around captured_output("stdout").
    """
    return captured_output("stdout")
1510
def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")

    Thin wrapper around captured_output("stderr").
    """
    return captured_output("stderr")
1519
def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")

    Thin wrapper around captured_output("stdin").
    """
    return captured_output("stdin")
1531
1532
def gc_collect():
    """Collect as much garbage as possible.

    Non-CPython implementations do not guarantee timely deallocation (and
    neither does CPython in the presence of reference cycles), so __del__
    methods may run later than expected and weakrefs may stay alive longer
    than expected.  This helper does its best to make all garbage objects
    disappear: one collection, a short pause on Jython, then two more
    collections.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()
1548
@contextlib.contextmanager
def disable_gc():
    """Context manager that turns the garbage collector off for the block.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()
1558
1559
def python_is_optimized():
    """Tell whether the PY_CFLAGS build variable requests optimization.

    Only the last '-O...' flag counts.  The result is False when there is
    no such flag or when the last one is '-O0' or '-Og'.
    """
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    opt_flags = [flag for flag in cflags.split() if flag.startswith('-O')]
    final_opt = opt_flags[-1] if opt_flags else ""
    return final_opt not in ('', '-O0', '-Og')
1568
1569
# struct format strings describing what precedes and follows the
# caller-supplied fields in calcobjsize()/calcvobjsize().  When sys has
# gettotalrefcount, two extra pointer-sized fields are prepended and the
# trailing alignment switches from '0n' to '0P'.
# NOTE(review): presumably this detects a debug build -- confirm.
if hasattr(sys, "gettotalrefcount"):
    _header = '2PnP'
    _align = '0P'
else:
    _header = 'nP'
    _align = '0n'
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of *fmt* wrapped in the fixed header and alignment."""
    layout = _header + fmt + _align
    return struct.calcsize(layout)

def calcvobjsize(fmt):
    """Like calcobjsize(), but with one extra 'n' field in the header."""
    layout = _vheader + fmt + _align
    return struct.calcsize(layout)
1582
1583
# Bit masks tested against the __flags__ attribute of type objects in
# check_sizeof().
_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) matches *size*.

    _testcapi.SIZEOF_PYGC_HEAD is added to *size* first when *o* is
    exactly a `type` instance with the heap-type flag set, or when *o* is
    not exactly a `type` instance and its type has the have-GC flag set.
    """
    import _testcapi
    result = sys.getsizeof(o)
    # add GC header size
    # ('and' binds tighter than 'or': this reads as (A and B) or (C and D))
    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
        size += _testcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), result, size)
    test.assertEqual(result, size, msg)
1597
1598#=======================================================================
1599# Decorator for running a function in a different locale, correctly resetting
1600# it afterwards.
1601
def run_with_locale(catstr, *locales):
    """Decorator factory: run the decorated function under another locale.

    *catstr* is the name of an attribute of the locale module used as the
    category (an unknown name raises AttributeError when the decorated
    function is called).  The first entry of *locales* that setlocale()
    accepts is used; if none is accepted the function still runs.  The
    original locale is restored afterwards when it could be retrieved.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                # (rebinding the local name 'locale' to None also disables
                # the restore step in the finally clause below)
                locale = orig_locale = None
            else:
                # Use the first candidate locale that can be set.
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # Make the wrapper report the wrapped function's name and docstring.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
1633
1634#=======================================================================
1635# Decorator for running a function in a specific timezone, correctly
1636# resetting it afterwards.
1637
def run_with_tz(tz):
    """Decorator factory: run the decorated function with TZ set to *tz*.

    The wrapper raises unittest.SkipTest when time.tzset is unavailable.
    Otherwise it sets the TZ environment variable, calls tzset(), runs the
    function, and finally restores (or removes) TZ and calls tzset() again.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            # None means TZ was not set before the call.
            saved_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # Run the function; the timezone is reset even on exceptions.
            try:
                return func(*args, **kwds)
            finally:
                if saved_tz is not None:
                    os.environ['TZ'] = saved_tz
                else:
                    del os.environ['TZ']
                time.tzset()

        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator
1666
1667#=======================================================================
1668# Big-memory-test support. Separate from 'resources' because memory use
1669# should be configurable.
1670
1671# Some handy shorthands. Note that these are used for byte-limits as well
1672# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* and publish it as the big-memory test budget.

    *limit* is a string made of a decimal number followed by a unit
    (K, M, G or T, case-insensitive, with an optional trailing "b"),
    for example '4g' or '2.5Gb'.

    On success two module globals are set:
      * real_max_memuse -- the parsed limit in bytes;
      * max_memuse      -- the same value capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or if the (capped) limit
    is below 2 GiB - 1.  Both globals are left untouched when ValueError is
    raised, so a rejected limit never overwrites an accepted one.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    capped = min(memlimit, MAX_Py_ssize_t)
    # Validate before touching the globals so a failed call has no effect.
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    real_max_memuse = memlimit
    max_memuse = capped
1700
class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.

    The watching itself is delegated to the "memory_watchdog.py" helper
    script, started as a child process with this process'
    /proc/<pid>/statm file as its standard input.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        # True once the helper process has been launched successfully.
        self.started = False

    def start(self):
        """Launch the watchdog helper process.

        If the statm file cannot be opened, a RuntimeWarning is issued and
        the watchdog stays stopped.
        """
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        # Close our handle even if locating or spawning the helper fails.
        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen(
                [sys.executable, watchdog_script],
                stdin=f, stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the helper process and wait for it, if it was started."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()
1729
1730
def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is the requested size for the test, in units the test itself
    interprets.  'memuse' is the (estimated) number of bytes needed per
    unit.  For example, a test that needs two byte buffers of 4 GiB each
    could use @bigmemtest(size=_4G, memuse=2).

    The decorated test method receives the size to use as an extra
    argument.  When no memory limit has been configured, that value is a
    small dummy size (5147) rather than the requested one.  With
    'dry_run' false the test is skipped instead of being run with the
    dummy size, i.e. the test doesn't support dummy runs when -M is not
    specified.
    """
    def decorator(f):
        def wrapper(self):
            requested = wrapper.size
            per_unit = wrapper.memuse
            # Without a configured limit, fall back to a small dummy size.
            effective = requested if real_max_memuse else 5147

            enforce_limit = real_max_memuse or not dry_run
            if enforce_limit and real_max_memuse < effective * per_unit:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (requested * per_unit / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * per_unit / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, effective)
            finally:
                if watchdog:
                    watchdog.stop()

        # Exposed as attributes and re-read on every call of the wrapper.
        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator
1778
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse is at least MAX_Py_ssize_t;
    otherwise unittest.SkipTest is raised with an explanatory message.
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)

        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            reason = "not enough memory: try a 32-bit build instead"
        else:
            reason = ("not enough memory: %.1fG minimum needed"
                      % (MAX_Py_ssize_t / (1024 ** 3)))
        raise unittest.SkipTest(reason)
    return wrapper
1793
1794#=======================================================================
1795# unittest integration.
1796
class BasicTestRunner:
    """Minimal runner: run a test against a fresh unittest.TestResult."""

    def run(self, test):
        """Call *test* with a new TestResult and return that result."""
        outcome = unittest.TestResult()
        test(outcome)
        return outcome
1802
def _id(obj):
    """Identity function, used as the "do nothing" decorator."""
    return obj
1805
def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is enabled.

    For the 'gui' resource the test is also skipped, with the reason stored
    on _is_gui_available, when no GUI is available.
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id
1813
def cpython_only(test):
    """Decorator for tests only applicable on CPython."""
    cpython_guard = impl_detail(cpython=True)
    return cpython_guard(test)
1819
def impl_detail(msg=None, **guards):
    """Return a decorator guarding an implementation-specific test.

    The guards have the same meaning as for check_impl_detail().  When the
    check passes the test is left untouched; otherwise it is skipped with
    *msg*, or with a message built from the guard names if *msg* is None.
    """
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}"
                    if default else
                    "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames)))
    return unittest.skip(msg)
1832
def _parse_guards(guards):
    """Return a tuple ({platform_name: run_me}, default_value).

    With no guards at all, the result selects CPython only.  Otherwise all
    guard values must be equal, and the default is their negation.
    """
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    is_true = flags[0]
    assert flags == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    selected, fallback = _parse_guards(guards)
    current = platform.python_implementation().lower()
    return selected.get(current, fallback)
1852
1853
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    If the interpreter has no sys.gettrace, *func* is returned unchanged.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        saved_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            # Put back whatever trace function was active before the call.
            sys.settrace(saved_trace)
    return wrapper
1868
1869
def refcount_test(test):
    """Decorator for tests which involve reference counting.

    The test is first restricted to CPython.  On top of that, any trace
    function is unset while the test runs, to prevent unexpected refcounts
    caused by the trace function.
    """
    cpython_guarded = cpython_only(test)
    return no_tracing(cpython_guarded)
1879
1880
def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate.

    Nested suites are filtered in place and always kept (even if they end
    up empty); individual tests are kept only when pred(test) is true.
    """
    kept = []
    for item in suite._tests:
        nested = isinstance(item, unittest.TestSuite)
        if nested:
            _filter_suite(item, pred)
        if nested or pred(item):
            kept.append(item)
    suite._tests = kept
1892
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestDidNotRun when nothing was run or skipped, and TestFailed
    when the run was not successful.  If junit_xml_list is not None, the
    result's XML element is appended to it.
    """
    capture = junit_xml_list is not None
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=capture)

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not (result.testsRun or result.skipped):
        raise TestDidNotRun
    if result.wasSuccessful():
        return

    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        # Exactly one error: report its traceback text.
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        # Exactly one failure: report its traceback text.
        err = failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)
1915
1916
1917# By default, don't filter tests
_match_test_func = None
_match_test_patterns = None


def match_test(test):
    """Return True if *test* is selected by the current match patterns.

    Function used by support.run_unittest() and regrtest --list-cases.
    When no filter is installed every test matches; otherwise the test
    identifier, test.id(), is handed to the installed match function.
    """
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    """Return True if *pattern* looks like a full test identifier."""
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
    #
    # Reject patterns which contain fnmatch patterns: '*', '?', '[...]'
    # or '[!...]'. For example, reject 'test_access*'.
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(patterns):
    """Install *patterns* as the filter used by match_test().

    *patterns* is an iterable of strings, or a false value (such as None
    or an empty sequence) to disable filtering.  Patterns that are all
    full test identifiers are matched exactly; otherwise they are treated
    as fnmatch-style patterns matched case-sensitively against the whole
    test identifier or against any of its dot-separated parts.
    """
    global _match_test_func, _match_test_patterns

    # Snapshot the patterns up front.  They are iterated more than once
    # below, so a one-shot iterator would otherwise be exhausted by the
    # first pass; the copy also protects against later mutation by the
    # caller and lets a list compare equal to the cached tuple.
    # set_match_tests(None) behaves as set_match_tests(()).
    patterns = tuple(patterns) if patterns else ()

    if patterns == _match_test_patterns:
        # No change: no need to recompile patterns.
        return

    if not patterns:
        func = None
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier like
                # 'test.test_os.FileTests.test_access'
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    _match_test_patterns = patterns
    _match_test_func = func
1977
1978
1979
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be the name of an already imported module (a key of
    sys.modules), a TestSuite or TestCase instance, or a test case class.
    The collected tests are filtered with match_test() and then run.
    Raises ValueError for a string that is not a key of sys.modules.
    """
    suite = unittest.TestSuite()
    for item in classes:
        if isinstance(item, str):
            if item not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            tests = unittest.findTestCases(sys.modules[item])
        elif isinstance(item, (unittest.TestSuite, unittest.TestCase)):
            tests = item
        else:
            tests = unittest.makeSuite(item)
        suite.addTest(tests)
    _filter_suite(suite, match_test)
    _run_suite(suite)
1996
1997#=======================================================================
1998# Check for the presence of docstrings.
1999
2000# Rather than trying to enumerate all the cases where docstrings may be
2001# disabled, we just check for that directly
2002
def _check_docstrings():
    """Just used to check if docstrings are enabled"""

# True when running on CPython, not on Windows, and the build configuration
# does not report WITH_DOC_STRINGS.
MISSING_C_DOCSTRINGS = (
    check_impl_detail()
    and sys.platform != 'win32'
    and not sysconfig.get_config_var('WITH_DOC_STRINGS')
)

# Docstrings are usable only if Python-level docstrings were kept (the
# probe function above still has one) and the C ones are not missing.
HAVE_DOCSTRINGS = (not MISSING_C_DOCSTRINGS and
                   _check_docstrings.__doc__ is not None)

requires_docstrings = unittest.skipUnless(
    HAVE_DOCSTRINGS, "test requires docstrings")
2015
2016
2017#=======================================================================
2018# doctest driver.
2019
def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    # An explicit verbosity defers to doctest's own -v detection.
    doctest_verbose = verbose if verbosity is None else None

    failed, attempted = doctest.testmod(module, verbose=doctest_verbose,
                                        optionflags=optionflags)
    if failed:
        raise TestFailed("%d of %d doctests failed" % (failed, attempted))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, attempted))
    return failed, attempted
2042
2043
2044#=======================================================================
2045# Support for saving and restoring the imported modules.
2046
def modules_setup():
    """Return a 1-tuple holding a shallow copy of sys.modules."""
    snapshot = sys.modules.copy()
    return (snapshot,)
2049
def modules_cleanup(oldmodules):
    """Reset sys.modules to *oldmodules*, keeping loaded 'encodings.*' modules.

    *oldmodules* is a mapping such as the one returned (inside a tuple) by
    modules_setup().
    """
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    codec_modules = {name: mod for name, mod in sys.modules.items()
                     if name.startswith('encodings.')}
    sys.modules.clear()
    sys.modules.update(codec_modules)
    # XXX: This kind of problem can affect more than just encodings. In
    # particular extension modules (such as _ssl) don't cope with reloading
    # properly.  Really, test modules should be cleaning out the test
    # specific modules they know they added (ala test_runpy) rather than
    # relying on this function (as test_importhooks and test_pkg do
    # currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)
2065
2066#=======================================================================
2067# Threading support to prevent reporting refleaks when running regrtest.py -R
2068
2069# Flag used by saved_test_environment of test.libregrtest.save_env,
2070# to check if a test modified the environment. The flag should be set to False
2071# before running a new test.
2072#
# For example, threading_cleanup() sets the flag if the function fails
# to clean up threads.
environment_altered = False

# NOTE: we use _thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# _thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    """Return (thread count, copy of threading._dangling) as a baseline.

    Pass the result, unpacked, to threading_cleanup().
    """
    return _thread._count(), threading._dangling.copy()

def threading_cleanup(*original_values):
    """Wait for the threading state to return to *original_values*.

    *original_values* is the tuple returned by threading_setup().  The
    state is polled up to _MAX_COUNT times, sleeping 10 ms and running
    gc_collect() between polls.  If the first poll does not match, a
    warning listing the dangling threads is printed to stderr and the
    module-level environment_altered flag is set.
    """
    global environment_altered

    _MAX_COUNT = 100

    for count in range(_MAX_COUNT):
        values = _thread._count(), threading._dangling
        if values == original_values:
            break

        if not count:
            # Display a warning at the first iteration
            environment_altered = True
            dangling_threads = values[1]
            print("Warning -- threading_cleanup() failed to cleanup "
                  "%s threads (count: %s, dangling: %s)"
                  % (values[0] - original_values[0],
                     values[0], len(dangling_threads)),
                  file=sys.stderr)
            for thread in dangling_threads:
                print(f"Dangling thread: {thread!r}", file=sys.stderr)
            sys.stderr.flush()

            # Don't hold references to threads
            dangling_threads = None
        values = None

        time.sleep(0.01)
        gc_collect()


def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.

    Positional and keyword arguments are both forwarded to *func*.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
2131
2132
@contextlib.contextmanager
def wait_threads_exit(timeout=60.0):
    """
    bpo-31234: Context manager to wait until all threads created in the with
    statement exit.

    _thread._count() is polled until it drops back to (or below) the value
    it had when the block was entered; indirectly, this waits until threads
    leave the internal t_bootstrap() C function of the _thread module.
    AssertionError is raised if that has not happened once *timeout*
    seconds have passed.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to cleanup threads started by _thread.start_new_thread(),
    which offers no way to wait for thread exit, whereas threading.Thread
    has a join() method.
    """
    baseline = _thread._count()
    try:
        yield
    finally:
        began = time.monotonic()
        deadline = began + timeout
        count = _thread._count()
        while count > baseline:
            if time.monotonic() > deadline:
                dt = time.monotonic() - began
                old_count = baseline
                msg = (f"wait_threads() failed to cleanup {count - old_count} "
                       f"threads after {dt:.1f} seconds "
                       f"(count: {count}, old count: {old_count})")
                raise AssertionError(msg)
            time.sleep(0.010)
            gc_collect()
            count = _thread._count()
2166
2167
def join_thread(thread, timeout=30.0):
    """Join *thread*, waiting at most *timeout* seconds.

    Raise an AssertionError if the thread is still alive afterwards.
    """
    thread.join(timeout)
    if not thread.is_alive():
        return
    raise AssertionError(
        f"failed to join the thread in {timeout:.1f} seconds")
2176
2177
def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Each reaped child is reported on stderr and sets the module-level
    environment_altered flag.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    can_poll = hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')
    if not can_poll:
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            return

        if pid == 0:
            # Children exist, but none of them has exited yet.
            return

        print("Warning -- reap_children() reaped child process %s"
              % pid, file=sys.stderr)
        environment_altered = True
2205
2206
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager: start *threads* on entry and join them on exit.

    If *unlock* is given, it is called on exit before joining.  Joining is
    attempted in periods of one minute, up to 15 of them.  If some threads
    are still alive in the end, the traceback of all threads is dumped to
    sys.stdout and AssertionError is raised.
    """
    pending = list(threads)
    running = []
    try:
        try:
            for worker in pending:
                worker.start()
                running.append(worker)
        except:
            if verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(pending), len(running)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            deadline = time.monotonic()
            for minutes in range(1, 16):
                # Every period extends the shared deadline by one minute.
                deadline += 60
                for worker in running:
                    remaining = deadline - time.monotonic()
                    worker.join(max(remaining, 0.01))
                running = [w for w in running if w.is_alive()]
                if not running:
                    break
                if verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(running), minutes))
        finally:
            running = [w for w in running if w.is_alive()]
            if running:
                faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(running))
2242
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily replace an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        Inside the with: block obj.attr is 5; the previous value is put
        back when the block ends.  If `attr` does not exist on `obj`, it is
        created for the block and deleted again afterwards.

        The target of the "as" clause, if there is one, receives the old
        value (or None if the attribute did not exist).
    """
    if not hasattr(obj, attr):
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            # The block may already have deleted the attribute itself.
            if hasattr(obj, attr):
                delattr(obj, attr)
        return

    saved = getattr(obj, attr)
    setattr(obj, attr, new_val)
    try:
        yield saved
    finally:
        setattr(obj, attr, saved)
2273
@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily replace an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        Inside the with: block obj["item"] is 5; the previous value is put
        back when the block ends.  If `item` does not exist in `obj`, it is
        created for the block and deleted again afterwards.

        The target of the "as" clause, if there is one, receives the old
        value (or None if the item did not exist).
    """
    if item not in obj:
        obj[item] = new_val
        try:
            yield
        finally:
            # The block may already have deleted the item itself.
            if item in obj:
                del obj[item]
        return

    saved = obj[item]
    obj[item] = new_val
    try:
        yield saved
    finally:
        obj[item] = saved
2304
def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    Every "[N refs, M blocks]" line is removed from the bytes object
    *stderr*, and surrounding whitespace is stripped from the result.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    cleaned = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr)
    return cleaned.strip()
2314
# Skip decorator, active when the interpreter exposes sys.getcounts.
requires_type_collecting = unittest.skipIf(
    hasattr(sys, 'getcounts'),
    'types are immortal if COUNT_ALLOCS is defined')
2317
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions."""
    flags = subprocess._args_from_interpreter_flags()
    return flags
2322
def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags."""
    flags = subprocess._optim_args_from_interpreter_flags()
    return flags
2327
2328#============================================================
2329# Support for assertions about logging.
2330#============================================================
2331
class TestHandler(logging.handlers.BufferingHandler):
    """Logging handler that keeps the attribute dict of every record.

    The stored dicts can later be searched with matches(), which delegates
    the comparison to the *matcher* object given to the constructor.
    """

    def __init__(self, matcher):
        # BufferingHandler takes a "capacity" argument
        # so as to know when to flush. As we're overriding
        # shouldFlush anyway, we can set a capacity of zero.
        # You can call flush() manually to clear out the
        # buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self):
        """Never flush automatically."""
        return False

    def emit(self, record):
        """Format *record*, then store its __dict__ in the buffer."""
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Look for a saved dict whose keys/values match the supplied arguments.
        """
        return any(self.matcher.matches(saved, **kwargs)
                   for saved in self.buffer)
2359
class Matcher(object):
    """Compare a stored dict of values against expected keyword values."""

    # Keys whose string values are compared by substring, not equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), expected)
                   for key, expected in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return dv.find(v) >= 0
        return v == dv
2392
2393
_can_symlink = None
def can_symlink():
    """Return True if os.symlink() works here; the answer is cached.

    A symlink to TESTFN is created next to it (at TESTFN + "can_symlink")
    as a probe and removed again if the creation succeeded.
    """
    global _can_symlink
    if _can_symlink is None:
        probe_path = TESTFN + "can_symlink"
        try:
            os.symlink(TESTFN, probe_path)
        except (OSError, NotImplementedError, AttributeError):
            _can_symlink = False
        else:
            os.remove(probe_path)
            _can_symlink = True
    return _can_symlink
2409
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    reason = "Requires functional symlink implementation"
    return unittest.skip(reason)(test)
2415
_can_xattr = None
def can_xattr():
    """Return True if extended attributes can be set here; cached.

    The probe sets attributes on a temporary file (by descriptor and by
    name) and on TESTFN.  It reports False when os.setxattr is missing,
    when any of those calls fails with OSError, or when platform.release()
    reports a 2.6.x kernel older than 2.6.39.
    """
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # Dots are escaped so that only a literal "2.6." prefix
                    # is recognized.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() returns an open OS-level descriptor that the caller
            # owns; close it so the probe does not leak it.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can
2446
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    reason = "no non-broken extended attribute support"
    return unittest.skip(reason)(test)
2452
_bind_nix_socket_error = None
def skip_unless_bind_unix_socket(test):
    """Decorator for tests requiring a functional bind() for unix sockets.

    The bind() probe runs once; its outcome (False on success, otherwise
    the OSError that was raised) is cached in _bind_nix_socket_error.
    """
    global _bind_nix_socket_error
    if not hasattr(socket, 'AF_UNIX'):
        return unittest.skip('No UNIX Sockets')(test)
    if _bind_nix_socket_error is None:
        path = TESTFN + "can_bind_unix_socket"
        with socket.socket(socket.AF_UNIX) as sock:
            try:
                sock.bind(path)
            except OSError as e:
                _bind_nix_socket_error = e
            else:
                _bind_nix_socket_error = False
            finally:
                unlink(path)
    if not _bind_nix_socket_error:
        return test
    msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
    return unittest.skip(msg)(test)
2474
2475
def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory is case-insensitive.

    A temporary file is created in *directory*; the file system is deemed
    case-insensitive when the same file is reachable through its path with
    the letter case changed.
    """
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        original = base.name
        # Prefer upper-casing; use lower-casing if that changes nothing.
        recased = original.upper()
        if recased == original:
            recased = original.lower()
        try:
            return os.path.samefile(original, recased)
        except FileNotFoundError:
            return False
2487
2488
def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Returns the set of items in ref_api not in other_api, except for a
    defined list of items to be ignored in this check.

    Names are taken from dir() of both objects.  Private names (starting
    with '_') are skipped, but magic names, i.e. those that also end in
    '__', are included.
    """
    ref_names = dir(ref_api)
    other_names = set(dir(other_api))
    ignored = set(ignore) if ignore else set()
    return {
        name for name in ref_names
        if name not in other_names
        and name not in ignored
        and (not name.startswith('_') or name.endswith('__'))
    }
2502
2503
def check__all__(test_case, module, name_of_module=None, extra=(),
                 blacklist=()):
    """Assert that 'module.__all__' lists exactly the module's public names.

    A name counts as public API when it does not start with '_', is not in
    'blacklist', and the object it refers to either reports a '__module__'
    found in 'name_of_module', or has no '__module__' attribute at all and
    is not itself a module.

    'name_of_module' may be a string or a tuple of strings naming the
    module(s) an object may be defined in to be detected as public API; it
    defaults to 'module.__name__'.  This is useful when 'module' imports
    part of its public API from other modules, possibly a C backend (like
    'csv' and its '_csv').

    'extra' is a collection of names that are added to the automatically
    detected ones, for objects that would not otherwise be detected as
    public (for instance objects without a proper '__module__').

    'blacklist' is a collection of names that must not be treated as part
    of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                blacklist = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, blacklist=blacklist)

    """
    if name_of_module is None:
        owners = (module.__name__, )
    elif isinstance(name_of_module, str):
        owners = (name_of_module, )
    else:
        owners = name_of_module

    def is_public_api(obj):
        # Defined in one of the accepted modules ...
        if getattr(obj, '__module__', None) in owners:
            return True
        # ... or a plain object carrying no '__module__' that is not a module.
        return not (hasattr(obj, '__module__') or
                    isinstance(obj, types.ModuleType))

    expected = set(extra)
    for name in dir(module):
        if name.startswith('_') or name in blacklist:
            continue
        if is_public_api(getattr(module, name)):
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)
2561
2562
class SuppressCrashReport:
    """Context manager that tries to keep a crash report from popping up.

    On Windows, the Windows Error Reporting dialog is not displayed.  On
    UNIX, the creation of a coredump file is disabled.
    """
    # Previous SetErrorMode() value (Windows) or RLIMIT_CORE limits (UNIX);
    # stays None when nothing was changed and so nothing must be restored.
    old_value = None
    # Maps a CRT report type to its previous (mode, file) pair; only filled
    # in when msvcrt.CrtSetReportMode is available.
    old_modes = None

    def __enter__(self):
        """Disable crash reporting and remember the previous settings.

        On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode.  On UNIX, try to save the previous core file size
        limit, then set the soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            # GetErrorMode is not available on Windows XP and Windows Server 2003,
            # but SetErrorMode returns the previous value, so we can use that
            import ctypes
            SEM_NOGPFAULTERRORBOX = 0x02
            kernel32 = self._k32 = ctypes.windll.kernel32
            previous_mode = kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
            self.old_value = previous_mode
            kernel32.SetErrorMode(previous_mode | SEM_NOGPFAULTERRORBOX)

            # Suppress assert dialogs in debug builds
            # (see http://bugs.python.org/issue23314)
            try:
                import msvcrt
                msvcrt.CrtSetReportMode
            except (AttributeError, ImportError):
                # no msvcrt or a release build
                pass
            else:
                saved = self.old_modes = {}
                for report_type in (msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT):
                    saved[report_type] = (
                        msvcrt.CrtSetReportMode(report_type,
                                                msvcrt.CRTDBG_MODE_FILE),
                        msvcrt.CrtSetReportFile(report_type,
                                                msvcrt.CRTDBG_FILE_STDERR),
                    )
            return self

        if resource is not None:
            try:
                limits = resource.getrlimit(resource.RLIMIT_CORE)
                self.old_value = limits
                # Keep the hard limit, drop the soft limit to zero.
                resource.setrlimit(resource.RLIMIT_CORE, (0, limits[1]))
            except (ValueError, OSError):
                pass

        if sys.platform == 'darwin':
            # Check if the 'Crash Reporter' on OSX was configured
            # in 'Developer' mode and warn that it will get triggered
            # when it is.
            #
            # This assumes that this context manager is used in tests
            # that might trigger the next manager.
            cmd = ['/usr/bin/defaults', 'read',
                   'com.apple.CrashReporter', 'DialogType']
            with subprocess.Popen(cmd,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                dialog_type = proc.communicate()[0]
            if dialog_type.strip() == b'developer':
                print("this test triggers the Crash Reporter, "
                      "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        previous = self.old_value
        if previous is None:
            # __enter__() did not change anything.
            return

        if sys.platform.startswith('win'):
            self._k32.SetErrorMode(previous)

            if self.old_modes:
                import msvcrt
                for report_type, (mode, file) in self.old_modes.items():
                    msvcrt.CrtSetReportMode(report_type, mode)
                    msvcrt.CrtSetReportFile(report_type, file)
        elif resource is not None:
            try:
                resource.setrlimit(resource.RLIMIT_CORE, previous)
            except (ValueError, OSError):
                pass
2656
2657
def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' (through its
    addCleanup() method) to restore 'object_to_patch' value for
    'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch';
    AttributeError is raised otherwise.

    The cleanup puts the previous value back when the attribute is stored
    on 'object_to_patch' itself, and deletes the override when the
    attribute was only inherited.  Objects that have no '__dict__' (for
    example instances of classes using '__slots__') always get the
    previous value put back.
    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    getattr(object_to_patch, attr_name)

    # keep a copy of the old value
    try:
        namespace = object_to_patch.__dict__
    except AttributeError:
        # No per-object namespace: the value is stored through a
        # descriptor, so deleting the attribute on cleanup would destroy
        # the original value instead of uncovering it.  Restore it.
        old_value = getattr(object_to_patch, attr_name)
        restore_old_value = True
    else:
        try:
            # Read the raw entry so that wrappers such as staticmethod
            # objects are restored unchanged.
            old_value = namespace[attr_name]
        except KeyError:
            # Inherited attribute: removing the override is enough.
            old_value = None
            restore_old_value = False
        else:
            restore_old_value = True

    # restore the value when the test is done
    def cleanup():
        if restore_old_value:
            setattr(object_to_patch, attr_name, old_value)
        else:
            delattr(object_to_patch, attr_name)

    test_instance.addCleanup(cleanup)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)
2690
2691
def run_in_subinterp(code):
    """
    Run code in a subinterpreter and return the result of
    _testcapi.run_in_subinterp(). Raise unittest.SkipTest if the
    tracemalloc module is tracing memory allocations.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        tracemalloc = None
    if tracemalloc is not None and tracemalloc.is_tracing():
        raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                 "if tracemalloc module is tracing "
                                 "memory allocations")
    import _testcapi
    return _testcapi.run_in_subinterp(code)
2710
2711
def check_free_after_iterating(test, iter, cls, args=()):
    """Check that the iterated object is freed once iteration has ended.

    An instance of a subclass of 'cls', built from 'args', is handed to
    'iter'.  The resulting iterator must raise StopIteration on the first
    next() call, and the instance must have been finalized by the time a
    garbage collection has run.  The finalizer calls next() on the iterator
    again, which must not crash.
    """
    finalized = False

    class Tracked(cls):
        def __del__(self):
            nonlocal finalized
            finalized = True
            try:
                next(iterator)
            except StopIteration:
                pass

    iterator = iter(Tracked(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, iterator)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(finalized)
2729
2730
def missing_compiler_executable(cmd_names=()):
    """Check if the compiler components used to build the interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    When 'cmd_names' is given, every listed executable must be configured
    for the compiler; an AssertionError is raised otherwise.
    """
    # Aliased so that the module-level 'sysconfig' import is not shadowed.
    from distutils import ccompiler, spawn
    from distutils import sysconfig as distutils_sysconfig
    compiler = ccompiler.new_compiler()
    distutils_sysconfig.customize_compiler(compiler)
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif cmd is None:
            # Checking everything: unconfigured components are skipped.
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]
    return None
2754
2755
_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping 'interval' on the Android emulator.

    Whether the process runs on the emulator is determined once, by reading
    the 'ro.kernel.qemu' property with 'getprop', and cached in the module
    global '_is_android_emulator'.
    """
    global _is_android_emulator
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    minimum_interval = 1e-5
    if is_android and interval < minimum_interval:
        if _is_android_emulator is None:
            qemu_flag = subprocess.check_output(['getprop', 'ro.kernel.qemu'])
            _is_android_emulator = (qemu_flag.strip() == b'1')
        if _is_android_emulator:
            interval = minimum_interval
    return sys.setswitchinterval(interval)
2769
2770
@contextlib.contextmanager
def disable_faulthandler():
    """Context manager disabling faulthandler for the duration of the block.

    If faulthandler was enabled on entry, it is enabled again on exit,
    writing to the file descriptor of sys.__stderr__ for all threads.
    """
    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    stderr_fd = sys.__stderr__.fileno()
    was_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if was_enabled:
            faulthandler.enable(file=stderr_fd, all_threads=True)
2785
2786
def fd_count():
    """Count the number of open file descriptors.

    On Linux and FreeBSD the entries of /proc/self/fd are counted when that
    directory exists.  Otherwise every descriptor number below the maximum
    (SC_OPEN_MAX when available, else 256) is probed with os.dup().
    """
    if sys.platform.startswith(('linux', 'freebsd')):
        try:
            fd_names = os.listdir("/proc/self/fd")
        except FileNotFoundError:
            pass
        else:
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(fd_names) - 1

    max_fd = 256
    if hasattr(os, 'sysconf'):
        try:
            max_fd = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    saved_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            saved_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                saved_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)

    open_fds = 0
    try:
        for fd in range(max_fd):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                duplicate = os.dup(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                # EBADF: this descriptor number is simply not open.
                continue
            os.close(duplicate)
            open_fds += 1
    finally:
        if saved_modes is not None:
            for report_type, mode in saved_modes.items():
                msvcrt.CrtSetReportMode(report_type, mode)

    return open_fds
2844
2845
class SaveSignals:
    """
    Save and restore signal handlers.

    This class is only able to save/restore signal handlers registered
    by the Python signal module: see bpo-13285 for "external" signal
    handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
        uncatchable = {getattr(signal, signame)
                       for signame in ('SIGKILL', 'SIGSTOP')
                       if hasattr(signal, signame)}
        self.signals = [signum for signum in range(1, signal.NSIG)
                        if signum not in uncatchable]
        self.handlers = {}

    def save(self):
        """Record the current handler of every tracked signal."""
        getsignal = self.signal.getsignal
        for signum in self.signals:
            handler = getsignal(signum)
            # getsignal() returns None if a signal handler was not
            # registered by the Python signal module,
            # and the handler is not SIG_DFL nor SIG_IGN.
            #
            # Ignore such a signal: we cannot restore the handler.
            if handler is not None:
                self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        install = self.signal.signal
        for signum, handler in self.handlers.items():
            install(signum, handler)
2883
2884
def with_pymalloc():
    """Return the WITH_PYMALLOC value exposed by the _testcapi module."""
    import _testcapi as capi
    return capi.WITH_PYMALLOC
2888
2889
class FakePath:
    """Simple implementation of the path protocol.

    __fspath__() returns the wrapped 'path' value, except when that value
    is an exception instance or an exception class, which is raised instead.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        target = self.path
        if isinstance(target, BaseException):
            raise target
        if isinstance(target, type) and issubclass(target, BaseException):
            raise target
        return target
2906