• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.support':
4    raise ImportError('support must be imported from the test package')
5
6import contextlib
7import functools
8import os
9import re
10import stat
11import sys
12import sysconfig
13import time
14import types
15import unittest
16import warnings
17
18from .testresult import get_test_runner
19
20
21try:
22    from _testcapi import unicode_legacy_string
23except ImportError:
24    unicode_legacy_string = None
25
26__all__ = [
27    # globals
28    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
29    # exceptions
30    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
31    # io
32    "record_original_stdout", "get_original_stdout", "captured_stdout",
33    "captured_stdin", "captured_stderr",
34    # unittest
35    "is_resource_enabled", "requires", "requires_freebsd_version",
36    "requires_linux_version", "requires_mac_ver",
37    "check_syntax_error",
38    "BasicTestRunner", "run_unittest", "run_doctest",
39    "requires_gzip", "requires_bz2", "requires_lzma",
40    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
41    "requires_IEEE_754", "requires_zlib",
42    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
43    "check__all__", "skip_if_buggy_ucrt_strfptime",
44    "check_disallow_instantiation",
45    # sys
46    "is_jython", "is_android", "check_impl_detail", "unix_shell",
47    "setswitchinterval",
48    # network
49    "open_urlresource",
50    # processes
51    "reap_children",
52    # miscellaneous
53    "run_with_locale", "swap_item", "findfile",
54    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
55    "run_with_tz", "PGO", "missing_compiler_executable",
56    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
57    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
58    ]
59
60
61# Timeout in seconds for tests using a network server listening on the network
62# local loopback interface like 127.0.0.1.
63#
64# The timeout is long enough to prevent test failure: it takes into account
65# that the client and the server can run in different threads or even different
66# processes.
67#
68# The timeout should be long enough for connect(), recv() and send() methods
69# of socket.socket.
70LOOPBACK_TIMEOUT = 5.0
71if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version:
72    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
73    # seconds on Windows ARM32 buildbot
74    LOOPBACK_TIMEOUT = 10
75elif sys.platform == 'vxworks':
76    LOOPBACK_TIMEOUT = 10
77
78# Timeout in seconds for network requests going to the internet. The timeout is
79# short enough to prevent a test to wait for too long if the internet request
80# is blocked for whatever reason.
81#
82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
83# but skip the test instead: see transient_internet().
84INTERNET_TIMEOUT = 60.0
85
86# Timeout in seconds to mark a test as failed if the test takes "too long".
87#
88# The timeout value depends on the regrtest --timeout command line option.
89#
90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
91# LONG_TIMEOUT instead.
92SHORT_TIMEOUT = 30.0
93
94# Timeout in seconds to detect when a test hangs.
95#
96# It is long enough to reduce the risk of test failure on the slowest Python
97# buildbots. It should not be used to mark a test as failed if the test takes
98# "too long". The timeout value depends on the regrtest --timeout command line
99# option.
100LONG_TIMEOUT = 5 * 60.0
101
102
103class Error(Exception):
104    """Base class for regression test exceptions."""
105
106class TestFailed(Error):
107    """Test failed."""
108
109class TestFailedWithDetails(TestFailed):
110    """Test failed."""
111    def __init__(self, msg, errors, failures):
112        self.msg = msg
113        self.errors = errors
114        self.failures = failures
115        super().__init__(msg, errors, failures)
116
117    def __str__(self):
118        return self.msg
119
120class TestDidNotRun(Error):
121    """Test did not run any subtests."""
122
123class ResourceDenied(unittest.SkipTest):
124    """Test skipped because it requested a disallowed resource.
125
126    This is raised when a test calls requires() for a resource that
127    has not be enabled.  It is used to distinguish between expected
128    and unexpected skips.
129    """
130
131def anticipate_failure(condition):
132    """Decorator to mark a test that is known to be broken in some cases
133
134       Any use of this decorator should have a comment identifying the
135       associated tracker issue.
136    """
137    if condition:
138        return unittest.expectedFailure
139    return lambda f: f
140
141def load_package_tests(pkg_dir, loader, standard_tests, pattern):
142    """Generic load_tests implementation for simple test packages.
143
144    Most packages can implement load_tests using this function as follows:
145
146       def load_tests(*args):
147           return load_package_tests(os.path.dirname(__file__), *args)
148    """
149    if pattern is None:
150        pattern = "test*"
151    top_dir = os.path.dirname(              # Lib
152                  os.path.dirname(              # test
153                      os.path.dirname(__file__)))   # support
154    package_tests = loader.discover(start_dir=pkg_dir,
155                                    top_level_dir=top_dir,
156                                    pattern=pattern)
157    standard_tests.addTests(package_tests)
158    return standard_tests
159
160
161def get_attribute(obj, name):
162    """Get an attribute, raising SkipTest if AttributeError is raised."""
163    try:
164        attribute = getattr(obj, name)
165    except AttributeError:
166        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))
167    else:
168        return attribute
169
170verbose = 1              # Flag set to 0 by regrtest.py
171use_resources = None     # Flag set to [] by regrtest.py
172max_memuse = 0           # Disable bigmem tests (they will still be run with
173                         # small sizes, to make sure they work.)
174real_max_memuse = 0
175junit_xml_list = None    # list of testsuite XML elements
176failfast = False
177
178# _original_stdout is meant to hold stdout at the time regrtest began.
179# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
180# The point is to have some flavor of stdout the user can actually see.
181_original_stdout = None
182def record_original_stdout(stdout):
183    global _original_stdout
184    _original_stdout = stdout
185
186def get_original_stdout():
187    return _original_stdout or sys.stdout
188
189
190def _force_run(path, func, *args):
191    try:
192        return func(*args)
193    except OSError as err:
194        if verbose >= 2:
195            print('%s: %s' % (err.__class__.__name__, err))
196            print('re-run %s%r' % (func.__name__, args))
197        os.chmod(path, stat.S_IRWXU)
198        return func(*args)
199
200
201# Check whether a gui is actually available
202def _is_gui_available():
203    if hasattr(_is_gui_available, 'result'):
204        return _is_gui_available.result
205    import platform
206    reason = None
207    if sys.platform.startswith('win') and platform.win32_is_iot():
208        reason = "gui is not available on Windows IoT Core"
209    elif sys.platform.startswith('win'):
210        # if Python is running as a service (such as the buildbot service),
211        # gui interaction may be disallowed
212        import ctypes
213        import ctypes.wintypes
214        UOI_FLAGS = 1
215        WSF_VISIBLE = 0x0001
216        class USEROBJECTFLAGS(ctypes.Structure):
217            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
218                        ("fReserved", ctypes.wintypes.BOOL),
219                        ("dwFlags", ctypes.wintypes.DWORD)]
220        dll = ctypes.windll.user32
221        h = dll.GetProcessWindowStation()
222        if not h:
223            raise ctypes.WinError()
224        uof = USEROBJECTFLAGS()
225        needed = ctypes.wintypes.DWORD()
226        res = dll.GetUserObjectInformationW(h,
227            UOI_FLAGS,
228            ctypes.byref(uof),
229            ctypes.sizeof(uof),
230            ctypes.byref(needed))
231        if not res:
232            raise ctypes.WinError()
233        if not bool(uof.dwFlags & WSF_VISIBLE):
234            reason = "gui not available (WSF_VISIBLE flag not set)"
235    elif sys.platform == 'darwin':
236        # The Aqua Tk implementations on OS X can abort the process if
237        # being called in an environment where a window server connection
238        # cannot be made, for instance when invoked by a buildbot or ssh
239        # process not running under the same user id as the current console
240        # user.  To avoid that, raise an exception if the window manager
241        # connection is not available.
242        from ctypes import cdll, c_int, pointer, Structure
243        from ctypes.util import find_library
244
245        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
246
247        if app_services.CGMainDisplayID() == 0:
248            reason = "gui tests cannot run without OS X window manager"
249        else:
250            class ProcessSerialNumber(Structure):
251                _fields_ = [("highLongOfPSN", c_int),
252                            ("lowLongOfPSN", c_int)]
253            psn = ProcessSerialNumber()
254            psn_p = pointer(psn)
255            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
256                  (app_services.SetFrontProcess(psn_p) < 0) ):
257                reason = "cannot run without OS X gui process"
258
259    # check on every platform whether tkinter can actually do anything
260    if not reason:
261        try:
262            from tkinter import Tk
263            root = Tk()
264            root.withdraw()
265            root.update()
266            root.destroy()
267        except Exception as e:
268            err_string = str(e)
269            if len(err_string) > 50:
270                err_string = err_string[:50] + ' [...]'
271            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
272                                                           err_string)
273
274    _is_gui_available.reason = reason
275    _is_gui_available.result = not reason
276
277    return _is_gui_available.result
278
279def is_resource_enabled(resource):
280    """Test whether a resource is enabled.
281
282    Known resources are set by regrtest.py.  If not running under regrtest.py,
283    all resources are assumed enabled unless use_resources has been set.
284    """
285    return use_resources is None or resource in use_resources
286
287def requires(resource, msg=None):
288    """Raise ResourceDenied if the specified resource is not available."""
289    if not is_resource_enabled(resource):
290        if msg is None:
291            msg = "Use of the %r resource not enabled" % resource
292        raise ResourceDenied(msg)
293    if resource == 'gui' and not _is_gui_available():
294        raise ResourceDenied(_is_gui_available.reason)
295
296def _requires_unix_version(sysname, min_version):
297    """Decorator raising SkipTest if the OS is `sysname` and the version is less
298    than `min_version`.
299
300    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
301    the FreeBSD version is less than 7.2.
302    """
303    import platform
304    min_version_txt = '.'.join(map(str, min_version))
305    version_txt = platform.release().split('-', 1)[0]
306    if platform.system() == sysname:
307        try:
308            version = tuple(map(int, version_txt.split('.')))
309        except ValueError:
310            skip = False
311        else:
312            skip = version < min_version
313    else:
314        skip = False
315
316    return unittest.skipIf(
317        skip,
318        f"{sysname} version {min_version_txt} or higher required, not "
319        f"{version_txt}"
320    )
321
322
323def requires_freebsd_version(*min_version):
324    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
325    less than `min_version`.
326
327    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
328    version is less than 7.2.
329    """
330    return _requires_unix_version('FreeBSD', min_version)
331
332def requires_linux_version(*min_version):
333    """Decorator raising SkipTest if the OS is Linux and the Linux version is
334    less than `min_version`.
335
336    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
337    version is less than 2.6.32.
338    """
339    return _requires_unix_version('Linux', min_version)
340
341def requires_mac_ver(*min_version):
342    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
343    version if less than min_version.
344
345    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
346    is lesser than 10.5.
347    """
348    def decorator(func):
349        @functools.wraps(func)
350        def wrapper(*args, **kw):
351            if sys.platform == 'darwin':
352                import platform
353                version_txt = platform.mac_ver()[0]
354                try:
355                    version = tuple(map(int, version_txt.split('.')))
356                except ValueError:
357                    pass
358                else:
359                    if version < min_version:
360                        min_version_txt = '.'.join(map(str, min_version))
361                        raise unittest.SkipTest(
362                            "Mac OS X %s or higher required, not %s"
363                            % (min_version_txt, version_txt))
364            return func(*args, **kw)
365        wrapper.min_version = min_version
366        return wrapper
367    return decorator
368
369
370def system_must_validate_cert(f):
371    """Skip the test on TLS certificate validation failures."""
372    @functools.wraps(f)
373    def dec(*args, **kwargs):
374        try:
375            f(*args, **kwargs)
376        except OSError as e:
377            if "CERTIFICATE_VERIFY_FAILED" in str(e):
378                raise unittest.SkipTest("system does not contain "
379                                        "necessary certificates")
380            raise
381    return dec
382
383# A constant likely larger than the underlying OS pipe buffer size, to
384# make writes blocking.
385# Windows limit seems to be around 512 B, and many Unix kernels have a
386# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
387# (see issue #17835 for a discussion of this number).
388PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
389
390# A constant likely larger than the underlying OS socket buffer size, to make
391# writes blocking.
392# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
393# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
394# for a discussion of this number.
395SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
396
397# decorator for skipping tests on non-IEEE 754 platforms
398requires_IEEE_754 = unittest.skipUnless(
399    float.__getformat__("double").startswith("IEEE"),
400    "test requires IEEE 754 doubles")
401
402def requires_zlib(reason='requires zlib'):
403    try:
404        import zlib
405    except ImportError:
406        zlib = None
407    return unittest.skipUnless(zlib, reason)
408
409def requires_gzip(reason='requires gzip'):
410    try:
411        import gzip
412    except ImportError:
413        gzip = None
414    return unittest.skipUnless(gzip, reason)
415
416def requires_bz2(reason='requires bz2'):
417    try:
418        import bz2
419    except ImportError:
420        bz2 = None
421    return unittest.skipUnless(bz2, reason)
422
423def requires_lzma(reason='requires lzma'):
424    try:
425        import lzma
426    except ImportError:
427        lzma = None
428    return unittest.skipUnless(lzma, reason)
429
430requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string,
431                        'requires legacy Unicode C API')
432
433is_jython = sys.platform.startswith('java')
434
435is_android = hasattr(sys, 'getandroidapilevel')
436
437if sys.platform not in ('win32', 'vxworks'):
438    unix_shell = '/system/bin/sh' if is_android else '/bin/sh'
439else:
440    unix_shell = None
441
442# Define the URL of a dedicated HTTP server for the network tests.
443# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
444TEST_HTTP_URL = "http://www.pythontest.net"
445
446# Set by libregrtest/main.py so we can skip tests that are not
447# useful for PGO
448PGO = False
449
450# Set by libregrtest/main.py if we are running the extended (time consuming)
451# PGO task.  If this is True, PGO is also True.
452PGO_EXTENDED = False
453
454# TEST_HOME_DIR refers to the top level directory of the "test" package
455# that contains Python's regression test suite
456TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
457TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
458
459# TEST_DATA_DIR is used as a target download location for remote resources
460TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
461
462
463def darwin_malloc_err_warning(test_name):
464    """Assure user that loud errors generated by macOS libc's malloc are
465    expected."""
466    if sys.platform != 'darwin':
467        return
468
469    import shutil
470    msg = ' NOTICE '
471    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
472              'warnings on macOS systems. This behavior is known. Do not\n'
473              'report a bug unless tests are also failing. See bpo-40928.')
474
475    padding, _ = shutil.get_terminal_size()
476    print(msg.center(padding, '-'))
477    print(detail)
478    print('-' * padding)
479
480
481def findfile(filename, subdir=None):
482    """Try to find a file on sys.path or in the test directory.  If it is not
483    found the argument passed to the function is returned (this does not
484    necessarily signal failure; could still be the legitimate path).
485
486    Setting *subdir* indicates a relative path to use to find the file
487    rather than looking directly in the path directories.
488    """
489    if os.path.isabs(filename):
490        return filename
491    if subdir is not None:
492        filename = os.path.join(subdir, filename)
493    path = [TEST_HOME_DIR] + sys.path
494    for dn in path:
495        fn = os.path.join(dn, filename)
496        if os.path.exists(fn): return fn
497    return filename
498
499
500def sortdict(dict):
501    "Like repr(dict), but in sorted order."
502    items = sorted(dict.items())
503    reprpairs = ["%r: %r" % pair for pair in items]
504    withcommas = ", ".join(reprpairs)
505    return "{%s}" % withcommas
506
507def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
508    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
509        compile(statement, '<test string>', 'exec')
510    err = cm.exception
511    testcase.assertIsNotNone(err.lineno)
512    if lineno is not None:
513        testcase.assertEqual(err.lineno, lineno)
514    testcase.assertIsNotNone(err.offset)
515    if offset is not None:
516        testcase.assertEqual(err.offset, offset)
517
518
519def open_urlresource(url, *args, **kw):
520    import urllib.request, urllib.parse
521    from .os_helper import unlink
522    try:
523        import gzip
524    except ImportError:
525        gzip = None
526
527    check = kw.pop('check', None)
528
529    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
530
531    fn = os.path.join(TEST_DATA_DIR, filename)
532
533    def check_valid_file(fn):
534        f = open(fn, *args, **kw)
535        if check is None:
536            return f
537        elif check(f):
538            f.seek(0)
539            return f
540        f.close()
541
542    if os.path.exists(fn):
543        f = check_valid_file(fn)
544        if f is not None:
545            return f
546        unlink(fn)
547
548    # Verify the requirement before downloading the file
549    requires('urlfetch')
550
551    if verbose:
552        print('\tfetching %s ...' % url, file=get_original_stdout())
553    opener = urllib.request.build_opener()
554    if gzip:
555        opener.addheaders.append(('Accept-Encoding', 'gzip'))
556    f = opener.open(url, timeout=INTERNET_TIMEOUT)
557    if gzip and f.headers.get('Content-Encoding') == 'gzip':
558        f = gzip.GzipFile(fileobj=f)
559    try:
560        with open(fn, "wb") as out:
561            s = f.read()
562            while s:
563                out.write(s)
564                s = f.read()
565    finally:
566        f.close()
567
568    f = check_valid_file(fn)
569    if f is not None:
570        return f
571    raise TestFailed('invalid resource %r' % fn)
572
573
574@contextlib.contextmanager
575def captured_output(stream_name):
576    """Return a context manager used by captured_stdout/stdin/stderr
577    that temporarily replaces the sys stream *stream_name* with a StringIO."""
578    import io
579    orig_stdout = getattr(sys, stream_name)
580    setattr(sys, stream_name, io.StringIO())
581    try:
582        yield getattr(sys, stream_name)
583    finally:
584        setattr(sys, stream_name, orig_stdout)
585
586def captured_stdout():
587    """Capture the output of sys.stdout:
588
589       with captured_stdout() as stdout:
590           print("hello")
591       self.assertEqual(stdout.getvalue(), "hello\\n")
592    """
593    return captured_output("stdout")
594
595def captured_stderr():
596    """Capture the output of sys.stderr:
597
598       with captured_stderr() as stderr:
599           print("hello", file=sys.stderr)
600       self.assertEqual(stderr.getvalue(), "hello\\n")
601    """
602    return captured_output("stderr")
603
604def captured_stdin():
605    """Capture the input to sys.stdin:
606
607       with captured_stdin() as stdin:
608           stdin.write('hello\\n')
609           stdin.seek(0)
610           # call test code that consumes from sys.stdin
611           captured = input()
612       self.assertEqual(captured, "hello")
613    """
614    return captured_output("stdin")
615
616
617def gc_collect():
618    """Force as many objects as possible to be collected.
619
620    In non-CPython implementations of Python, this is needed because timely
621    deallocation is not guaranteed by the garbage collector.  (Even in CPython
622    this can be the case in case of reference cycles.)  This means that __del__
623    methods may be called later than expected and weakrefs may remain alive for
624    longer than expected.  This function tries its best to force all garbage
625    objects to disappear.
626    """
627    import gc
628    gc.collect()
629    if is_jython:
630        time.sleep(0.1)
631    gc.collect()
632    gc.collect()
633
634@contextlib.contextmanager
635def disable_gc():
636    import gc
637    have_gc = gc.isenabled()
638    gc.disable()
639    try:
640        yield
641    finally:
642        if have_gc:
643            gc.enable()
644
645
646def python_is_optimized():
647    """Find if Python was built with optimizations."""
648    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
649    final_opt = ""
650    for opt in cflags.split():
651        if opt.startswith('-O'):
652            final_opt = opt
653    return final_opt not in ('', '-O0', '-Og')
654
655
656_header = 'nP'
657_align = '0n'
658if hasattr(sys, "getobjects"):
659    _header = '2P' + _header
660    _align = '0P'
661_vheader = _header + 'n'
662
663def calcobjsize(fmt):
664    import struct
665    return struct.calcsize(_header + fmt + _align)
666
667def calcvobjsize(fmt):
668    import struct
669    return struct.calcsize(_vheader + fmt + _align)
670
671
672_TPFLAGS_HAVE_GC = 1<<14
673_TPFLAGS_HEAPTYPE = 1<<9
674
675def check_sizeof(test, o, size):
676    import _testinternalcapi
677    result = sys.getsizeof(o)
678    # add GC header size
679    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
680        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
681        size += _testinternalcapi.SIZEOF_PYGC_HEAD
682    msg = 'wrong size for %s: got %d, expected %d' \
683            % (type(o), result, size)
684    test.assertEqual(result, size, msg)
685
686#=======================================================================
687# Decorator for running a function in a different locale, correctly resetting
688# it afterwards.
689
690@contextlib.contextmanager
691def run_with_locale(catstr, *locales):
692            try:
693                import locale
694                category = getattr(locale, catstr)
695                orig_locale = locale.setlocale(category)
696            except AttributeError:
697                # if the test author gives us an invalid category string
698                raise
699            except:
700                # cannot retrieve original locale, so do nothing
701                locale = orig_locale = None
702            else:
703                for loc in locales:
704                    try:
705                        locale.setlocale(category, loc)
706                        break
707                    except:
708                        pass
709
710            try:
711                yield
712            finally:
713                if locale and orig_locale:
714                    locale.setlocale(category, orig_locale)
715
716#=======================================================================
717# Decorator for running a function in a specific timezone, correctly
718# resetting it afterwards.
719
720def run_with_tz(tz):
721    def decorator(func):
722        def inner(*args, **kwds):
723            try:
724                tzset = time.tzset
725            except AttributeError:
726                raise unittest.SkipTest("tzset required")
727            if 'TZ' in os.environ:
728                orig_tz = os.environ['TZ']
729            else:
730                orig_tz = None
731            os.environ['TZ'] = tz
732            tzset()
733
734            # now run the function, resetting the tz on exceptions
735            try:
736                return func(*args, **kwds)
737            finally:
738                if orig_tz is None:
739                    del os.environ['TZ']
740                else:
741                    os.environ['TZ'] = orig_tz
742                time.tzset()
743
744        inner.__name__ = func.__name__
745        inner.__doc__ = func.__doc__
746        return inner
747    return decorator
748
749#=======================================================================
750# Big-memory-test support. Separate from 'resources' because memory use
751# should be configurable.
752
753# Some handy shorthands. Note that these are used for byte-limits as well
754# as size-limits, in the various bigmem tests
755_1M = 1024*1024
756_1G = 1024 * _1M
757_2G = 2 * _1G
758_4G = 4 * _1G
759
760MAX_Py_ssize_t = sys.maxsize
761
762def set_memlimit(limit):
763    global max_memuse
764    global real_max_memuse
765    sizes = {
766        'k': 1024,
767        'm': _1M,
768        'g': _1G,
769        't': 1024*_1G,
770    }
771    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
772                 re.IGNORECASE | re.VERBOSE)
773    if m is None:
774        raise ValueError('Invalid memory limit %r' % (limit,))
775    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
776    real_max_memuse = memlimit
777    if memlimit > MAX_Py_ssize_t:
778        memlimit = MAX_Py_ssize_t
779    if memlimit < _2G - 1:
780        raise ValueError('Memory limit %r too low to be useful' % (limit,))
781    max_memuse = memlimit
782
783class _MemoryWatchdog:
784    """An object which periodically watches the process' memory consumption
785    and prints it out.
786    """
787
788    def __init__(self):
789        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
790        self.started = False
791
792    def start(self):
793        import warnings
794        try:
795            f = open(self.procfile, 'r')
796        except OSError as e:
797            warnings.warn('/proc not available for stats: {}'.format(e),
798                          RuntimeWarning)
799            sys.stderr.flush()
800            return
801
802        import subprocess
803        with f:
804            watchdog_script = findfile("memory_watchdog.py")
805            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
806                                                 stdin=f,
807                                                 stderr=subprocess.DEVNULL)
808        self.started = True
809
810    def stop(self):
811        if self.started:
812            self.mem_watchdog.terminate()
813            self.mem_watchdog.wait()
814
815
816def bigmemtest(size, memuse, dry_run=True):
817    """Decorator for bigmem tests.
818
819    'size' is a requested size for the test (in arbitrary, test-interpreted
820    units.) 'memuse' is the number of bytes per unit for the test, or a good
821    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
822    each, could be decorated with @bigmemtest(size=_4G, memuse=2).
823
824    The 'size' argument is normally passed to the decorated test method as an
825    extra argument. If 'dry_run' is true, the value passed to the test method
826    may be less than the requested value. If 'dry_run' is false, it means the
827    test doesn't support dummy runs when -M is not specified.
828    """
829    def decorator(f):
830        def wrapper(self):
831            size = wrapper.size
832            memuse = wrapper.memuse
833            if not real_max_memuse:
834                maxsize = 5147
835            else:
836                maxsize = size
837
838            if ((real_max_memuse or not dry_run)
839                and real_max_memuse < maxsize * memuse):
840                raise unittest.SkipTest(
841                    "not enough memory: %.1fG minimum needed"
842                    % (size * memuse / (1024 ** 3)))
843
844            if real_max_memuse and verbose:
845                print()
846                print(" ... expected peak memory use: {peak:.1f}G"
847                      .format(peak=size * memuse / (1024 ** 3)))
848                watchdog = _MemoryWatchdog()
849                watchdog.start()
850            else:
851                watchdog = None
852
853            try:
854                return f(self, maxsize)
855            finally:
856                if watchdog:
857                    watchdog.stop()
858
859        wrapper.size = size
860        wrapper.memuse = memuse
861        return wrapper
862    return decorator
863
864def bigaddrspacetest(f):
865    """Decorator for tests that fill the address space."""
866    def wrapper(self):
867        if max_memuse < MAX_Py_ssize_t:
868            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
869                raise unittest.SkipTest(
870                    "not enough memory: try a 32-bit build instead")
871            else:
872                raise unittest.SkipTest(
873                    "not enough memory: %.1fG minimum needed"
874                    % (MAX_Py_ssize_t / (1024 ** 3)))
875        else:
876            return f(self)
877    return wrapper
878
879#=======================================================================
880# unittest integration.
881
882class BasicTestRunner:
883    def run(self, test):
884        result = unittest.TestResult()
885        test(result)
886        return result
887
888def _id(obj):
889    return obj
890
891def requires_resource(resource):
892    if resource == 'gui' and not _is_gui_available():
893        return unittest.skip(_is_gui_available.reason)
894    if is_resource_enabled(resource):
895        return _id
896    else:
897        return unittest.skip("resource {0!r} is not enabled".format(resource))
898
899def cpython_only(test):
900    """
901    Decorator for tests only applicable on CPython.
902    """
903    return impl_detail(cpython=True)(test)
904
905def impl_detail(msg=None, **guards):
906    if check_impl_detail(**guards):
907        return _id
908    if msg is None:
909        guardnames, default = _parse_guards(guards)
910        if default:
911            msg = "implementation detail not available on {0}"
912        else:
913            msg = "implementation detail specific to {0}"
914        guardnames = sorted(guardnames.keys())
915        msg = msg.format(' or '.join(guardnames))
916    return unittest.skip(msg)
917
918def _parse_guards(guards):
919    # Returns a tuple ({platform_name: run_me}, default_value)
920    if not guards:
921        return ({'cpython': True}, False)
922    is_true = list(guards.values())[0]
923    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
924    return (guards, not is_true)
925
926# Use the following check to guard CPython's implementation-specific tests --
927# or to run them only on the implementation(s) guarded by the arguments.
928def check_impl_detail(**guards):
929    """This function returns True or False depending on the host platform.
930       Examples:
931          if check_impl_detail():               # only on CPython (default)
932          if check_impl_detail(jython=True):    # only on Jython
933          if check_impl_detail(cpython=False):  # everywhere except on CPython
934    """
935    guards, default = _parse_guards(guards)
936    return guards.get(sys.implementation.name, default)
937
938
939def no_tracing(func):
940    """Decorator to temporarily turn off tracing for the duration of a test."""
941    if not hasattr(sys, 'gettrace'):
942        return func
943    else:
944        @functools.wraps(func)
945        def wrapper(*args, **kwargs):
946            original_trace = sys.gettrace()
947            try:
948                sys.settrace(None)
949                return func(*args, **kwargs)
950            finally:
951                sys.settrace(original_trace)
952        return wrapper
953
954
955def refcount_test(test):
956    """Decorator for tests which involve reference counting.
957
958    To start, the decorator does not run the test if is not run by CPython.
959    After that, any trace function is unset during the test to prevent
960    unexpected refcounts caused by the trace function.
961
962    """
963    return no_tracing(cpython_only(test))
964
965
966def _filter_suite(suite, pred):
967    """Recursively filter test cases in a suite based on a predicate."""
968    newtests = []
969    for test in suite._tests:
970        if isinstance(test, unittest.TestSuite):
971            _filter_suite(test, pred)
972            newtests.append(test)
973        else:
974            if pred(test):
975                newtests.append(test)
976    suite._tests = newtests
977
978def _run_suite(suite):
979    """Run tests from a unittest.TestSuite-derived class."""
980    runner = get_test_runner(sys.stdout,
981                             verbosity=verbose,
982                             capture_output=(junit_xml_list is not None))
983
984    result = runner.run(suite)
985
986    if junit_xml_list is not None:
987        junit_xml_list.append(result.get_xml_element())
988
989    if not result.testsRun and not result.skipped:
990        raise TestDidNotRun
991    if not result.wasSuccessful():
992        if len(result.errors) == 1 and not result.failures:
993            err = result.errors[0][1]
994        elif len(result.failures) == 1 and not result.errors:
995            err = result.failures[0][1]
996        else:
997            err = "multiple errors occurred"
998            if not verbose: err += "; run in verbose mode for details"
999        errors = [(str(tc), exc_str) for tc, exc_str in result.errors]
1000        failures = [(str(tc), exc_str) for tc, exc_str in result.failures]
1001        raise TestFailedWithDetails(err, errors, failures)
1002
1003
1004# By default, don't filter tests
1005_match_test_func = None
1006
1007_accept_test_patterns = None
1008_ignore_test_patterns = None
1009
1010
1011def match_test(test):
1012    # Function used by support.run_unittest() and regrtest --list-cases
1013    if _match_test_func is None:
1014        return True
1015    else:
1016        return _match_test_func(test.id())
1017
1018
1019def _is_full_match_test(pattern):
1020    # If a pattern contains at least one dot, it's considered
1021    # as a full test identifier.
1022    # Example: 'test.test_os.FileTests.test_access'.
1023    #
1024    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
1025    # or '[!...]'. For example, ignore 'test_access*'.
1026    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))
1027
1028
1029def set_match_tests(accept_patterns=None, ignore_patterns=None):
1030    global _match_test_func, _accept_test_patterns, _ignore_test_patterns
1031
1032
1033    if accept_patterns is None:
1034        accept_patterns = ()
1035    if ignore_patterns is None:
1036        ignore_patterns = ()
1037
1038    accept_func = ignore_func = None
1039
1040    if accept_patterns != _accept_test_patterns:
1041        accept_patterns, accept_func = _compile_match_function(accept_patterns)
1042    if ignore_patterns != _ignore_test_patterns:
1043        ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)
1044
1045    # Create a copy since patterns can be mutable and so modified later
1046    _accept_test_patterns = tuple(accept_patterns)
1047    _ignore_test_patterns = tuple(ignore_patterns)
1048
1049    if accept_func is not None or ignore_func is not None:
1050        def match_function(test_id):
1051            accept = True
1052            ignore = False
1053            if accept_func:
1054                accept = accept_func(test_id)
1055            if ignore_func:
1056                ignore = ignore_func(test_id)
1057            return accept and not ignore
1058
1059        _match_test_func = match_function
1060
1061
1062def _compile_match_function(patterns):
1063    if not patterns:
1064        func = None
1065        # set_match_tests(None) behaves as set_match_tests(())
1066        patterns = ()
1067    elif all(map(_is_full_match_test, patterns)):
1068        # Simple case: all patterns are full test identifier.
1069        # The test.bisect_cmd utility only uses such full test identifiers.
1070        func = set(patterns).__contains__
1071    else:
1072        import fnmatch
1073        regex = '|'.join(map(fnmatch.translate, patterns))
1074        # The search *is* case sensitive on purpose:
1075        # don't use flags=re.IGNORECASE
1076        regex_match = re.compile(regex).match
1077
1078        def match_test_regex(test_id):
1079            if regex_match(test_id):
1080                # The regex matches the whole identifier, for example
1081                # 'test.test_os.FileTests.test_access'.
1082                return True
1083            else:
1084                # Try to match parts of the test identifier.
1085                # For example, split 'test.test_os.FileTests.test_access'
1086                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
1087                return any(map(regex_match, test_id.split(".")))
1088
1089        func = match_test_regex
1090
1091    return patterns, func
1092
1093
1094def run_unittest(*classes):
1095    """Run tests from unittest.TestCase-derived classes."""
1096    valid_types = (unittest.TestSuite, unittest.TestCase)
1097    suite = unittest.TestSuite()
1098    for cls in classes:
1099        if isinstance(cls, str):
1100            if cls in sys.modules:
1101                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1102            else:
1103                raise ValueError("str arguments must be keys in sys.modules")
1104        elif isinstance(cls, valid_types):
1105            suite.addTest(cls)
1106        else:
1107            suite.addTest(unittest.makeSuite(cls))
1108    _filter_suite(suite, match_test)
1109    _run_suite(suite)
1110
1111#=======================================================================
1112# Check for the presence of docstrings.
1113
1114# Rather than trying to enumerate all the cases where docstrings may be
1115# disabled, we just check for that directly
1116
1117def _check_docstrings():
1118    """Just used to check if docstrings are enabled"""
1119
1120MISSING_C_DOCSTRINGS = (check_impl_detail() and
1121                        sys.platform != 'win32' and
1122                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))
1123
1124HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
1125                   not MISSING_C_DOCSTRINGS)
1126
1127requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1128                                          "test requires docstrings")
1129
1130
1131#=======================================================================
1132# doctest driver.
1133
1134def run_doctest(module, verbosity=None, optionflags=0):
1135    """Run doctest on the given module.  Return (#failures, #tests).
1136
1137    If optional argument verbosity is not specified (or is None), pass
1138    support's belief about verbosity on to doctest.  Else doctest's
1139    usual behavior is used (it searches sys.argv for -v).
1140    """
1141
1142    import doctest
1143
1144    if verbosity is None:
1145        verbosity = verbose
1146    else:
1147        verbosity = None
1148
1149    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
1150    if f:
1151        raise TestFailed("%d of %d doctests failed" % (f, t))
1152    if verbose:
1153        print('doctest (%s) ... %d tests with zero failures' %
1154              (module.__name__, t))
1155    return f, t
1156
1157
1158#=======================================================================
1159# Support for saving and restoring the imported modules.
1160
1161def print_warning(msg):
1162    # bpo-39983: Print into sys.__stderr__ to display the warning even
1163    # when sys.stderr is captured temporarily by a test
1164    for line in msg.splitlines():
1165        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)
1166
1167
1168# Flag used by saved_test_environment of test.libregrtest.save_env,
1169# to check if a test modified the environment. The flag should be set to False
1170# before running a new test.
1171#
1172# For example, threading_helper.threading_cleanup() sets the flag is the function fails
1173# to cleanup threads.
1174environment_altered = False
1175
1176def reap_children():
1177    """Use this function at the end of test_main() whenever sub-processes
1178    are started.  This will help ensure that no extra children (zombies)
1179    stick around to hog resources and create problems when looking
1180    for refleaks.
1181    """
1182    global environment_altered
1183
1184    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
1185    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
1186        return
1187
1188    # Reap all our dead child processes so we don't leave zombies around.
1189    # These hog resources and might be causing some of the buildbots to die.
1190    while True:
1191        try:
1192            # Read the exit status of any child process which already completed
1193            pid, status = os.waitpid(-1, os.WNOHANG)
1194        except OSError:
1195            break
1196
1197        if pid == 0:
1198            break
1199
1200        print_warning(f"reap_children() reaped child process {pid}")
1201        environment_altered = True
1202
1203
1204@contextlib.contextmanager
1205def swap_attr(obj, attr, new_val):
1206    """Temporary swap out an attribute with a new object.
1207
1208    Usage:
1209        with swap_attr(obj, "attr", 5):
1210            ...
1211
1212        This will set obj.attr to 5 for the duration of the with: block,
1213        restoring the old value at the end of the block. If `attr` doesn't
1214        exist on `obj`, it will be created and then deleted at the end of the
1215        block.
1216
1217        The old value (or None if it doesn't exist) will be assigned to the
1218        target of the "as" clause, if there is one.
1219    """
1220    if hasattr(obj, attr):
1221        real_val = getattr(obj, attr)
1222        setattr(obj, attr, new_val)
1223        try:
1224            yield real_val
1225        finally:
1226            setattr(obj, attr, real_val)
1227    else:
1228        setattr(obj, attr, new_val)
1229        try:
1230            yield
1231        finally:
1232            if hasattr(obj, attr):
1233                delattr(obj, attr)
1234
1235@contextlib.contextmanager
1236def swap_item(obj, item, new_val):
1237    """Temporary swap out an item with a new object.
1238
1239    Usage:
1240        with swap_item(obj, "item", 5):
1241            ...
1242
1243        This will set obj["item"] to 5 for the duration of the with: block,
1244        restoring the old value at the end of the block. If `item` doesn't
1245        exist on `obj`, it will be created and then deleted at the end of the
1246        block.
1247
1248        The old value (or None if it doesn't exist) will be assigned to the
1249        target of the "as" clause, if there is one.
1250    """
1251    if item in obj:
1252        real_val = obj[item]
1253        obj[item] = new_val
1254        try:
1255            yield real_val
1256        finally:
1257            obj[item] = real_val
1258    else:
1259        obj[item] = new_val
1260        try:
1261            yield
1262        finally:
1263            if item in obj:
1264                del obj[item]
1265
1266def args_from_interpreter_flags():
1267    """Return a list of command-line arguments reproducing the current
1268    settings in sys.flags and sys.warnoptions."""
1269    import subprocess
1270    return subprocess._args_from_interpreter_flags()
1271
1272def optim_args_from_interpreter_flags():
1273    """Return a list of command-line arguments reproducing the current
1274    optimization settings in sys.flags."""
1275    import subprocess
1276    return subprocess._optim_args_from_interpreter_flags()
1277
1278
1279class Matcher(object):
1280
1281    _partial_matches = ('msg', 'message')
1282
1283    def matches(self, d, **kwargs):
1284        """
1285        Try to match a single dict with the supplied arguments.
1286
1287        Keys whose values are strings and which are in self._partial_matches
1288        will be checked for partial (i.e. substring) matches. You can extend
1289        this scheme to (for example) do regular expression matching, etc.
1290        """
1291        result = True
1292        for k in kwargs:
1293            v = kwargs[k]
1294            dv = d.get(k)
1295            if not self.match_value(k, dv, v):
1296                result = False
1297                break
1298        return result
1299
1300    def match_value(self, k, dv, v):
1301        """
1302        Try to match a single stored value (dv) with a supplied value (v).
1303        """
1304        if type(v) != type(dv):
1305            result = False
1306        elif type(dv) is not str or k not in self._partial_matches:
1307            result = (v == dv)
1308        else:
1309            result = dv.find(v) >= 0
1310        return result
1311
1312
1313_buggy_ucrt = None
1314def skip_if_buggy_ucrt_strfptime(test):
1315    """
1316    Skip decorator for tests that use buggy strptime/strftime
1317
1318    If the UCRT bugs are present time.localtime().tm_zone will be
1319    an empty string, otherwise we assume the UCRT bugs are fixed
1320
1321    See bpo-37552 [Windows] strptime/strftime return invalid
1322    results with UCRT version 17763.615
1323    """
1324    import locale
1325    global _buggy_ucrt
1326    if _buggy_ucrt is None:
1327        if(sys.platform == 'win32' and
1328                locale.getdefaultlocale()[1]  == 'cp65001' and
1329                time.localtime().tm_zone == ''):
1330            _buggy_ucrt = True
1331        else:
1332            _buggy_ucrt = False
1333    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test
1334
1335class PythonSymlink:
1336    """Creates a symlink for the current Python executable"""
1337    def __init__(self, link=None):
1338        from .os_helper import TESTFN
1339
1340        self.link = link or os.path.abspath(TESTFN)
1341        self._linked = []
1342        self.real = os.path.realpath(sys.executable)
1343        self._also_link = []
1344
1345        self._env = None
1346
1347        self._platform_specific()
1348
1349    def _platform_specific(self):
1350        pass
1351
1352    if sys.platform == "win32":
1353        def _platform_specific(self):
1354            import glob
1355            import _winapi
1356
1357            if os.path.lexists(self.real) and not os.path.exists(self.real):
1358                # App symlink appears to not exist, but we want the
1359                # real executable here anyway
1360                self.real = _winapi.GetModuleFileName(0)
1361
1362            dll = _winapi.GetModuleFileName(sys.dllhandle)
1363            src_dir = os.path.dirname(dll)
1364            dest_dir = os.path.dirname(self.link)
1365            self._also_link.append((
1366                dll,
1367                os.path.join(dest_dir, os.path.basename(dll))
1368            ))
1369            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
1370                self._also_link.append((
1371                    runtime,
1372                    os.path.join(dest_dir, os.path.basename(runtime))
1373                ))
1374
1375            self._env = {k.upper(): os.getenv(k) for k in os.environ}
1376            self._env["PYTHONHOME"] = os.path.dirname(self.real)
1377            if sysconfig.is_python_build(True):
1378                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)
1379
1380    def __enter__(self):
1381        os.symlink(self.real, self.link)
1382        self._linked.append(self.link)
1383        for real, link in self._also_link:
1384            os.symlink(real, link)
1385            self._linked.append(link)
1386        return self
1387
1388    def __exit__(self, exc_type, exc_value, exc_tb):
1389        for link in self._linked:
1390            try:
1391                os.remove(link)
1392            except IOError as ex:
1393                if verbose:
1394                    print("failed to clean up {}: {}".format(link, ex))
1395
1396    def _call(self, python, args, env, returncode):
1397        import subprocess
1398        cmd = [python, *args]
1399        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
1400                             stderr=subprocess.PIPE, env=env)
1401        r = p.communicate()
1402        if p.returncode != returncode:
1403            if verbose:
1404                print(repr(r[0]))
1405                print(repr(r[1]), file=sys.stderr)
1406            raise RuntimeError(
1407                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
1408        return r
1409
1410    def call_real(self, *args, returncode=0):
1411        return self._call(self.real, args, None, returncode)
1412
1413    def call_link(self, *args, returncode=0):
1414        return self._call(self.link, args, self._env, returncode)
1415
1416
1417def skip_if_pgo_task(test):
1418    """Skip decorator for tests not run in (non-extended) PGO task"""
1419    ok = not PGO or PGO_EXTENDED
1420    msg = "Not run for (non-extended) PGO task"
1421    return test if ok else unittest.skip(msg)(test)
1422
1423
1424def detect_api_mismatch(ref_api, other_api, *, ignore=()):
1425    """Returns the set of items in ref_api not in other_api, except for a
1426    defined list of items to be ignored in this check.
1427
1428    By default this skips private attributes beginning with '_' but
1429    includes all magic methods, i.e. those starting and ending in '__'.
1430    """
1431    missing_items = set(dir(ref_api)) - set(dir(other_api))
1432    if ignore:
1433        missing_items -= set(ignore)
1434    missing_items = set(m for m in missing_items
1435                        if not m.startswith('_') or m.endswith('__'))
1436    return missing_items
1437
1438
1439def check__all__(test_case, module, name_of_module=None, extra=(),
1440                 not_exported=()):
1441    """Assert that the __all__ variable of 'module' contains all public names.
1442
1443    The module's public names (its API) are detected automatically based on
1444    whether they match the public name convention and were defined in
1445    'module'.
1446
1447    The 'name_of_module' argument can specify (as a string or tuple thereof)
1448    what module(s) an API could be defined in in order to be detected as a
1449    public API. One case for this is when 'module' imports part of its public
1450    API from other modules, possibly a C backend (like 'csv' and its '_csv').
1451
1452    The 'extra' argument can be a set of names that wouldn't otherwise be
1453    automatically detected as "public", like objects without a proper
1454    '__module__' attribute. If provided, it will be added to the
1455    automatically detected ones.
1456
1457    The 'not_exported' argument can be a set of names that must not be treated
1458    as part of the public API even though their names indicate otherwise.
1459
1460    Usage:
1461        import bar
1462        import foo
1463        import unittest
1464        from test import support
1465
1466        class MiscTestCase(unittest.TestCase):
1467            def test__all__(self):
1468                support.check__all__(self, foo)
1469
1470        class OtherTestCase(unittest.TestCase):
1471            def test__all__(self):
1472                extra = {'BAR_CONST', 'FOO_CONST'}
1473                not_exported = {'baz'}  # Undocumented name.
1474                # bar imports part of its API from _bar.
1475                support.check__all__(self, bar, ('bar', '_bar'),
1476                                     extra=extra, not_exported=not_exported)
1477
1478    """
1479
1480    if name_of_module is None:
1481        name_of_module = (module.__name__, )
1482    elif isinstance(name_of_module, str):
1483        name_of_module = (name_of_module, )
1484
1485    expected = set(extra)
1486
1487    for name in dir(module):
1488        if name.startswith('_') or name in not_exported:
1489            continue
1490        obj = getattr(module, name)
1491        if (getattr(obj, '__module__', None) in name_of_module or
1492                (not hasattr(obj, '__module__') and
1493                 not isinstance(obj, types.ModuleType))):
1494            expected.add(name)
1495    test_case.assertCountEqual(module.__all__, expected)
1496
1497
1498def suppress_msvcrt_asserts(verbose=False):
1499    try:
1500        import msvcrt
1501    except ImportError:
1502        return
1503
1504    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
1505                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
1506                        | msvcrt.SEM_NOGPFAULTERRORBOX
1507                        | msvcrt.SEM_NOOPENFILEERRORBOX)
1508
1509    # CrtSetReportMode() is only available in debug build
1510    if hasattr(msvcrt, 'CrtSetReportMode'):
1511        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
1512            if verbose:
1513                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
1514                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
1515            else:
1516                msvcrt.CrtSetReportMode(m, 0)
1517
1518
1519class SuppressCrashReport:
1520    """Try to prevent a crash report from popping up.
1521
1522    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
1523    disable the creation of coredump file.
1524    """
1525    old_value = None
1526    old_modes = None
1527
1528    def __enter__(self):
1529        """On Windows, disable Windows Error Reporting dialogs using
1530        SetErrorMode() and CrtSetReportMode().
1531
1532        On UNIX, try to save the previous core file size limit, then set
1533        soft limit to 0.
1534        """
1535        if sys.platform.startswith('win'):
1536            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
1537            try:
1538                import msvcrt
1539            except ImportError:
1540                return
1541
1542            self.old_value = msvcrt.GetErrorMode()
1543
1544            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)
1545
1546            # bpo-23314: Suppress assert dialogs in debug builds.
1547            # CrtSetReportMode() is only available in debug build.
1548            if hasattr(msvcrt, 'CrtSetReportMode'):
1549                self.old_modes = {}
1550                for report_type in [msvcrt.CRT_WARN,
1551                                    msvcrt.CRT_ERROR,
1552                                    msvcrt.CRT_ASSERT]:
1553                    old_mode = msvcrt.CrtSetReportMode(report_type,
1554                            msvcrt.CRTDBG_MODE_FILE)
1555                    old_file = msvcrt.CrtSetReportFile(report_type,
1556                            msvcrt.CRTDBG_FILE_STDERR)
1557                    self.old_modes[report_type] = old_mode, old_file
1558
1559        else:
1560            try:
1561                import resource
1562                self.resource = resource
1563            except ImportError:
1564                self.resource = None
1565            if self.resource is not None:
1566                try:
1567                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
1568                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
1569                                            (0, self.old_value[1]))
1570                except (ValueError, OSError):
1571                    pass
1572
1573            if sys.platform == 'darwin':
1574                import subprocess
1575                # Check if the 'Crash Reporter' on OSX was configured
1576                # in 'Developer' mode and warn that it will get triggered
1577                # when it is.
1578                #
1579                # This assumes that this context manager is used in tests
1580                # that might trigger the next manager.
1581                cmd = ['/usr/bin/defaults', 'read',
1582                       'com.apple.CrashReporter', 'DialogType']
1583                proc = subprocess.Popen(cmd,
1584                                        stdout=subprocess.PIPE,
1585                                        stderr=subprocess.PIPE)
1586                with proc:
1587                    stdout = proc.communicate()[0]
1588                if stdout.strip() == b'developer':
1589                    print("this test triggers the Crash Reporter, "
1590                          "that is intentional", end='', flush=True)
1591
1592        return self
1593
1594    def __exit__(self, *ignore_exc):
1595        """Restore Windows ErrorMode or core file behavior to initial value."""
1596        if self.old_value is None:
1597            return
1598
1599        if sys.platform.startswith('win'):
1600            import msvcrt
1601            msvcrt.SetErrorMode(self.old_value)
1602
1603            if self.old_modes:
1604                for report_type, (old_mode, old_file) in self.old_modes.items():
1605                    msvcrt.CrtSetReportMode(report_type, old_mode)
1606                    msvcrt.CrtSetReportFile(report_type, old_file)
1607        else:
1608            if self.resource is not None:
1609                try:
1610                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
1611                except (ValueError, OSError):
1612                    pass
1613
1614
1615def patch(test_instance, object_to_patch, attr_name, new_value):
1616    """Override 'object_to_patch'.'attr_name' with 'new_value'.
1617
1618    Also, add a cleanup procedure to 'test_instance' to restore
1619    'object_to_patch' value for 'attr_name'.
1620    The 'attr_name' should be a valid attribute for 'object_to_patch'.
1621
1622    """
1623    # check that 'attr_name' is a real attribute for 'object_to_patch'
1624    # will raise AttributeError if it does not exist
1625    getattr(object_to_patch, attr_name)
1626
1627    # keep a copy of the old value
1628    attr_is_local = False
1629    try:
1630        old_value = object_to_patch.__dict__[attr_name]
1631    except (AttributeError, KeyError):
1632        old_value = getattr(object_to_patch, attr_name, None)
1633    else:
1634        attr_is_local = True
1635
1636    # restore the value when the test is done
1637    def cleanup():
1638        if attr_is_local:
1639            setattr(object_to_patch, attr_name, old_value)
1640        else:
1641            delattr(object_to_patch, attr_name)
1642
1643    test_instance.addCleanup(cleanup)
1644
1645    # actually override the attribute
1646    setattr(object_to_patch, attr_name, new_value)
1647
1648
1649def run_in_subinterp(code):
1650    """
1651    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
1652    module is enabled.
1653    """
1654    # Issue #10915, #15751: PyGILState_*() functions don't work with
1655    # sub-interpreters, the tracemalloc module uses these functions internally
1656    try:
1657        import tracemalloc
1658    except ImportError:
1659        pass
1660    else:
1661        if tracemalloc.is_tracing():
1662            raise unittest.SkipTest("run_in_subinterp() cannot be used "
1663                                     "if tracemalloc module is tracing "
1664                                     "memory allocations")
1665    import _testcapi
1666    return _testcapi.run_in_subinterp(code)
1667
1668
1669def check_free_after_iterating(test, iter, cls, args=()):
1670    class A(cls):
1671        def __del__(self):
1672            nonlocal done
1673            done = True
1674            try:
1675                next(it)
1676            except StopIteration:
1677                pass
1678
1679    done = False
1680    it = iter(A(*args))
1681    # Issue 26494: Shouldn't crash
1682    test.assertRaises(StopIteration, next, it)
1683    # The sequence should be deallocated just after the end of iterating
1684    gc_collect()
1685    test.assertTrue(done)
1686
1687
1688def missing_compiler_executable(cmd_names=[]):
1689    """Check if the compiler components used to build the interpreter exist.
1690
1691    Check for the existence of the compiler executables whose names are listed
1692    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
1693    and return the first missing executable or None when none is found
1694    missing.
1695
1696    """
1697    # TODO (PEP 632): alternate check without using distutils
1698    from distutils import ccompiler, sysconfig, spawn, errors
1699    compiler = ccompiler.new_compiler()
1700    sysconfig.customize_compiler(compiler)
1701    if compiler.compiler_type == "msvc":
1702        # MSVC has no executables, so check whether initialization succeeds
1703        try:
1704            compiler.initialize()
1705        except errors.DistutilsPlatformError:
1706            return "msvc"
1707    for name in compiler.executables:
1708        if cmd_names and name not in cmd_names:
1709            continue
1710        cmd = getattr(compiler, name)
1711        if cmd_names:
1712            assert cmd is not None, \
1713                    "the '%s' executable is not configured" % name
1714        elif not cmd:
1715            continue
1716        if spawn.find_executable(cmd[0]) is None:
1717            return cmd[0]
1718
1719
1720_is_android_emulator = None
1721def setswitchinterval(interval):
1722    # Setting a very low gil interval on the Android emulator causes python
1723    # to hang (issue #26939).
1724    minimum_interval = 1e-5
1725    if is_android and interval < minimum_interval:
1726        global _is_android_emulator
1727        if _is_android_emulator is None:
1728            import subprocess
1729            _is_android_emulator = (subprocess.check_output(
1730                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
1731        if _is_android_emulator:
1732            interval = minimum_interval
1733    return sys.setswitchinterval(interval)
1734
1735
1736@contextlib.contextmanager
1737def disable_faulthandler():
1738    import faulthandler
1739
1740    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
1741    # sys.stderr with a StringIO which has no file descriptor when a test
1742    # is run with -W/--verbose3.
1743    fd = sys.__stderr__.fileno()
1744
1745    is_enabled = faulthandler.is_enabled()
1746    try:
1747        faulthandler.disable()
1748        yield
1749    finally:
1750        if is_enabled:
1751            faulthandler.enable(file=fd, all_threads=True)
1752
1753
1754class SaveSignals:
1755    """
1756    Save and restore signal handlers.
1757
1758    This class is only able to save/restore signal handlers registered
1759    by the Python signal module: see bpo-13285 for "external" signal
1760    handlers.
1761    """
1762
1763    def __init__(self):
1764        import signal
1765        self.signal = signal
1766        self.signals = signal.valid_signals()
1767        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
1768        for signame in ('SIGKILL', 'SIGSTOP'):
1769            try:
1770                signum = getattr(signal, signame)
1771            except AttributeError:
1772                continue
1773            self.signals.remove(signum)
1774        self.handlers = {}
1775
1776    def save(self):
1777        for signum in self.signals:
1778            handler = self.signal.getsignal(signum)
1779            if handler is None:
1780                # getsignal() returns None if a signal handler was not
1781                # registered by the Python signal module,
1782                # and the handler is not SIG_DFL nor SIG_IGN.
1783                #
1784                # Ignore the signal: we cannot restore the handler.
1785                continue
1786            self.handlers[signum] = handler
1787
1788    def restore(self):
1789        for signum, handler in self.handlers.items():
1790            self.signal.signal(signum, handler)
1791
1792
1793def with_pymalloc():
1794    import _testcapi
1795    return _testcapi.WITH_PYMALLOC
1796
1797
1798class _ALWAYS_EQ:
1799    """
1800    Object that is equal to anything.
1801    """
1802    def __eq__(self, other):
1803        return True
1804    def __ne__(self, other):
1805        return False
1806
1807ALWAYS_EQ = _ALWAYS_EQ()
1808
1809class _NEVER_EQ:
1810    """
1811    Object that is not equal to anything.
1812    """
1813    def __eq__(self, other):
1814        return False
1815    def __ne__(self, other):
1816        return True
1817    def __hash__(self):
1818        return 1
1819
1820NEVER_EQ = _NEVER_EQ()
1821
1822@functools.total_ordering
1823class _LARGEST:
1824    """
1825    Object that is greater than anything (except itself).
1826    """
1827    def __eq__(self, other):
1828        return isinstance(other, _LARGEST)
1829    def __lt__(self, other):
1830        return False
1831
1832LARGEST = _LARGEST()
1833
1834@functools.total_ordering
1835class _SMALLEST:
1836    """
1837    Object that is less than anything (except itself).
1838    """
1839    def __eq__(self, other):
1840        return isinstance(other, _SMALLEST)
1841    def __gt__(self, other):
1842        return False
1843
1844SMALLEST = _SMALLEST()
1845
1846def maybe_get_event_loop_policy():
1847    """Return the global event loop policy if one is set, else return None."""
1848    import asyncio.events
1849    return asyncio.events._event_loop_policy
1850
1851# Helpers for testing hashing.
1852NHASHBITS = sys.hash_info.width # number of bits in hash() result
1853assert NHASHBITS in (32, 64)
1854
1855# Return mean and sdev of number of collisions when tossing nballs balls
1856# uniformly at random into nbins bins.  By definition, the number of
1857# collisions is the number of balls minus the number of occupied bins at
1858# the end.
1859def collision_stats(nbins, nballs):
1860    n, k = nbins, nballs
1861    # prob a bin empty after k trials = (1 - 1/n)**k
1862    # mean # empty is then n * (1 - 1/n)**k
1863    # so mean # occupied is n - n * (1 - 1/n)**k
1864    # so collisions = k - (n - n*(1 - 1/n)**k)
1865    #
1866    # For the variance:
1867    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
1868    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
1869    #
1870    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
1871    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
1872    # error-prone) efforts to rework the naive formulas to avoid those,
1873    # we use the `decimal` module to get plenty of extra precision.
1874    #
1875    # Note:  the exact values are straightforward to compute with
1876    # rationals, but in context that's unbearably slow, requiring
1877    # multi-million bit arithmetic.
1878    import decimal
1879    with decimal.localcontext() as ctx:
1880        bits = n.bit_length() * 2  # bits in n**2
1881        # At least that many bits will likely cancel out.
1882        # Use that many decimal digits instead.
1883        ctx.prec = max(bits, 30)
1884        dn = decimal.Decimal(n)
1885        p1empty = ((dn - 1) / dn) ** k
1886        meanempty = n * p1empty
1887        occupied = n - meanempty
1888        collisions = k - occupied
1889        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
1890        return float(collisions), float(var.sqrt())
1891
1892
1893class catch_unraisable_exception:
1894    """
1895    Context manager catching unraisable exception using sys.unraisablehook.
1896
1897    Storing the exception value (cm.unraisable.exc_value) creates a reference
1898    cycle. The reference cycle is broken explicitly when the context manager
1899    exits.
1900
1901    Storing the object (cm.unraisable.object) can resurrect it if it is set to
1902    an object which is being finalized. Exiting the context manager clears the
1903    stored object.
1904
1905    Usage:
1906
1907        with support.catch_unraisable_exception() as cm:
1908            # code creating an "unraisable exception"
1909            ...
1910
1911            # check the unraisable exception: use cm.unraisable
1912            ...
1913
1914        # cm.unraisable attribute no longer exists at this point
1915        # (to break a reference cycle)
1916    """
1917
1918    def __init__(self):
1919        self.unraisable = None
1920        self._old_hook = None
1921
1922    def _hook(self, unraisable):
1923        # Storing unraisable.object can resurrect an object which is being
1924        # finalized. Storing unraisable.exc_value creates a reference cycle.
1925        self.unraisable = unraisable
1926
1927    def __enter__(self):
1928        self._old_hook = sys.unraisablehook
1929        sys.unraisablehook = self._hook
1930        return self
1931
1932    def __exit__(self, *exc_info):
1933        sys.unraisablehook = self._old_hook
1934        del self.unraisable
1935
1936
1937def wait_process(pid, *, exitcode, timeout=None):
1938    """
1939    Wait until process pid completes and check that the process exit code is
1940    exitcode.
1941
1942    Raise an AssertionError if the process exit code is not equal to exitcode.
1943
1944    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
1945    kill the process (if signal.SIGKILL is available) and raise an
1946    AssertionError. The timeout feature is not available on Windows.
1947    """
1948    if os.name != "nt":
1949        import signal
1950
1951        if timeout is None:
1952            timeout = SHORT_TIMEOUT
1953        t0 = time.monotonic()
1954        sleep = 0.001
1955        max_sleep = 0.1
1956        while True:
1957            pid2, status = os.waitpid(pid, os.WNOHANG)
1958            if pid2 != 0:
1959                break
1960            # process is still running
1961
1962            dt = time.monotonic() - t0
1963            if dt > SHORT_TIMEOUT:
1964                try:
1965                    os.kill(pid, signal.SIGKILL)
1966                    os.waitpid(pid, 0)
1967                except OSError:
1968                    # Ignore errors like ChildProcessError or PermissionError
1969                    pass
1970
1971                raise AssertionError(f"process {pid} is still running "
1972                                     f"after {dt:.1f} seconds")
1973
1974            sleep = min(sleep * 2, max_sleep)
1975            time.sleep(sleep)
1976    else:
1977        # Windows implementation
1978        pid2, status = os.waitpid(pid, 0)
1979
1980    exitcode2 = os.waitstatus_to_exitcode(status)
1981    if exitcode2 != exitcode:
1982        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
1983                             f"but exit code {exitcode} is expected")
1984
1985    # sanity check: it should not fail in practice
1986    if pid2 != pid:
1987        raise AssertionError(f"pid {pid2} != pid {pid}")
1988
1989def skip_if_broken_multiprocessing_synchronize():
1990    """
1991    Skip tests if the multiprocessing.synchronize module is missing, if there
1992    is no available semaphore implementation, or if creating a lock raises an
1993    OSError (on Linux only).
1994    """
1995    from .import_helper import import_module
1996
1997    # Skip tests if the _multiprocessing extension is missing.
1998    import_module('_multiprocessing')
1999
2000    # Skip tests if there is no available semaphore implementation:
2001    # multiprocessing.synchronize requires _multiprocessing.SemLock.
2002    synchronize = import_module('multiprocessing.synchronize')
2003
2004    if sys.platform == "linux":
2005        try:
2006            # bpo-38377: On Linux, creating a semaphore fails with OSError
2007            # if the current user does not have the permission to create
2008            # a file in /dev/shm/ directory.
2009            synchronize.Lock(ctx=None)
2010        except OSError as exc:
2011            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
2012
2013
2014@contextlib.contextmanager
2015def infinite_recursion(max_depth=75):
2016    original_depth = sys.getrecursionlimit()
2017    try:
2018        sys.setrecursionlimit(max_depth)
2019        yield
2020    finally:
2021        sys.setrecursionlimit(original_depth)
2022
2023
2024def check_disallow_instantiation(testcase, tp, *args, **kwds):
2025    """
2026    Check that given type cannot be instantiated using *args and **kwds.
2027
2028    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
2029    """
2030    mod = tp.__module__
2031    name = tp.__name__
2032    if mod != 'builtins':
2033        qualname = f"{mod}.{name}"
2034    else:
2035        qualname = f"{name}"
2036    msg = f"cannot create '{re.escape(qualname)}' instances"
2037    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
2038
2039
2040def ignore_deprecations_from(module: str, *, like: str) -> object:
2041    token = object()
2042    warnings.filterwarnings(
2043        "ignore",
2044        category=DeprecationWarning,
2045        module=module,
2046        message=like + fr"(?#support{id(token)})",
2047    )
2048    return token
2049
2050
2051def clear_ignored_deprecations(*tokens: object) -> None:
2052    if not tokens:
2053        raise ValueError("Provide token or tokens returned by ignore_deprecations_from")
2054
2055    new_filters = []
2056    endswith = tuple(rf"(?#support{id(token)})" for token in tokens)
2057    for action, message, category, module, lineno in warnings.filters:
2058        if action == "ignore" and category is DeprecationWarning:
2059            if isinstance(message, re.Pattern):
2060                msg = message.pattern
2061            else:
2062                msg = message or ""
2063            if msg.endswith(endswith):
2064                continue
2065        new_filters.append((action, message, category, module, lineno))
2066    if warnings.filters != new_filters:
2067        warnings.filters[:] = new_filters
2068        warnings._filters_mutated()
2069