1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import contextlib 7import dataclasses 8import functools 9import _opcode 10import os 11import re 12import stat 13import sys 14import sysconfig 15import textwrap 16import time 17import types 18import unittest 19import warnings 20 21 22__all__ = [ 23 # globals 24 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 25 # exceptions 26 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 27 # io 28 "record_original_stdout", "get_original_stdout", "captured_stdout", 29 "captured_stdin", "captured_stderr", "captured_output", 30 # unittest 31 "is_resource_enabled", "requires", "requires_freebsd_version", 32 "requires_gil_enabled", "requires_linux_version", "requires_mac_ver", 33 "check_syntax_error", 34 "requires_gzip", "requires_bz2", "requires_lzma", 35 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 36 "requires_IEEE_754", "requires_zlib", 37 "has_fork_support", "requires_fork", 38 "has_subprocess_support", "requires_subprocess", 39 "has_socket_support", "requires_working_socket", 40 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 41 "check__all__", "skip_if_buggy_ucrt_strfptime", 42 "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", 43 "requires_limited_api", "requires_specialization", 44 # sys 45 "MS_WINDOWS", "is_jython", "is_android", "is_emscripten", "is_wasi", 46 "is_apple_mobile", "check_impl_detail", "unix_shell", "setswitchinterval", 47 # os 48 "get_pagesize", 49 # network 50 "open_urlresource", 51 # processes 52 "reap_children", 53 # miscellaneous 54 "run_with_locale", "swap_item", "findfile", "infinite_recursion", 55 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 56 "run_with_tz", "PGO", "missing_compiler_executable", 57 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", 58 "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", 59 "Py_DEBUG", "exceeds_recursion_limit", "get_c_recursion_limit", 60 "skip_on_s390x", 61 "without_optimizer", 62 "force_not_colorized", 63 "BrokenIter", 64 ] 65 66 67# Timeout in seconds for tests using a network server listening on the network 68# local loopback interface like 127.0.0.1. 69# 70# The timeout is long enough to prevent test failure: it takes into account 71# that the client and the server can run in different threads or even different 72# processes. 73# 74# The timeout should be long enough for connect(), recv() and send() methods 75# of socket.socket. 76LOOPBACK_TIMEOUT = 10.0 77 78# Timeout in seconds for network requests going to the internet. The timeout is 79# short enough to prevent a test to wait for too long if the internet request 80# is blocked for whatever reason. 81# 82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed, 83# but skip the test instead: see transient_internet(). 84INTERNET_TIMEOUT = 60.0 85 86# Timeout in seconds to mark a test as failed if the test takes "too long". 87# 88# The timeout value depends on the regrtest --timeout command line option. 89# 90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use 91# LONG_TIMEOUT instead. 92SHORT_TIMEOUT = 30.0 93 94# Timeout in seconds to detect when a test hangs. 95# 96# It is long enough to reduce the risk of test failure on the slowest Python 97# buildbots. It should not be used to mark a test as failed if the test takes 98# "too long". The timeout value depends on the regrtest --timeout command line 99# option. 100LONG_TIMEOUT = 5 * 60.0 101 102# TEST_HOME_DIR refers to the top level directory of the "test" package 103# that contains Python's regression test suite 104TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 105TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 106STDLIB_DIR = os.path.dirname(TEST_HOME_DIR) 107REPO_ROOT = os.path.dirname(STDLIB_DIR) 108 109 110class Error(Exception): 111 """Base class for regression test exceptions.""" 112 113class TestFailed(Error): 114 """Test failed.""" 115 def __init__(self, msg, *args, stats=None): 116 self.msg = msg 117 self.stats = stats 118 super().__init__(msg, *args) 119 120 def __str__(self): 121 return self.msg 122 123class TestFailedWithDetails(TestFailed): 124 """Test failed.""" 125 def __init__(self, msg, errors, failures, stats): 126 self.errors = errors 127 self.failures = failures 128 super().__init__(msg, errors, failures, stats=stats) 129 130class TestDidNotRun(Error): 131 """Test did not run any subtests.""" 132 133class ResourceDenied(unittest.SkipTest): 134 """Test skipped because it requested a disallowed resource. 135 136 This is raised when a test calls requires() for a resource that 137 has not be enabled. It is used to distinguish between expected 138 and unexpected skips. 139 """ 140 141def anticipate_failure(condition): 142 """Decorator to mark a test that is known to be broken in some cases 143 144 Any use of this decorator should have a comment identifying the 145 associated tracker issue. 146 """ 147 if condition: 148 return unittest.expectedFailure 149 return lambda f: f 150 151def load_package_tests(pkg_dir, loader, standard_tests, pattern): 152 """Generic load_tests implementation for simple test packages. 153 154 Most packages can implement load_tests using this function as follows: 155 156 def load_tests(*args): 157 return load_package_tests(os.path.dirname(__file__), *args) 158 """ 159 if pattern is None: 160 pattern = "test*" 161 top_dir = STDLIB_DIR 162 package_tests = loader.discover(start_dir=pkg_dir, 163 top_level_dir=top_dir, 164 pattern=pattern) 165 standard_tests.addTests(package_tests) 166 return standard_tests 167 168 169def get_attribute(obj, name): 170 """Get an attribute, raising SkipTest if AttributeError is raised.""" 171 try: 172 attribute = getattr(obj, name) 173 except AttributeError: 174 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 175 else: 176 return attribute 177 178verbose = 1 # Flag set to 0 by regrtest.py 179use_resources = None # Flag set to [] by regrtest.py 180max_memuse = 0 # Disable bigmem tests (they will still be run with 181 # small sizes, to make sure they work.) 182real_max_memuse = 0 183junit_xml_list = None # list of testsuite XML elements 184failfast = False 185 186# _original_stdout is meant to hold stdout at the time regrtest began. 187# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 188# The point is to have some flavor of stdout the user can actually see. 189_original_stdout = None 190def record_original_stdout(stdout): 191 global _original_stdout 192 _original_stdout = stdout 193 194def get_original_stdout(): 195 return _original_stdout or sys.stdout 196 197 198def _force_run(path, func, *args): 199 try: 200 return func(*args) 201 except FileNotFoundError as err: 202 # chmod() won't fix a missing file. 203 if verbose >= 2: 204 print('%s: %s' % (err.__class__.__name__, err)) 205 raise 206 except OSError as err: 207 if verbose >= 2: 208 print('%s: %s' % (err.__class__.__name__, err)) 209 print('re-run %s%r' % (func.__name__, args)) 210 os.chmod(path, stat.S_IRWXU) 211 return func(*args) 212 213 214# Check whether a gui is actually available 215def _is_gui_available(): 216 if hasattr(_is_gui_available, 'result'): 217 return _is_gui_available.result 218 import platform 219 reason = None 220 if sys.platform.startswith('win') and platform.win32_is_iot(): 221 reason = "gui is not available on Windows IoT Core" 222 elif sys.platform.startswith('win'): 223 # if Python is running as a service (such as the buildbot service), 224 # gui interaction may be disallowed 225 import ctypes 226 import ctypes.wintypes 227 UOI_FLAGS = 1 228 WSF_VISIBLE = 0x0001 229 class USEROBJECTFLAGS(ctypes.Structure): 230 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 231 ("fReserved", ctypes.wintypes.BOOL), 232 ("dwFlags", ctypes.wintypes.DWORD)] 233 dll = ctypes.windll.user32 234 h = dll.GetProcessWindowStation() 235 if not h: 236 raise ctypes.WinError() 237 uof = USEROBJECTFLAGS() 238 needed = ctypes.wintypes.DWORD() 239 res = dll.GetUserObjectInformationW(h, 240 UOI_FLAGS, 241 ctypes.byref(uof), 242 ctypes.sizeof(uof), 243 ctypes.byref(needed)) 244 if not res: 245 raise ctypes.WinError() 246 if not bool(uof.dwFlags & WSF_VISIBLE): 247 reason = "gui not available (WSF_VISIBLE flag not set)" 248 elif sys.platform == 'darwin': 249 # The Aqua Tk implementations on OS X can abort the process if 250 # being called in an environment where a window server connection 251 # cannot be made, for instance when invoked by a buildbot or ssh 252 # process not running under the same user id as the current console 253 # user. To avoid that, raise an exception if the window manager 254 # connection is not available. 255 import subprocess 256 try: 257 rc = subprocess.run(["launchctl", "managername"], 258 capture_output=True, check=True) 259 managername = rc.stdout.decode("utf-8").strip() 260 except subprocess.CalledProcessError: 261 reason = "unable to detect macOS launchd job manager" 262 else: 263 if managername != "Aqua": 264 reason = f"{managername=} -- can only run in a macOS GUI session" 265 266 # check on every platform whether tkinter can actually do anything 267 if not reason: 268 try: 269 from tkinter import Tk 270 root = Tk() 271 root.withdraw() 272 root.update() 273 root.destroy() 274 except Exception as e: 275 err_string = str(e) 276 if len(err_string) > 50: 277 err_string = err_string[:50] + ' [...]' 278 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 279 err_string) 280 281 _is_gui_available.reason = reason 282 _is_gui_available.result = not reason 283 284 return _is_gui_available.result 285 286def is_resource_enabled(resource): 287 """Test whether a resource is enabled. 288 289 Known resources are set by regrtest.py. If not running under regrtest.py, 290 all resources are assumed enabled unless use_resources has been set. 291 """ 292 return use_resources is None or resource in use_resources 293 294def requires(resource, msg=None): 295 """Raise ResourceDenied if the specified resource is not available.""" 296 if not is_resource_enabled(resource): 297 if msg is None: 298 msg = "Use of the %r resource not enabled" % resource 299 raise ResourceDenied(msg) 300 if resource in {"network", "urlfetch"} and not has_socket_support: 301 raise ResourceDenied("No socket support") 302 if resource == 'gui' and not _is_gui_available(): 303 raise ResourceDenied(_is_gui_available.reason) 304 305def _requires_unix_version(sysname, min_version): 306 """Decorator raising SkipTest if the OS is `sysname` and the version is less 307 than `min_version`. 308 309 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 310 the FreeBSD version is less than 7.2. 311 """ 312 import platform 313 min_version_txt = '.'.join(map(str, min_version)) 314 version_txt = platform.release().split('-', 1)[0] 315 if platform.system() == sysname: 316 try: 317 version = tuple(map(int, version_txt.split('.'))) 318 except ValueError: 319 skip = False 320 else: 321 skip = version < min_version 322 else: 323 skip = False 324 325 return unittest.skipIf( 326 skip, 327 f"{sysname} version {min_version_txt} or higher required, not " 328 f"{version_txt}" 329 ) 330 331 332def requires_freebsd_version(*min_version): 333 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 334 less than `min_version`. 335 336 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 337 version is less than 7.2. 338 """ 339 return _requires_unix_version('FreeBSD', min_version) 340 341def requires_linux_version(*min_version): 342 """Decorator raising SkipTest if the OS is Linux and the Linux version is 343 less than `min_version`. 344 345 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 346 version is less than 2.6.32. 347 """ 348 return _requires_unix_version('Linux', min_version) 349 350def requires_mac_ver(*min_version): 351 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 352 version if less than min_version. 353 354 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 355 is lesser than 10.5. 356 """ 357 def decorator(func): 358 @functools.wraps(func) 359 def wrapper(*args, **kw): 360 if sys.platform == 'darwin': 361 import platform 362 version_txt = platform.mac_ver()[0] 363 try: 364 version = tuple(map(int, version_txt.split('.'))) 365 except ValueError: 366 pass 367 else: 368 if version < min_version: 369 min_version_txt = '.'.join(map(str, min_version)) 370 raise unittest.SkipTest( 371 "Mac OS X %s or higher required, not %s" 372 % (min_version_txt, version_txt)) 373 return func(*args, **kw) 374 wrapper.min_version = min_version 375 return wrapper 376 return decorator 377 378 379def skip_if_buildbot(reason=None): 380 """Decorator raising SkipTest if running on a buildbot.""" 381 import getpass 382 if not reason: 383 reason = 'not suitable for buildbots' 384 try: 385 isbuildbot = getpass.getuser().lower() == 'buildbot' 386 except (KeyError, OSError) as err: 387 warnings.warn(f'getpass.getuser() failed {err}.', RuntimeWarning) 388 isbuildbot = False 389 return unittest.skipIf(isbuildbot, reason) 390 391def check_sanitizer(*, address=False, memory=False, ub=False, thread=False): 392 """Returns True if Python is compiled with sanitizer support""" 393 if not (address or memory or ub or thread): 394 raise ValueError('At least one of address, memory, ub or thread must be True') 395 396 397 cflags = sysconfig.get_config_var('CFLAGS') or '' 398 config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 399 memory_sanitizer = ( 400 '-fsanitize=memory' in cflags or 401 '--with-memory-sanitizer' in config_args 402 ) 403 address_sanitizer = ( 404 '-fsanitize=address' in cflags or 405 '--with-address-sanitizer' in config_args 406 ) 407 ub_sanitizer = ( 408 '-fsanitize=undefined' in cflags or 409 '--with-undefined-behavior-sanitizer' in config_args 410 ) 411 thread_sanitizer = ( 412 '-fsanitize=thread' in cflags or 413 '--with-thread-sanitizer' in config_args 414 ) 415 return ( 416 (memory and memory_sanitizer) or 417 (address and address_sanitizer) or 418 (ub and ub_sanitizer) or 419 (thread and thread_sanitizer) 420 ) 421 422 423def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False, thread=False): 424 """Decorator raising SkipTest if running with a sanitizer active.""" 425 if not reason: 426 reason = 'not working with sanitizers active' 427 skip = check_sanitizer(address=address, memory=memory, ub=ub, thread=thread) 428 return unittest.skipIf(skip, reason) 429 430# gh-89363: True if fork() can hang if Python is built with Address Sanitizer 431# (ASAN): libasan race condition, dead lock in pthread_create(). 432HAVE_ASAN_FORK_BUG = check_sanitizer(address=True) 433 434 435def set_sanitizer_env_var(env, option): 436 for name in ('ASAN_OPTIONS', 'MSAN_OPTIONS', 'UBSAN_OPTIONS', 'TSAN_OPTIONS'): 437 if name in env: 438 env[name] += f':{option}' 439 else: 440 env[name] = option 441 442 443def system_must_validate_cert(f): 444 """Skip the test on TLS certificate validation failures.""" 445 @functools.wraps(f) 446 def dec(*args, **kwargs): 447 try: 448 f(*args, **kwargs) 449 except OSError as e: 450 if "CERTIFICATE_VERIFY_FAILED" in str(e): 451 raise unittest.SkipTest("system does not contain " 452 "necessary certificates") 453 raise 454 return dec 455 456# A constant likely larger than the underlying OS pipe buffer size, to 457# make writes blocking. 458# Windows limit seems to be around 512 B, and many Unix kernels have a 459# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 460# (see issue #17835 for a discussion of this number). 461PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 462 463# A constant likely larger than the underlying OS socket buffer size, to make 464# writes blocking. 465# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 466# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 467# for a discussion of this number. 468SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 469 470# decorator for skipping tests on non-IEEE 754 platforms 471requires_IEEE_754 = unittest.skipUnless( 472 float.__getformat__("double").startswith("IEEE"), 473 "test requires IEEE 754 doubles") 474 475def requires_zlib(reason='requires zlib'): 476 try: 477 import zlib 478 except ImportError: 479 zlib = None 480 return unittest.skipUnless(zlib, reason) 481 482def requires_gzip(reason='requires gzip'): 483 try: 484 import gzip 485 except ImportError: 486 gzip = None 487 return unittest.skipUnless(gzip, reason) 488 489def requires_bz2(reason='requires bz2'): 490 try: 491 import bz2 492 except ImportError: 493 bz2 = None 494 return unittest.skipUnless(bz2, reason) 495 496def requires_lzma(reason='requires lzma'): 497 try: 498 import lzma 499 except ImportError: 500 lzma = None 501 return unittest.skipUnless(lzma, reason) 502 503def has_no_debug_ranges(): 504 try: 505 import _testinternalcapi 506 except ImportError: 507 raise unittest.SkipTest("_testinternalcapi required") 508 config = _testinternalcapi.get_config() 509 return not bool(config['code_debug_ranges']) 510 511def requires_debug_ranges(reason='requires co_positions / debug_ranges'): 512 return unittest.skipIf(has_no_debug_ranges(), reason) 513 514@contextlib.contextmanager 515def suppress_immortalization(suppress=True): 516 """Suppress immortalization of deferred objects.""" 517 try: 518 import _testinternalcapi 519 except ImportError: 520 yield 521 return 522 523 if not suppress: 524 yield 525 return 526 527 _testinternalcapi.suppress_immortalization(True) 528 try: 529 yield 530 finally: 531 _testinternalcapi.suppress_immortalization(False) 532 533def skip_if_suppress_immortalization(): 534 try: 535 import _testinternalcapi 536 except ImportError: 537 return 538 return unittest.skipUnless(_testinternalcapi.get_immortalize_deferred(), 539 "requires immortalization of deferred objects") 540 541 542MS_WINDOWS = (sys.platform == 'win32') 543 544# Is not actually used in tests, but is kept for compatibility. 545is_jython = sys.platform.startswith('java') 546 547is_android = sys.platform == "android" 548 549if sys.platform not in {"win32", "vxworks", "ios", "tvos", "watchos"}: 550 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 551else: 552 unix_shell = None 553 554# wasm32-emscripten and -wasi are POSIX-like but do not 555# have subprocess or fork support. 556is_emscripten = sys.platform == "emscripten" 557is_wasi = sys.platform == "wasi" 558 559is_apple_mobile = sys.platform in {"ios", "tvos", "watchos"} 560is_apple = is_apple_mobile or sys.platform == "darwin" 561 562has_fork_support = hasattr(os, "fork") and not ( 563 # WASM and Apple mobile platforms do not support subprocesses. 564 is_emscripten 565 or is_wasi 566 or is_apple_mobile 567 568 # Although Android supports fork, it's unsafe to call it from Python because 569 # all Android apps are multi-threaded. 570 or is_android 571) 572 573def requires_fork(): 574 return unittest.skipUnless(has_fork_support, "requires working os.fork()") 575 576has_subprocess_support = not ( 577 # WASM and Apple mobile platforms do not support subprocesses. 578 is_emscripten 579 or is_wasi 580 or is_apple_mobile 581 582 # Although Android supports subproceses, they're almost never useful in 583 # practice (see PEP 738). And most of the tests that use them are calling 584 # sys.executable, which won't work when Python is embedded in an Android app. 585 or is_android 586) 587 588def requires_subprocess(): 589 """Used for subprocess, os.spawn calls, fd inheritance""" 590 return unittest.skipUnless(has_subprocess_support, "requires subprocess support") 591 592# Emscripten's socket emulation and WASI sockets have limitations. 593has_socket_support = not ( 594 is_emscripten 595 or is_wasi 596) 597 598def requires_working_socket(*, module=False): 599 """Skip tests or modules that require working sockets 600 601 Can be used as a function/class decorator or to skip an entire module. 602 """ 603 msg = "requires socket support" 604 if module: 605 if not has_socket_support: 606 raise unittest.SkipTest(msg) 607 else: 608 return unittest.skipUnless(has_socket_support, msg) 609 610# Does strftime() support glibc extension like '%4Y'? 611has_strftime_extensions = False 612if sys.platform != "win32": 613 # bpo-47037: Windows debug builds crash with "Debug Assertion Failed" 614 try: 615 has_strftime_extensions = time.strftime("%4Y") != "%4Y" 616 except ValueError: 617 pass 618 619# Define the URL of a dedicated HTTP server for the network tests. 620# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. 621TEST_HTTP_URL = "http://www.pythontest.net" 622 623# Set by libregrtest/main.py so we can skip tests that are not 624# useful for PGO 625PGO = False 626 627# Set by libregrtest/main.py if we are running the extended (time consuming) 628# PGO task. If this is True, PGO is also True. 629PGO_EXTENDED = False 630 631# TEST_DATA_DIR is used as a target download location for remote resources 632TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 633 634 635def darwin_malloc_err_warning(test_name): 636 """Assure user that loud errors generated by macOS libc's malloc are 637 expected.""" 638 if sys.platform != 'darwin': 639 return 640 641 import shutil 642 msg = ' NOTICE ' 643 detail = (f'{test_name} may generate "malloc can\'t allocate region"\n' 644 'warnings on macOS systems. This behavior is known. Do not\n' 645 'report a bug unless tests are also failing.\n' 646 'See https://github.com/python/cpython/issues/85100') 647 648 padding, _ = shutil.get_terminal_size() 649 print(msg.center(padding, '-')) 650 print(detail) 651 print('-' * padding) 652 653 654def findfile(filename, subdir=None): 655 """Try to find a file on sys.path or in the test directory. If it is not 656 found the argument passed to the function is returned (this does not 657 necessarily signal failure; could still be the legitimate path). 658 659 Setting *subdir* indicates a relative path to use to find the file 660 rather than looking directly in the path directories. 661 """ 662 if os.path.isabs(filename): 663 return filename 664 if subdir is not None: 665 filename = os.path.join(subdir, filename) 666 path = [TEST_HOME_DIR] + sys.path 667 for dn in path: 668 fn = os.path.join(dn, filename) 669 if os.path.exists(fn): return fn 670 return filename 671 672 673def sortdict(dict): 674 "Like repr(dict), but in sorted order." 675 items = sorted(dict.items()) 676 reprpairs = ["%r: %r" % pair for pair in items] 677 withcommas = ", ".join(reprpairs) 678 return "{%s}" % withcommas 679 680 681def run_code(code: str) -> dict[str, object]: 682 """Run a piece of code after dedenting it, and return its global namespace.""" 683 ns = {} 684 exec(textwrap.dedent(code), ns) 685 return ns 686 687 688def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): 689 with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: 690 compile(statement, '<test string>', 'exec') 691 err = cm.exception 692 testcase.assertIsNotNone(err.lineno) 693 if lineno is not None: 694 testcase.assertEqual(err.lineno, lineno) 695 testcase.assertIsNotNone(err.offset) 696 if offset is not None: 697 testcase.assertEqual(err.offset, offset) 698 699 700def open_urlresource(url, *args, **kw): 701 import urllib.request, urllib.parse 702 from .os_helper import unlink 703 try: 704 import gzip 705 except ImportError: 706 gzip = None 707 708 check = kw.pop('check', None) 709 710 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 711 712 fn = os.path.join(TEST_DATA_DIR, filename) 713 714 def check_valid_file(fn): 715 f = open(fn, *args, **kw) 716 if check is None: 717 return f 718 elif check(f): 719 f.seek(0) 720 return f 721 f.close() 722 723 if os.path.exists(fn): 724 f = check_valid_file(fn) 725 if f is not None: 726 return f 727 unlink(fn) 728 729 # Verify the requirement before downloading the file 730 requires('urlfetch') 731 732 if verbose: 733 print('\tfetching %s ...' % url, file=get_original_stdout()) 734 opener = urllib.request.build_opener() 735 if gzip: 736 opener.addheaders.append(('Accept-Encoding', 'gzip')) 737 f = opener.open(url, timeout=INTERNET_TIMEOUT) 738 if gzip and f.headers.get('Content-Encoding') == 'gzip': 739 f = gzip.GzipFile(fileobj=f) 740 try: 741 with open(fn, "wb") as out: 742 s = f.read() 743 while s: 744 out.write(s) 745 s = f.read() 746 finally: 747 f.close() 748 749 f = check_valid_file(fn) 750 if f is not None: 751 return f 752 raise TestFailed('invalid resource %r' % fn) 753 754 755@contextlib.contextmanager 756def captured_output(stream_name): 757 """Return a context manager used by captured_stdout/stdin/stderr 758 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 759 import io 760 orig_stdout = getattr(sys, stream_name) 761 setattr(sys, stream_name, io.StringIO()) 762 try: 763 yield getattr(sys, stream_name) 764 finally: 765 setattr(sys, stream_name, orig_stdout) 766 767def captured_stdout(): 768 """Capture the output of sys.stdout: 769 770 with captured_stdout() as stdout: 771 print("hello") 772 self.assertEqual(stdout.getvalue(), "hello\\n") 773 """ 774 return captured_output("stdout") 775 776def captured_stderr(): 777 """Capture the output of sys.stderr: 778 779 with captured_stderr() as stderr: 780 print("hello", file=sys.stderr) 781 self.assertEqual(stderr.getvalue(), "hello\\n") 782 """ 783 return captured_output("stderr") 784 785def captured_stdin(): 786 """Capture the input to sys.stdin: 787 788 with captured_stdin() as stdin: 789 stdin.write('hello\\n') 790 stdin.seek(0) 791 # call test code that consumes from sys.stdin 792 captured = input() 793 self.assertEqual(captured, "hello") 794 """ 795 return captured_output("stdin") 796 797 798def gc_collect(): 799 """Force as many objects as possible to be collected. 800 801 In non-CPython implementations of Python, this is needed because timely 802 deallocation is not guaranteed by the garbage collector. (Even in CPython 803 this can be the case in case of reference cycles.) This means that __del__ 804 methods may be called later than expected and weakrefs may remain alive for 805 longer than expected. This function tries its best to force all garbage 806 objects to disappear. 807 """ 808 import gc 809 gc.collect() 810 gc.collect() 811 gc.collect() 812 813@contextlib.contextmanager 814def disable_gc(): 815 import gc 816 have_gc = gc.isenabled() 817 gc.disable() 818 try: 819 yield 820 finally: 821 if have_gc: 822 gc.enable() 823 824@contextlib.contextmanager 825def gc_threshold(*args): 826 import gc 827 old_threshold = gc.get_threshold() 828 gc.set_threshold(*args) 829 try: 830 yield 831 finally: 832 gc.set_threshold(*old_threshold) 833 834 835def python_is_optimized(): 836 """Find if Python was built with optimizations.""" 837 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 838 final_opt = "" 839 for opt in cflags.split(): 840 if opt.startswith('-O'): 841 final_opt = opt 842 return final_opt not in ('', '-O0', '-Og') 843 844 845def check_cflags_pgo(): 846 # Check if Python was built with ./configure --enable-optimizations: 847 # with Profile Guided Optimization (PGO). 848 cflags_nodist = sysconfig.get_config_var('PY_CFLAGS_NODIST') or '' 849 pgo_options = [ 850 # GCC 851 '-fprofile-use', 852 # clang: -fprofile-instr-use=code.profclangd 853 '-fprofile-instr-use', 854 # ICC 855 "-prof-use", 856 ] 857 PGO_PROF_USE_FLAG = sysconfig.get_config_var('PGO_PROF_USE_FLAG') 858 if PGO_PROF_USE_FLAG: 859 pgo_options.append(PGO_PROF_USE_FLAG) 860 return any(option in cflags_nodist for option in pgo_options) 861 862 863def check_bolt_optimized(): 864 # Always return false, if the platform is WASI, 865 # because BOLT optimization does not support WASM binary. 866 if is_wasi: 867 return False 868 config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 869 return '--enable-bolt' in config_args 870 871 872Py_GIL_DISABLED = bool(sysconfig.get_config_var('Py_GIL_DISABLED')) 873 874def requires_gil_enabled(msg="needs the GIL enabled"): 875 """Decorator for skipping tests on the free-threaded build.""" 876 return unittest.skipIf(Py_GIL_DISABLED, msg) 877 878def expected_failure_if_gil_disabled(): 879 """Expect test failure if the GIL is disabled.""" 880 if Py_GIL_DISABLED: 881 return unittest.expectedFailure 882 return lambda test_case: test_case 883 884if Py_GIL_DISABLED: 885 _header = 'PHBBInP' 886else: 887 _header = 'nP' 888_align = '0n' 889_vheader = _header + 'n' 890 891def calcobjsize(fmt): 892 import struct 893 return struct.calcsize(_header + fmt + _align) 894 895def calcvobjsize(fmt): 896 import struct 897 return struct.calcsize(_vheader + fmt + _align) 898 899 900_TPFLAGS_HAVE_GC = 1<<14 901_TPFLAGS_HEAPTYPE = 1<<9 902 903def check_sizeof(test, o, size): 904 try: 905 import _testinternalcapi 906 except ImportError: 907 raise unittest.SkipTest("_testinternalcapi required") 908 result = sys.getsizeof(o) 909 # add GC header size 910 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 911 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 912 size += _testinternalcapi.SIZEOF_PYGC_HEAD 913 msg = 'wrong size for %s: got %d, expected %d' \ 914 % (type(o), result, size) 915 test.assertEqual(result, size, msg) 916 917#======================================================================= 918# Decorator/context manager for running a code in a different locale, 919# correctly resetting it afterwards. 920 921@contextlib.contextmanager 922def run_with_locale(catstr, *locales): 923 try: 924 import locale 925 category = getattr(locale, catstr) 926 orig_locale = locale.setlocale(category) 927 except AttributeError: 928 # if the test author gives us an invalid category string 929 raise 930 except Exception: 931 # cannot retrieve original locale, so do nothing 932 locale = orig_locale = None 933 if '' not in locales: 934 raise unittest.SkipTest('no locales') 935 else: 936 for loc in locales: 937 try: 938 locale.setlocale(category, loc) 939 break 940 except locale.Error: 941 pass 942 else: 943 if '' not in locales: 944 raise unittest.SkipTest(f'no locales {locales}') 945 946 try: 947 yield 948 finally: 949 if locale and orig_locale: 950 locale.setlocale(category, orig_locale) 951 952#======================================================================= 953# Decorator for running a function in multiple locales (if they are 954# availasble) and resetting the original locale afterwards. 955 956def run_with_locales(catstr, *locales): 957 def deco(func): 958 @functools.wraps(func) 959 def wrapper(self, /, *args, **kwargs): 960 dry_run = '' in locales 961 try: 962 import locale 963 category = getattr(locale, catstr) 964 orig_locale = locale.setlocale(category) 965 except AttributeError: 966 # if the test author gives us an invalid category string 967 raise 968 except Exception: 969 # cannot retrieve original locale, so do nothing 970 pass 971 else: 972 try: 973 for loc in locales: 974 with self.subTest(locale=loc): 975 try: 976 locale.setlocale(category, loc) 977 except locale.Error: 978 self.skipTest(f'no locale {loc!r}') 979 else: 980 dry_run = False 981 func(self, *args, **kwargs) 982 finally: 983 locale.setlocale(category, orig_locale) 984 if dry_run: 985 # no locales available, so just run the test 986 # with the current locale 987 with self.subTest(locale=None): 988 func(self, *args, **kwargs) 989 return wrapper 990 return deco 991 992#======================================================================= 993# Decorator for running a function in a specific timezone, correctly 994# resetting it afterwards. 995 996def run_with_tz(tz): 997 def decorator(func): 998 def inner(*args, **kwds): 999 try: 1000 tzset = time.tzset 1001 except AttributeError: 1002 raise unittest.SkipTest("tzset required") 1003 if 'TZ' in os.environ: 1004 orig_tz = os.environ['TZ'] 1005 else: 1006 orig_tz = None 1007 os.environ['TZ'] = tz 1008 tzset() 1009 1010 # now run the function, resetting the tz on exceptions 1011 try: 1012 return func(*args, **kwds) 1013 finally: 1014 if orig_tz is None: 1015 del os.environ['TZ'] 1016 else: 1017 os.environ['TZ'] = orig_tz 1018 time.tzset() 1019 1020 inner.__name__ = func.__name__ 1021 inner.__doc__ = func.__doc__ 1022 return inner 1023 return decorator 1024 1025#======================================================================= 1026# Big-memory-test support. Separate from 'resources' because memory use 1027# should be configurable. 1028 1029# Some handy shorthands. Note that these are used for byte-limits as well 1030# as size-limits, in the various bigmem tests 1031_1M = 1024*1024 1032_1G = 1024 * _1M 1033_2G = 2 * _1G 1034_4G = 4 * _1G 1035 1036MAX_Py_ssize_t = sys.maxsize 1037 1038def _parse_memlimit(limit: str) -> int: 1039 sizes = { 1040 'k': 1024, 1041 'm': _1M, 1042 'g': _1G, 1043 't': 1024*_1G, 1044 } 1045 m = re.match(r'(\d+(?:\.\d+)?) (K|M|G|T)b?$', limit, 1046 re.IGNORECASE | re.VERBOSE) 1047 if m is None: 1048 raise ValueError(f'Invalid memory limit: {limit!r}') 1049 return int(float(m.group(1)) * sizes[m.group(2).lower()]) 1050 1051def set_memlimit(limit: str) -> None: 1052 global max_memuse 1053 global real_max_memuse 1054 memlimit = _parse_memlimit(limit) 1055 if memlimit < _2G - 1: 1056 raise ValueError('Memory limit {limit!r} too low to be useful') 1057 1058 real_max_memuse = memlimit 1059 memlimit = min(memlimit, MAX_Py_ssize_t) 1060 max_memuse = memlimit 1061 1062 1063class _MemoryWatchdog: 1064 """An object which periodically watches the process' memory consumption 1065 and prints it out. 1066 """ 1067 1068 def __init__(self): 1069 self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) 1070 self.started = False 1071 1072 def start(self): 1073 import warnings 1074 try: 1075 f = open(self.procfile, 'r') 1076 except OSError as e: 1077 warnings.warn('/proc not available for stats: {}'.format(e), 1078 RuntimeWarning) 1079 sys.stderr.flush() 1080 return 1081 1082 import subprocess 1083 with f: 1084 watchdog_script = findfile("memory_watchdog.py") 1085 self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], 1086 stdin=f, 1087 stderr=subprocess.DEVNULL) 1088 self.started = True 1089 1090 def stop(self): 1091 if self.started: 1092 self.mem_watchdog.terminate() 1093 self.mem_watchdog.wait() 1094 1095 1096def bigmemtest(size, memuse, dry_run=True): 1097 """Decorator for bigmem tests. 1098 1099 'size' is a requested size for the test (in arbitrary, test-interpreted 1100 units.) 'memuse' is the number of bytes per unit for the test, or a good 1101 estimate of it. For example, a test that needs two byte buffers, of 4 GiB 1102 each, could be decorated with @bigmemtest(size=_4G, memuse=2). 1103 1104 The 'size' argument is normally passed to the decorated test method as an 1105 extra argument. If 'dry_run' is true, the value passed to the test method 1106 may be less than the requested value. If 'dry_run' is false, it means the 1107 test doesn't support dummy runs when -M is not specified. 1108 """ 1109 def decorator(f): 1110 def wrapper(self): 1111 size = wrapper.size 1112 memuse = wrapper.memuse 1113 if not real_max_memuse: 1114 maxsize = 5147 1115 else: 1116 maxsize = size 1117 1118 if ((real_max_memuse or not dry_run) 1119 and real_max_memuse < maxsize * memuse): 1120 raise unittest.SkipTest( 1121 "not enough memory: %.1fG minimum needed" 1122 % (size * memuse / (1024 ** 3))) 1123 1124 if real_max_memuse and verbose: 1125 print() 1126 print(" ... expected peak memory use: {peak:.1f}G" 1127 .format(peak=size * memuse / (1024 ** 3))) 1128 watchdog = _MemoryWatchdog() 1129 watchdog.start() 1130 else: 1131 watchdog = None 1132 1133 try: 1134 return f(self, maxsize) 1135 finally: 1136 if watchdog: 1137 watchdog.stop() 1138 1139 wrapper.size = size 1140 wrapper.memuse = memuse 1141 return wrapper 1142 return decorator 1143 1144def bigaddrspacetest(f): 1145 """Decorator for tests that fill the address space.""" 1146 def wrapper(self): 1147 if max_memuse < MAX_Py_ssize_t: 1148 if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: 1149 raise unittest.SkipTest( 1150 "not enough memory: try a 32-bit build instead") 1151 else: 1152 raise unittest.SkipTest( 1153 "not enough memory: %.1fG minimum needed" 1154 % (MAX_Py_ssize_t / (1024 ** 3))) 1155 else: 1156 return f(self) 1157 return wrapper 1158 1159#======================================================================= 1160# unittest integration. 1161 1162def _id(obj): 1163 return obj 1164 1165def requires_resource(resource): 1166 if resource == 'gui' and not _is_gui_available(): 1167 return unittest.skip(_is_gui_available.reason) 1168 if is_resource_enabled(resource): 1169 return _id 1170 else: 1171 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1172 1173def cpython_only(test): 1174 """ 1175 Decorator for tests only applicable on CPython. 1176 """ 1177 return impl_detail(cpython=True)(test) 1178 1179def impl_detail(msg=None, **guards): 1180 if check_impl_detail(**guards): 1181 return _id 1182 if msg is None: 1183 guardnames, default = _parse_guards(guards) 1184 if default: 1185 msg = "implementation detail not available on {0}" 1186 else: 1187 msg = "implementation detail specific to {0}" 1188 guardnames = sorted(guardnames.keys()) 1189 msg = msg.format(' or '.join(guardnames)) 1190 return unittest.skip(msg) 1191 1192def _parse_guards(guards): 1193 # Returns a tuple ({platform_name: run_me}, default_value) 1194 if not guards: 1195 return ({'cpython': True}, False) 1196 is_true = list(guards.values())[0] 1197 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 1198 return (guards, not is_true) 1199 1200# Use the following check to guard CPython's implementation-specific tests -- 1201# or to run them only on the implementation(s) guarded by the arguments. 1202def check_impl_detail(**guards): 1203 """This function returns True or False depending on the host platform. 1204 Examples: 1205 if check_impl_detail(): # only on CPython (default) 1206 if check_impl_detail(jython=True): # only on Jython 1207 if check_impl_detail(cpython=False): # everywhere except on CPython 1208 """ 1209 guards, default = _parse_guards(guards) 1210 return guards.get(sys.implementation.name, default) 1211 1212 1213def no_tracing(func): 1214 """Decorator to temporarily turn off tracing for the duration of a test.""" 1215 trace_wrapper = func 1216 if hasattr(sys, 'gettrace'): 1217 @functools.wraps(func) 1218 def trace_wrapper(*args, **kwargs): 1219 original_trace = sys.gettrace() 1220 try: 1221 sys.settrace(None) 1222 return func(*args, **kwargs) 1223 finally: 1224 sys.settrace(original_trace) 1225 1226 coverage_wrapper = trace_wrapper 1227 if 'test.cov' in sys.modules: # -Xpresite=test.cov used 1228 cov = sys.monitoring.COVERAGE_ID 1229 @functools.wraps(func) 1230 def coverage_wrapper(*args, **kwargs): 1231 original_events = sys.monitoring.get_events(cov) 1232 try: 1233 sys.monitoring.set_events(cov, 0) 1234 return trace_wrapper(*args, **kwargs) 1235 finally: 1236 sys.monitoring.set_events(cov, original_events) 1237 1238 return coverage_wrapper 1239 1240 1241def refcount_test(test): 1242 """Decorator for tests which involve reference counting. 1243 1244 To start, the decorator does not run the test if is not run by CPython. 1245 After that, any trace function is unset during the test to prevent 1246 unexpected refcounts caused by the trace function. 1247 1248 """ 1249 return no_tracing(cpython_only(test)) 1250 1251 1252def requires_limited_api(test): 1253 try: 1254 import _testcapi 1255 import _testlimitedcapi 1256 except ImportError: 1257 return unittest.skip('needs _testcapi and _testlimitedcapi modules')(test) 1258 return test 1259 1260 1261# Windows build doesn't support --disable-test-modules feature, so there's no 1262# 'TEST_MODULES' var in config 1263TEST_MODULES_ENABLED = (sysconfig.get_config_var('TEST_MODULES') or 'yes') == 'yes' 1264 1265def requires_specialization(test): 1266 return unittest.skipUnless( 1267 _opcode.ENABLE_SPECIALIZATION, "requires specialization")(test) 1268 1269 1270#======================================================================= 1271# Check for the presence of docstrings. 1272 1273# Rather than trying to enumerate all the cases where docstrings may be 1274# disabled, we just check for that directly 1275 1276def _check_docstrings(): 1277 """Just used to check if docstrings are enabled""" 1278 1279MISSING_C_DOCSTRINGS = (check_impl_detail() and 1280 sys.platform != 'win32' and 1281 not sysconfig.get_config_var('WITH_DOC_STRINGS')) 1282 1283HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and 1284 not MISSING_C_DOCSTRINGS) 1285 1286requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 1287 "test requires docstrings") 1288 1289 1290#======================================================================= 1291# Support for saving and restoring the imported modules. 1292 1293def flush_std_streams(): 1294 if sys.stdout is not None: 1295 sys.stdout.flush() 1296 if sys.stderr is not None: 1297 sys.stderr.flush() 1298 1299 1300def print_warning(msg): 1301 # bpo-45410: Explicitly flush stdout to keep logs in order 1302 flush_std_streams() 1303 stream = print_warning.orig_stderr 1304 for line in msg.splitlines(): 1305 print(f"Warning -- {line}", file=stream) 1306 stream.flush() 1307 1308# bpo-39983: Store the original sys.stderr at Python startup to be able to 1309# log warnings even if sys.stderr is captured temporarily by a test. 1310print_warning.orig_stderr = sys.stderr 1311 1312 1313# Flag used by saved_test_environment of test.libregrtest.save_env, 1314# to check if a test modified the environment. The flag should be set to False 1315# before running a new test. 1316# 1317# For example, threading_helper.threading_cleanup() sets the flag is the function fails 1318# to cleanup threads. 1319environment_altered = False 1320 1321def reap_children(): 1322 """Use this function at the end of test_main() whenever sub-processes 1323 are started. This will help ensure that no extra children (zombies) 1324 stick around to hog resources and create problems when looking 1325 for refleaks. 1326 """ 1327 global environment_altered 1328 1329 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 1330 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 1331 return 1332 elif not has_subprocess_support: 1333 return 1334 1335 # Reap all our dead child processes so we don't leave zombies around. 1336 # These hog resources and might be causing some of the buildbots to die. 1337 while True: 1338 try: 1339 # Read the exit status of any child process which already completed 1340 pid, status = os.waitpid(-1, os.WNOHANG) 1341 except OSError: 1342 break 1343 1344 if pid == 0: 1345 break 1346 1347 print_warning(f"reap_children() reaped child process {pid}") 1348 environment_altered = True 1349 1350 1351@contextlib.contextmanager 1352def swap_attr(obj, attr, new_val): 1353 """Temporary swap out an attribute with a new object. 1354 1355 Usage: 1356 with swap_attr(obj, "attr", 5): 1357 ... 1358 1359 This will set obj.attr to 5 for the duration of the with: block, 1360 restoring the old value at the end of the block. If `attr` doesn't 1361 exist on `obj`, it will be created and then deleted at the end of the 1362 block. 1363 1364 The old value (or None if it doesn't exist) will be assigned to the 1365 target of the "as" clause, if there is one. 1366 """ 1367 if hasattr(obj, attr): 1368 real_val = getattr(obj, attr) 1369 setattr(obj, attr, new_val) 1370 try: 1371 yield real_val 1372 finally: 1373 setattr(obj, attr, real_val) 1374 else: 1375 setattr(obj, attr, new_val) 1376 try: 1377 yield 1378 finally: 1379 if hasattr(obj, attr): 1380 delattr(obj, attr) 1381 1382@contextlib.contextmanager 1383def swap_item(obj, item, new_val): 1384 """Temporary swap out an item with a new object. 1385 1386 Usage: 1387 with swap_item(obj, "item", 5): 1388 ... 1389 1390 This will set obj["item"] to 5 for the duration of the with: block, 1391 restoring the old value at the end of the block. If `item` doesn't 1392 exist on `obj`, it will be created and then deleted at the end of the 1393 block. 1394 1395 The old value (or None if it doesn't exist) will be assigned to the 1396 target of the "as" clause, if there is one. 1397 """ 1398 if item in obj: 1399 real_val = obj[item] 1400 obj[item] = new_val 1401 try: 1402 yield real_val 1403 finally: 1404 obj[item] = real_val 1405 else: 1406 obj[item] = new_val 1407 try: 1408 yield 1409 finally: 1410 if item in obj: 1411 del obj[item] 1412 1413def args_from_interpreter_flags(): 1414 """Return a list of command-line arguments reproducing the current 1415 settings in sys.flags and sys.warnoptions.""" 1416 import subprocess 1417 return subprocess._args_from_interpreter_flags() 1418 1419def optim_args_from_interpreter_flags(): 1420 """Return a list of command-line arguments reproducing the current 1421 optimization settings in sys.flags.""" 1422 import subprocess 1423 return subprocess._optim_args_from_interpreter_flags() 1424 1425 1426class Matcher(object): 1427 1428 _partial_matches = ('msg', 'message') 1429 1430 def matches(self, d, **kwargs): 1431 """ 1432 Try to match a single dict with the supplied arguments. 1433 1434 Keys whose values are strings and which are in self._partial_matches 1435 will be checked for partial (i.e. substring) matches. You can extend 1436 this scheme to (for example) do regular expression matching, etc. 1437 """ 1438 result = True 1439 for k in kwargs: 1440 v = kwargs[k] 1441 dv = d.get(k) 1442 if not self.match_value(k, dv, v): 1443 result = False 1444 break 1445 return result 1446 1447 def match_value(self, k, dv, v): 1448 """ 1449 Try to match a single stored value (dv) with a supplied value (v). 1450 """ 1451 if type(v) != type(dv): 1452 result = False 1453 elif type(dv) is not str or k not in self._partial_matches: 1454 result = (v == dv) 1455 else: 1456 result = dv.find(v) >= 0 1457 return result 1458 1459 1460_buggy_ucrt = None 1461def skip_if_buggy_ucrt_strfptime(test): 1462 """ 1463 Skip decorator for tests that use buggy strptime/strftime 1464 1465 If the UCRT bugs are present time.localtime().tm_zone will be 1466 an empty string, otherwise we assume the UCRT bugs are fixed 1467 1468 See bpo-37552 [Windows] strptime/strftime return invalid 1469 results with UCRT version 17763.615 1470 """ 1471 import locale 1472 global _buggy_ucrt 1473 if _buggy_ucrt is None: 1474 if(sys.platform == 'win32' and 1475 locale.getencoding() == 'cp65001' and 1476 time.localtime().tm_zone == ''): 1477 _buggy_ucrt = True 1478 else: 1479 _buggy_ucrt = False 1480 return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test 1481 1482class PythonSymlink: 1483 """Creates a symlink for the current Python executable""" 1484 def __init__(self, link=None): 1485 from .os_helper import TESTFN 1486 1487 self.link = link or os.path.abspath(TESTFN) 1488 self._linked = [] 1489 self.real = os.path.realpath(sys.executable) 1490 self._also_link = [] 1491 1492 self._env = None 1493 1494 self._platform_specific() 1495 1496 if sys.platform == "win32": 1497 def _platform_specific(self): 1498 import glob 1499 import _winapi 1500 1501 if os.path.lexists(self.real) and not os.path.exists(self.real): 1502 # App symlink appears to not exist, but we want the 1503 # real executable here anyway 1504 self.real = _winapi.GetModuleFileName(0) 1505 1506 dll = _winapi.GetModuleFileName(sys.dllhandle) 1507 src_dir = os.path.dirname(dll) 1508 dest_dir = os.path.dirname(self.link) 1509 self._also_link.append(( 1510 dll, 1511 os.path.join(dest_dir, os.path.basename(dll)) 1512 )) 1513 for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): 1514 self._also_link.append(( 1515 runtime, 1516 os.path.join(dest_dir, os.path.basename(runtime)) 1517 )) 1518 1519 self._env = {k.upper(): os.getenv(k) for k in os.environ} 1520 self._env["PYTHONHOME"] = os.path.dirname(self.real) 1521 if sysconfig.is_python_build(): 1522 self._env["PYTHONPATH"] = STDLIB_DIR 1523 else: 1524 def _platform_specific(self): 1525 pass 1526 1527 def __enter__(self): 1528 os.symlink(self.real, self.link) 1529 self._linked.append(self.link) 1530 for real, link in self._also_link: 1531 os.symlink(real, link) 1532 self._linked.append(link) 1533 return self 1534 1535 def __exit__(self, exc_type, exc_value, exc_tb): 1536 for link in self._linked: 1537 try: 1538 os.remove(link) 1539 except IOError as ex: 1540 if verbose: 1541 print("failed to clean up {}: {}".format(link, ex)) 1542 1543 def _call(self, python, args, env, returncode): 1544 import subprocess 1545 cmd = [python, *args] 1546 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 1547 stderr=subprocess.PIPE, env=env) 1548 r = p.communicate() 1549 if p.returncode != returncode: 1550 if verbose: 1551 print(repr(r[0])) 1552 print(repr(r[1]), file=sys.stderr) 1553 raise RuntimeError( 1554 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) 1555 return r 1556 1557 def call_real(self, *args, returncode=0): 1558 return self._call(self.real, args, None, returncode) 1559 1560 def call_link(self, *args, returncode=0): 1561 return self._call(self.link, args, self._env, returncode) 1562 1563 1564def skip_if_pgo_task(test): 1565 """Skip decorator for tests not run in (non-extended) PGO task""" 1566 ok = not PGO or PGO_EXTENDED 1567 msg = "Not run for (non-extended) PGO task" 1568 return test if ok else unittest.skip(msg)(test) 1569 1570 1571def detect_api_mismatch(ref_api, other_api, *, ignore=()): 1572 """Returns the set of items in ref_api not in other_api, except for a 1573 defined list of items to be ignored in this check. 1574 1575 By default this skips private attributes beginning with '_' but 1576 includes all magic methods, i.e. those starting and ending in '__'. 1577 """ 1578 missing_items = set(dir(ref_api)) - set(dir(other_api)) 1579 if ignore: 1580 missing_items -= set(ignore) 1581 missing_items = set(m for m in missing_items 1582 if not m.startswith('_') or m.endswith('__')) 1583 return missing_items 1584 1585 1586def check__all__(test_case, module, name_of_module=None, extra=(), 1587 not_exported=()): 1588 """Assert that the __all__ variable of 'module' contains all public names. 1589 1590 The module's public names (its API) are detected automatically based on 1591 whether they match the public name convention and were defined in 1592 'module'. 1593 1594 The 'name_of_module' argument can specify (as a string or tuple thereof) 1595 what module(s) an API could be defined in in order to be detected as a 1596 public API. One case for this is when 'module' imports part of its public 1597 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 1598 1599 The 'extra' argument can be a set of names that wouldn't otherwise be 1600 automatically detected as "public", like objects without a proper 1601 '__module__' attribute. If provided, it will be added to the 1602 automatically detected ones. 1603 1604 The 'not_exported' argument can be a set of names that must not be treated 1605 as part of the public API even though their names indicate otherwise. 1606 1607 Usage: 1608 import bar 1609 import foo 1610 import unittest 1611 from test import support 1612 1613 class MiscTestCase(unittest.TestCase): 1614 def test__all__(self): 1615 support.check__all__(self, foo) 1616 1617 class OtherTestCase(unittest.TestCase): 1618 def test__all__(self): 1619 extra = {'BAR_CONST', 'FOO_CONST'} 1620 not_exported = {'baz'} # Undocumented name. 1621 # bar imports part of its API from _bar. 1622 support.check__all__(self, bar, ('bar', '_bar'), 1623 extra=extra, not_exported=not_exported) 1624 1625 """ 1626 1627 if name_of_module is None: 1628 name_of_module = (module.__name__, ) 1629 elif isinstance(name_of_module, str): 1630 name_of_module = (name_of_module, ) 1631 1632 expected = set(extra) 1633 1634 for name in dir(module): 1635 if name.startswith('_') or name in not_exported: 1636 continue 1637 obj = getattr(module, name) 1638 if (getattr(obj, '__module__', None) in name_of_module or 1639 (not hasattr(obj, '__module__') and 1640 not isinstance(obj, types.ModuleType))): 1641 expected.add(name) 1642 test_case.assertCountEqual(module.__all__, expected) 1643 1644 1645def suppress_msvcrt_asserts(verbose=False): 1646 try: 1647 import msvcrt 1648 except ImportError: 1649 return 1650 1651 msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS 1652 | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT 1653 | msvcrt.SEM_NOGPFAULTERRORBOX 1654 | msvcrt.SEM_NOOPENFILEERRORBOX) 1655 1656 # CrtSetReportMode() is only available in debug build 1657 if hasattr(msvcrt, 'CrtSetReportMode'): 1658 for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: 1659 if verbose: 1660 msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) 1661 msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) 1662 else: 1663 msvcrt.CrtSetReportMode(m, 0) 1664 1665 1666class SuppressCrashReport: 1667 """Try to prevent a crash report from popping up. 1668 1669 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 1670 disable the creation of coredump file. 1671 """ 1672 old_value = None 1673 old_modes = None 1674 1675 def __enter__(self): 1676 """On Windows, disable Windows Error Reporting dialogs using 1677 SetErrorMode() and CrtSetReportMode(). 1678 1679 On UNIX, try to save the previous core file size limit, then set 1680 soft limit to 0. 1681 """ 1682 if sys.platform.startswith('win'): 1683 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 1684 try: 1685 import msvcrt 1686 except ImportError: 1687 return 1688 1689 self.old_value = msvcrt.GetErrorMode() 1690 1691 msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) 1692 1693 # bpo-23314: Suppress assert dialogs in debug builds. 1694 # CrtSetReportMode() is only available in debug build. 1695 if hasattr(msvcrt, 'CrtSetReportMode'): 1696 self.old_modes = {} 1697 for report_type in [msvcrt.CRT_WARN, 1698 msvcrt.CRT_ERROR, 1699 msvcrt.CRT_ASSERT]: 1700 old_mode = msvcrt.CrtSetReportMode(report_type, 1701 msvcrt.CRTDBG_MODE_FILE) 1702 old_file = msvcrt.CrtSetReportFile(report_type, 1703 msvcrt.CRTDBG_FILE_STDERR) 1704 self.old_modes[report_type] = old_mode, old_file 1705 1706 else: 1707 try: 1708 import resource 1709 self.resource = resource 1710 except ImportError: 1711 self.resource = None 1712 if self.resource is not None: 1713 try: 1714 self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE) 1715 self.resource.setrlimit(self.resource.RLIMIT_CORE, 1716 (0, self.old_value[1])) 1717 except (ValueError, OSError): 1718 pass 1719 1720 if sys.platform == 'darwin': 1721 import subprocess 1722 # Check if the 'Crash Reporter' on OSX was configured 1723 # in 'Developer' mode and warn that it will get triggered 1724 # when it is. 1725 # 1726 # This assumes that this context manager is used in tests 1727 # that might trigger the next manager. 1728 cmd = ['/usr/bin/defaults', 'read', 1729 'com.apple.CrashReporter', 'DialogType'] 1730 proc = subprocess.Popen(cmd, 1731 stdout=subprocess.PIPE, 1732 stderr=subprocess.PIPE) 1733 with proc: 1734 stdout = proc.communicate()[0] 1735 if stdout.strip() == b'developer': 1736 print("this test triggers the Crash Reporter, " 1737 "that is intentional", end='', flush=True) 1738 1739 return self 1740 1741 def __exit__(self, *ignore_exc): 1742 """Restore Windows ErrorMode or core file behavior to initial value.""" 1743 if self.old_value is None: 1744 return 1745 1746 if sys.platform.startswith('win'): 1747 import msvcrt 1748 msvcrt.SetErrorMode(self.old_value) 1749 1750 if self.old_modes: 1751 for report_type, (old_mode, old_file) in self.old_modes.items(): 1752 msvcrt.CrtSetReportMode(report_type, old_mode) 1753 msvcrt.CrtSetReportFile(report_type, old_file) 1754 else: 1755 if self.resource is not None: 1756 try: 1757 self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value) 1758 except (ValueError, OSError): 1759 pass 1760 1761 1762def patch(test_instance, object_to_patch, attr_name, new_value): 1763 """Override 'object_to_patch'.'attr_name' with 'new_value'. 1764 1765 Also, add a cleanup procedure to 'test_instance' to restore 1766 'object_to_patch' value for 'attr_name'. 1767 The 'attr_name' should be a valid attribute for 'object_to_patch'. 1768 1769 """ 1770 # check that 'attr_name' is a real attribute for 'object_to_patch' 1771 # will raise AttributeError if it does not exist 1772 getattr(object_to_patch, attr_name) 1773 1774 # keep a copy of the old value 1775 attr_is_local = False 1776 try: 1777 old_value = object_to_patch.__dict__[attr_name] 1778 except (AttributeError, KeyError): 1779 old_value = getattr(object_to_patch, attr_name, None) 1780 else: 1781 attr_is_local = True 1782 1783 # restore the value when the test is done 1784 def cleanup(): 1785 if attr_is_local: 1786 setattr(object_to_patch, attr_name, old_value) 1787 else: 1788 delattr(object_to_patch, attr_name) 1789 1790 test_instance.addCleanup(cleanup) 1791 1792 # actually override the attribute 1793 setattr(object_to_patch, attr_name, new_value) 1794 1795 1796@contextlib.contextmanager 1797def patch_list(orig): 1798 """Like unittest.mock.patch.dict, but for lists.""" 1799 try: 1800 saved = orig[:] 1801 yield 1802 finally: 1803 orig[:] = saved 1804 1805 1806def run_in_subinterp(code): 1807 """ 1808 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 1809 module is enabled. 1810 """ 1811 _check_tracemalloc() 1812 try: 1813 import _testcapi 1814 except ImportError: 1815 raise unittest.SkipTest("requires _testcapi") 1816 return _testcapi.run_in_subinterp(code) 1817 1818 1819def run_in_subinterp_with_config(code, *, own_gil=None, **config): 1820 """ 1821 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 1822 module is enabled. 1823 """ 1824 _check_tracemalloc() 1825 try: 1826 import _testinternalcapi 1827 except ImportError: 1828 raise unittest.SkipTest("requires _testinternalcapi") 1829 if own_gil is not None: 1830 assert 'gil' not in config, (own_gil, config) 1831 config['gil'] = 'own' if own_gil else 'shared' 1832 else: 1833 gil = config['gil'] 1834 if gil == 0: 1835 config['gil'] = 'default' 1836 elif gil == 1: 1837 config['gil'] = 'shared' 1838 elif gil == 2: 1839 config['gil'] = 'own' 1840 elif not isinstance(gil, str): 1841 raise NotImplementedError(gil) 1842 config = types.SimpleNamespace(**config) 1843 return _testinternalcapi.run_in_subinterp_with_config(code, config) 1844 1845 1846def _check_tracemalloc(): 1847 # Issue #10915, #15751: PyGILState_*() functions don't work with 1848 # sub-interpreters, the tracemalloc module uses these functions internally 1849 try: 1850 import tracemalloc 1851 except ImportError: 1852 pass 1853 else: 1854 if tracemalloc.is_tracing(): 1855 raise unittest.SkipTest("run_in_subinterp() cannot be used " 1856 "if tracemalloc module is tracing " 1857 "memory allocations") 1858 1859 1860def check_free_after_iterating(test, iter, cls, args=()): 1861 done = False 1862 def wrapper(): 1863 class A(cls): 1864 def __del__(self): 1865 nonlocal done 1866 done = True 1867 try: 1868 next(it) 1869 except StopIteration: 1870 pass 1871 1872 it = iter(A(*args)) 1873 # Issue 26494: Shouldn't crash 1874 test.assertRaises(StopIteration, next, it) 1875 1876 wrapper() 1877 # The sequence should be deallocated just after the end of iterating 1878 gc_collect() 1879 test.assertTrue(done) 1880 1881 1882def missing_compiler_executable(cmd_names=[]): 1883 """Check if the compiler components used to build the interpreter exist. 1884 1885 Check for the existence of the compiler executables whose names are listed 1886 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 1887 and return the first missing executable or None when none is found 1888 missing. 1889 1890 """ 1891 from setuptools._distutils import ccompiler, sysconfig, spawn 1892 from setuptools import errors 1893 1894 compiler = ccompiler.new_compiler() 1895 sysconfig.customize_compiler(compiler) 1896 if compiler.compiler_type == "msvc": 1897 # MSVC has no executables, so check whether initialization succeeds 1898 try: 1899 compiler.initialize() 1900 except errors.PlatformError: 1901 return "msvc" 1902 for name in compiler.executables: 1903 if cmd_names and name not in cmd_names: 1904 continue 1905 cmd = getattr(compiler, name) 1906 if cmd_names: 1907 assert cmd is not None, \ 1908 "the '%s' executable is not configured" % name 1909 elif not cmd: 1910 continue 1911 if spawn.find_executable(cmd[0]) is None: 1912 return cmd[0] 1913 1914 1915_old_android_emulator = None 1916def setswitchinterval(interval): 1917 # Setting a very low gil interval on the Android emulator causes python 1918 # to hang (issue #26939). 1919 minimum_interval = 1e-4 # 100 us 1920 if is_android and interval < minimum_interval: 1921 global _old_android_emulator 1922 if _old_android_emulator is None: 1923 import platform 1924 av = platform.android_ver() 1925 _old_android_emulator = av.is_emulator and av.api_level < 24 1926 if _old_android_emulator: 1927 interval = minimum_interval 1928 return sys.setswitchinterval(interval) 1929 1930 1931def get_pagesize(): 1932 """Get size of a page in bytes.""" 1933 try: 1934 page_size = os.sysconf('SC_PAGESIZE') 1935 except (ValueError, AttributeError): 1936 try: 1937 page_size = os.sysconf('SC_PAGE_SIZE') 1938 except (ValueError, AttributeError): 1939 page_size = 4096 1940 return page_size 1941 1942 1943@contextlib.contextmanager 1944def disable_faulthandler(): 1945 import faulthandler 1946 1947 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 1948 # sys.stderr with a StringIO which has no file descriptor when a test 1949 # is run with -W/--verbose3. 1950 fd = sys.__stderr__.fileno() 1951 1952 is_enabled = faulthandler.is_enabled() 1953 try: 1954 faulthandler.disable() 1955 yield 1956 finally: 1957 if is_enabled: 1958 faulthandler.enable(file=fd, all_threads=True) 1959 1960 1961class SaveSignals: 1962 """ 1963 Save and restore signal handlers. 1964 1965 This class is only able to save/restore signal handlers registered 1966 by the Python signal module: see bpo-13285 for "external" signal 1967 handlers. 1968 """ 1969 1970 def __init__(self): 1971 import signal 1972 self.signal = signal 1973 self.signals = signal.valid_signals() 1974 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 1975 for signame in ('SIGKILL', 'SIGSTOP'): 1976 try: 1977 signum = getattr(signal, signame) 1978 except AttributeError: 1979 continue 1980 self.signals.remove(signum) 1981 self.handlers = {} 1982 1983 def save(self): 1984 for signum in self.signals: 1985 handler = self.signal.getsignal(signum) 1986 if handler is None: 1987 # getsignal() returns None if a signal handler was not 1988 # registered by the Python signal module, 1989 # and the handler is not SIG_DFL nor SIG_IGN. 1990 # 1991 # Ignore the signal: we cannot restore the handler. 1992 continue 1993 self.handlers[signum] = handler 1994 1995 def restore(self): 1996 for signum, handler in self.handlers.items(): 1997 self.signal.signal(signum, handler) 1998 1999 2000def with_pymalloc(): 2001 try: 2002 import _testcapi 2003 except ImportError: 2004 raise unittest.SkipTest("requires _testcapi") 2005 return _testcapi.WITH_PYMALLOC and not Py_GIL_DISABLED 2006 2007 2008def with_mimalloc(): 2009 try: 2010 import _testcapi 2011 except ImportError: 2012 raise unittest.SkipTest("requires _testcapi") 2013 return _testcapi.WITH_MIMALLOC 2014 2015 2016class _ALWAYS_EQ: 2017 """ 2018 Object that is equal to anything. 2019 """ 2020 def __eq__(self, other): 2021 return True 2022 def __ne__(self, other): 2023 return False 2024 2025ALWAYS_EQ = _ALWAYS_EQ() 2026 2027class _NEVER_EQ: 2028 """ 2029 Object that is not equal to anything. 2030 """ 2031 def __eq__(self, other): 2032 return False 2033 def __ne__(self, other): 2034 return True 2035 def __hash__(self): 2036 return 1 2037 2038NEVER_EQ = _NEVER_EQ() 2039 2040@functools.total_ordering 2041class _LARGEST: 2042 """ 2043 Object that is greater than anything (except itself). 2044 """ 2045 def __eq__(self, other): 2046 return isinstance(other, _LARGEST) 2047 def __lt__(self, other): 2048 return False 2049 2050LARGEST = _LARGEST() 2051 2052@functools.total_ordering 2053class _SMALLEST: 2054 """ 2055 Object that is less than anything (except itself). 2056 """ 2057 def __eq__(self, other): 2058 return isinstance(other, _SMALLEST) 2059 def __gt__(self, other): 2060 return False 2061 2062SMALLEST = _SMALLEST() 2063 2064def maybe_get_event_loop_policy(): 2065 """Return the global event loop policy if one is set, else return None.""" 2066 import asyncio.events 2067 return asyncio.events._event_loop_policy 2068 2069# Helpers for testing hashing. 2070NHASHBITS = sys.hash_info.width # number of bits in hash() result 2071assert NHASHBITS in (32, 64) 2072 2073# Return mean and sdev of number of collisions when tossing nballs balls 2074# uniformly at random into nbins bins. By definition, the number of 2075# collisions is the number of balls minus the number of occupied bins at 2076# the end. 2077def collision_stats(nbins, nballs): 2078 n, k = nbins, nballs 2079 # prob a bin empty after k trials = (1 - 1/n)**k 2080 # mean # empty is then n * (1 - 1/n)**k 2081 # so mean # occupied is n - n * (1 - 1/n)**k 2082 # so collisions = k - (n - n*(1 - 1/n)**k) 2083 # 2084 # For the variance: 2085 # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = 2086 # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) 2087 # 2088 # Massive cancellation occurs, and, e.g., for a 64-bit hash code 2089 # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and 2090 # error-prone) efforts to rework the naive formulas to avoid those, 2091 # we use the `decimal` module to get plenty of extra precision. 2092 # 2093 # Note: the exact values are straightforward to compute with 2094 # rationals, but in context that's unbearably slow, requiring 2095 # multi-million bit arithmetic. 2096 import decimal 2097 with decimal.localcontext() as ctx: 2098 bits = n.bit_length() * 2 # bits in n**2 2099 # At least that many bits will likely cancel out. 2100 # Use that many decimal digits instead. 2101 ctx.prec = max(bits, 30) 2102 dn = decimal.Decimal(n) 2103 p1empty = ((dn - 1) / dn) ** k 2104 meanempty = n * p1empty 2105 occupied = n - meanempty 2106 collisions = k - occupied 2107 var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) 2108 return float(collisions), float(var.sqrt()) 2109 2110 2111class catch_unraisable_exception: 2112 """ 2113 Context manager catching unraisable exception using sys.unraisablehook. 2114 2115 Storing the exception value (cm.unraisable.exc_value) creates a reference 2116 cycle. The reference cycle is broken explicitly when the context manager 2117 exits. 2118 2119 Storing the object (cm.unraisable.object) can resurrect it if it is set to 2120 an object which is being finalized. Exiting the context manager clears the 2121 stored object. 2122 2123 Usage: 2124 2125 with support.catch_unraisable_exception() as cm: 2126 # code creating an "unraisable exception" 2127 ... 2128 2129 # check the unraisable exception: use cm.unraisable 2130 ... 2131 2132 # cm.unraisable attribute no longer exists at this point 2133 # (to break a reference cycle) 2134 """ 2135 2136 def __init__(self): 2137 self.unraisable = None 2138 self._old_hook = None 2139 2140 def _hook(self, unraisable): 2141 # Storing unraisable.object can resurrect an object which is being 2142 # finalized. Storing unraisable.exc_value creates a reference cycle. 2143 self.unraisable = unraisable 2144 2145 def __enter__(self): 2146 self._old_hook = sys.unraisablehook 2147 sys.unraisablehook = self._hook 2148 return self 2149 2150 def __exit__(self, *exc_info): 2151 sys.unraisablehook = self._old_hook 2152 del self.unraisable 2153 2154 2155def wait_process(pid, *, exitcode, timeout=None): 2156 """ 2157 Wait until process pid completes and check that the process exit code is 2158 exitcode. 2159 2160 Raise an AssertionError if the process exit code is not equal to exitcode. 2161 2162 If the process runs longer than timeout seconds (LONG_TIMEOUT by default), 2163 kill the process (if signal.SIGKILL is available) and raise an 2164 AssertionError. The timeout feature is not available on Windows. 2165 """ 2166 if os.name != "nt": 2167 import signal 2168 2169 if timeout is None: 2170 timeout = LONG_TIMEOUT 2171 2172 start_time = time.monotonic() 2173 for _ in sleeping_retry(timeout, error=False): 2174 pid2, status = os.waitpid(pid, os.WNOHANG) 2175 if pid2 != 0: 2176 break 2177 # rety: the process is still running 2178 else: 2179 try: 2180 os.kill(pid, signal.SIGKILL) 2181 os.waitpid(pid, 0) 2182 except OSError: 2183 # Ignore errors like ChildProcessError or PermissionError 2184 pass 2185 2186 dt = time.monotonic() - start_time 2187 raise AssertionError(f"process {pid} is still running " 2188 f"after {dt:.1f} seconds") 2189 else: 2190 # Windows implementation: don't support timeout :-( 2191 pid2, status = os.waitpid(pid, 0) 2192 2193 exitcode2 = os.waitstatus_to_exitcode(status) 2194 if exitcode2 != exitcode: 2195 raise AssertionError(f"process {pid} exited with code {exitcode2}, " 2196 f"but exit code {exitcode} is expected") 2197 2198 # sanity check: it should not fail in practice 2199 if pid2 != pid: 2200 raise AssertionError(f"pid {pid2} != pid {pid}") 2201 2202def skip_if_broken_multiprocessing_synchronize(): 2203 """ 2204 Skip tests if the multiprocessing.synchronize module is missing, if there 2205 is no available semaphore implementation, or if creating a lock raises an 2206 OSError (on Linux only). 2207 """ 2208 from .import_helper import import_module 2209 2210 # Skip tests if the _multiprocessing extension is missing. 2211 import_module('_multiprocessing') 2212 2213 # Skip tests if there is no available semaphore implementation: 2214 # multiprocessing.synchronize requires _multiprocessing.SemLock. 2215 synchronize = import_module('multiprocessing.synchronize') 2216 2217 if sys.platform == "linux": 2218 try: 2219 # bpo-38377: On Linux, creating a semaphore fails with OSError 2220 # if the current user does not have the permission to create 2221 # a file in /dev/shm/ directory. 2222 synchronize.Lock(ctx=None) 2223 except OSError as exc: 2224 raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}") 2225 2226 2227def check_disallow_instantiation(testcase, tp, *args, **kwds): 2228 """ 2229 Check that given type cannot be instantiated using *args and **kwds. 2230 2231 See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag. 2232 """ 2233 mod = tp.__module__ 2234 name = tp.__name__ 2235 if mod != 'builtins': 2236 qualname = f"{mod}.{name}" 2237 else: 2238 qualname = f"{name}" 2239 msg = f"cannot create '{re.escape(qualname)}' instances" 2240 testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) 2241 2242def get_recursion_depth(): 2243 """Get the recursion depth of the caller function. 2244 2245 In the __main__ module, at the module level, it should be 1. 2246 """ 2247 try: 2248 import _testinternalcapi 2249 depth = _testinternalcapi.get_recursion_depth() 2250 except (ImportError, RecursionError) as exc: 2251 # sys._getframe() + frame.f_back implementation. 2252 try: 2253 depth = 0 2254 frame = sys._getframe() 2255 while frame is not None: 2256 depth += 1 2257 frame = frame.f_back 2258 finally: 2259 # Break any reference cycles. 2260 frame = None 2261 2262 # Ignore get_recursion_depth() frame. 2263 return max(depth - 1, 1) 2264 2265def get_recursion_available(): 2266 """Get the number of available frames before RecursionError. 2267 2268 It depends on the current recursion depth of the caller function and 2269 sys.getrecursionlimit(). 2270 """ 2271 limit = sys.getrecursionlimit() 2272 depth = get_recursion_depth() 2273 return limit - depth 2274 2275@contextlib.contextmanager 2276def set_recursion_limit(limit): 2277 """Temporarily change the recursion limit.""" 2278 original_limit = sys.getrecursionlimit() 2279 try: 2280 sys.setrecursionlimit(limit) 2281 yield 2282 finally: 2283 sys.setrecursionlimit(original_limit) 2284 2285def infinite_recursion(max_depth=None): 2286 if max_depth is None: 2287 # Pick a number large enough to cause problems 2288 # but not take too long for code that can handle 2289 # very deep recursion. 2290 max_depth = 20_000 2291 elif max_depth < 3: 2292 raise ValueError("max_depth must be at least 3, got {max_depth}") 2293 depth = get_recursion_depth() 2294 depth = max(depth - 1, 1) # Ignore infinite_recursion() frame. 2295 limit = depth + max_depth 2296 return set_recursion_limit(limit) 2297 2298def ignore_deprecations_from(module: str, *, like: str) -> object: 2299 token = object() 2300 warnings.filterwarnings( 2301 "ignore", 2302 category=DeprecationWarning, 2303 module=module, 2304 message=like + fr"(?#support{id(token)})", 2305 ) 2306 return token 2307 2308def clear_ignored_deprecations(*tokens: object) -> None: 2309 if not tokens: 2310 raise ValueError("Provide token or tokens returned by ignore_deprecations_from") 2311 2312 new_filters = [] 2313 endswith = tuple(rf"(?#support{id(token)})" for token in tokens) 2314 for action, message, category, module, lineno in warnings.filters: 2315 if action == "ignore" and category is DeprecationWarning: 2316 if isinstance(message, re.Pattern): 2317 msg = message.pattern 2318 else: 2319 msg = message or "" 2320 if msg.endswith(endswith): 2321 continue 2322 new_filters.append((action, message, category, module, lineno)) 2323 if warnings.filters != new_filters: 2324 warnings.filters[:] = new_filters 2325 warnings._filters_mutated() 2326 2327 2328# Skip a test if venv with pip is known to not work. 2329def requires_venv_with_pip(): 2330 # ensurepip requires zlib to open ZIP archives (.whl binary wheel packages) 2331 try: 2332 import zlib 2333 except ImportError: 2334 return unittest.skipIf(True, "venv: ensurepip requires zlib") 2335 2336 # bpo-26610: pip/pep425tags.py requires ctypes. 2337 # gh-92820: setuptools/windows_support.py uses ctypes (setuptools 58.1). 2338 try: 2339 import ctypes 2340 except ImportError: 2341 ctypes = None 2342 return unittest.skipUnless(ctypes, 'venv: pip requires ctypes') 2343 2344 2345@functools.cache 2346def _findwheel(pkgname): 2347 """Try to find a wheel with the package specified as pkgname. 2348 2349 If set, the wheels are searched for in WHEEL_PKG_DIR (see ensurepip). 2350 Otherwise, they are searched for in the test directory. 2351 """ 2352 wheel_dir = sysconfig.get_config_var('WHEEL_PKG_DIR') or os.path.join( 2353 TEST_HOME_DIR, 'wheeldata', 2354 ) 2355 filenames = os.listdir(wheel_dir) 2356 filenames = sorted(filenames, reverse=True) # approximate "newest" first 2357 for filename in filenames: 2358 # filename is like 'setuptools-67.6.1-py3-none-any.whl' 2359 if not filename.endswith(".whl"): 2360 continue 2361 prefix = pkgname + '-' 2362 if filename.startswith(prefix): 2363 return os.path.join(wheel_dir, filename) 2364 raise FileNotFoundError(f"No wheel for {pkgname} found in {wheel_dir}") 2365 2366 2367# Context manager that creates a virtual environment, install setuptools and wheel in it 2368# and returns the path to the venv directory and the path to the python executable 2369@contextlib.contextmanager 2370def setup_venv_with_pip_setuptools_wheel(venv_dir): 2371 import shlex 2372 import subprocess 2373 from .os_helper import temp_cwd 2374 2375 def run_command(cmd): 2376 if verbose: 2377 print() 2378 print('Run:', ' '.join(map(shlex.quote, cmd))) 2379 subprocess.run(cmd, check=True) 2380 else: 2381 subprocess.run(cmd, 2382 stdout=subprocess.PIPE, 2383 stderr=subprocess.STDOUT, 2384 check=True) 2385 2386 with temp_cwd() as temp_dir: 2387 # Create virtual environment to get setuptools 2388 cmd = [sys.executable, '-X', 'dev', '-m', 'venv', venv_dir] 2389 run_command(cmd) 2390 2391 venv = os.path.join(temp_dir, venv_dir) 2392 2393 # Get the Python executable of the venv 2394 python_exe = os.path.basename(sys.executable) 2395 if sys.platform == 'win32': 2396 python = os.path.join(venv, 'Scripts', python_exe) 2397 else: 2398 python = os.path.join(venv, 'bin', python_exe) 2399 2400 cmd = [python, '-X', 'dev', 2401 '-m', 'pip', 'install', 2402 _findwheel('setuptools'), 2403 _findwheel('wheel')] 2404 run_command(cmd) 2405 2406 yield python 2407 2408 2409# True if Python is built with the Py_DEBUG macro defined: if 2410# Python is built in debug mode (./configure --with-pydebug). 2411Py_DEBUG = hasattr(sys, 'gettotalrefcount') 2412 2413 2414def late_deletion(obj): 2415 """ 2416 Keep a Python alive as long as possible. 2417 2418 Create a reference cycle and store the cycle in an object deleted late in 2419 Python finalization. Try to keep the object alive until the very last 2420 garbage collection. 2421 2422 The function keeps a strong reference by design. It should be called in a 2423 subprocess to not mark a test as "leaking a reference". 2424 """ 2425 2426 # Late CPython finalization: 2427 # - finalize_interp_clear() 2428 # - _PyInterpreterState_Clear(): Clear PyInterpreterState members 2429 # (ex: codec_search_path, before_forkers) 2430 # - clear os.register_at_fork() callbacks 2431 # - clear codecs.register() callbacks 2432 2433 ref_cycle = [obj] 2434 ref_cycle.append(ref_cycle) 2435 2436 # Store a reference in PyInterpreterState.codec_search_path 2437 import codecs 2438 def search_func(encoding): 2439 return None 2440 search_func.reference = ref_cycle 2441 codecs.register(search_func) 2442 2443 if hasattr(os, 'register_at_fork'): 2444 # Store a reference in PyInterpreterState.before_forkers 2445 def atfork_func(): 2446 pass 2447 atfork_func.reference = ref_cycle 2448 os.register_at_fork(before=atfork_func) 2449 2450 2451def busy_retry(timeout, err_msg=None, /, *, error=True): 2452 """ 2453 Run the loop body until "break" stops the loop. 2454 2455 After *timeout* seconds, raise an AssertionError if *error* is true, 2456 or just stop if *error is false. 2457 2458 Example: 2459 2460 for _ in support.busy_retry(support.SHORT_TIMEOUT): 2461 if check(): 2462 break 2463 2464 Example of error=False usage: 2465 2466 for _ in support.busy_retry(support.SHORT_TIMEOUT, error=False): 2467 if check(): 2468 break 2469 else: 2470 raise RuntimeError('my custom error') 2471 2472 """ 2473 if timeout <= 0: 2474 raise ValueError("timeout must be greater than zero") 2475 2476 start_time = time.monotonic() 2477 deadline = start_time + timeout 2478 2479 while True: 2480 yield 2481 2482 if time.monotonic() >= deadline: 2483 break 2484 2485 if error: 2486 dt = time.monotonic() - start_time 2487 msg = f"timeout ({dt:.1f} seconds)" 2488 if err_msg: 2489 msg = f"{msg}: {err_msg}" 2490 raise AssertionError(msg) 2491 2492 2493def sleeping_retry(timeout, err_msg=None, /, 2494 *, init_delay=0.010, max_delay=1.0, error=True): 2495 """ 2496 Wait strategy that applies exponential backoff. 2497 2498 Run the loop body until "break" stops the loop. Sleep at each loop 2499 iteration, but not at the first iteration. The sleep delay is doubled at 2500 each iteration (up to *max_delay* seconds). 2501 2502 See busy_retry() documentation for the parameters usage. 2503 2504 Example raising an exception after SHORT_TIMEOUT seconds: 2505 2506 for _ in support.sleeping_retry(support.SHORT_TIMEOUT): 2507 if check(): 2508 break 2509 2510 Example of error=False usage: 2511 2512 for _ in support.sleeping_retry(support.SHORT_TIMEOUT, error=False): 2513 if check(): 2514 break 2515 else: 2516 raise RuntimeError('my custom error') 2517 """ 2518 2519 delay = init_delay 2520 for _ in busy_retry(timeout, err_msg, error=error): 2521 yield 2522 2523 time.sleep(delay) 2524 delay = min(delay * 2, max_delay) 2525 2526 2527class CPUStopwatch: 2528 """Context manager to roughly time a CPU-bound operation. 2529 2530 Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of 2531 other processes). 2532 2533 N.B.: 2534 - This *includes* time spent in other threads. 2535 - Some systems only have a coarse resolution; check 2536 stopwatch.clock_info.rseolution if. 2537 2538 Usage: 2539 2540 with ProcessStopwatch() as stopwatch: 2541 ... 2542 elapsed = stopwatch.seconds 2543 resolution = stopwatch.clock_info.resolution 2544 """ 2545 def __enter__(self): 2546 get_time = time.process_time 2547 clock_info = time.get_clock_info('process_time') 2548 if get_time() <= 0: # some platforms like WASM lack process_time() 2549 get_time = time.monotonic 2550 clock_info = time.get_clock_info('monotonic') 2551 self.context = disable_gc() 2552 self.context.__enter__() 2553 self.get_time = get_time 2554 self.clock_info = clock_info 2555 self.start_time = get_time() 2556 return self 2557 2558 def __exit__(self, *exc): 2559 try: 2560 end_time = self.get_time() 2561 finally: 2562 result = self.context.__exit__(*exc) 2563 self.seconds = end_time - self.start_time 2564 return result 2565 2566 2567@contextlib.contextmanager 2568def adjust_int_max_str_digits(max_digits): 2569 """Temporarily change the integer string conversion length limit.""" 2570 current = sys.get_int_max_str_digits() 2571 try: 2572 sys.set_int_max_str_digits(max_digits) 2573 yield 2574 finally: 2575 sys.set_int_max_str_digits(current) 2576 2577 2578def get_c_recursion_limit(): 2579 try: 2580 import _testcapi 2581 return _testcapi.Py_C_RECURSION_LIMIT 2582 except ImportError: 2583 raise unittest.SkipTest('requires _testcapi') 2584 2585 2586def exceeds_recursion_limit(): 2587 """For recursion tests, easily exceeds default recursion limit.""" 2588 return get_c_recursion_limit() * 3 2589 2590 2591# Windows doesn't have os.uname() but it doesn't support s390x. 2592is_s390x = hasattr(os, 'uname') and os.uname().machine == 's390x' 2593skip_on_s390x = unittest.skipIf(is_s390x, 'skipped on s390x') 2594 2595Py_TRACE_REFS = hasattr(sys, 'getobjects') 2596 2597# Decorator to disable optimizer while a function run 2598def without_optimizer(func): 2599 try: 2600 from _testinternalcapi import get_optimizer, set_optimizer 2601 except ImportError: 2602 return func 2603 @functools.wraps(func) 2604 def wrapper(*args, **kwargs): 2605 save_opt = get_optimizer() 2606 try: 2607 set_optimizer(None) 2608 return func(*args, **kwargs) 2609 finally: 2610 set_optimizer(save_opt) 2611 return wrapper 2612 2613 2614_BASE_COPY_SRC_DIR_IGNORED_NAMES = frozenset({ 2615 # SRC_DIR/.git 2616 '.git', 2617 # ignore all __pycache__/ sub-directories 2618 '__pycache__', 2619}) 2620 2621# Ignore function for shutil.copytree() to copy the Python source code. 2622def copy_python_src_ignore(path, names): 2623 ignored = _BASE_COPY_SRC_DIR_IGNORED_NAMES 2624 if os.path.basename(path) == 'Doc': 2625 ignored |= { 2626 # SRC_DIR/Doc/build/ 2627 'build', 2628 # SRC_DIR/Doc/venv/ 2629 'venv', 2630 } 2631 2632 # check if we are at the root of the source code 2633 elif 'Modules' in names: 2634 ignored |= { 2635 # SRC_DIR/build/ 2636 'build', 2637 } 2638 return ignored 2639 2640 2641def iter_builtin_types(): 2642 for obj in __builtins__.values(): 2643 if not isinstance(obj, type): 2644 continue 2645 cls = obj 2646 if cls.__module__ != 'builtins': 2647 continue 2648 yield cls 2649 2650 2651def iter_slot_wrappers(cls): 2652 assert cls.__module__ == 'builtins', cls 2653 2654 def is_slot_wrapper(name, value): 2655 if not isinstance(value, types.WrapperDescriptorType): 2656 assert not repr(value).startswith('<slot wrapper '), (cls, name, value) 2657 return False 2658 assert repr(value).startswith('<slot wrapper '), (cls, name, value) 2659 assert callable(value), (cls, name, value) 2660 assert name.startswith('__') and name.endswith('__'), (cls, name, value) 2661 return True 2662 2663 ns = vars(cls) 2664 unused = set(ns) 2665 for name in dir(cls): 2666 if name in ns: 2667 unused.remove(name) 2668 2669 try: 2670 value = getattr(cls, name) 2671 except AttributeError: 2672 # It's as though it weren't in __dir__. 2673 assert name in ('__annotate__', '__annotations__', '__abstractmethods__'), (cls, name) 2674 if name in ns and is_slot_wrapper(name, ns[name]): 2675 unused.add(name) 2676 continue 2677 2678 if not name.startswith('__') or not name.endswith('__'): 2679 assert not is_slot_wrapper(name, value), (cls, name, value) 2680 if not is_slot_wrapper(name, value): 2681 if name in ns: 2682 assert not is_slot_wrapper(name, ns[name]), (cls, name, value, ns[name]) 2683 else: 2684 if name in ns: 2685 assert ns[name] is value, (cls, name, value, ns[name]) 2686 yield name, True 2687 else: 2688 yield name, False 2689 2690 for name in unused: 2691 value = ns[name] 2692 if is_slot_wrapper(cls, name, value): 2693 yield name, True 2694 2695 2696def force_not_colorized(func): 2697 """Force the terminal not to be colorized.""" 2698 @functools.wraps(func) 2699 def wrapper(*args, **kwargs): 2700 import _colorize 2701 original_fn = _colorize.can_colorize 2702 variables: dict[str, str | None] = { 2703 "PYTHON_COLORS": None, "FORCE_COLOR": None, "NO_COLOR": None 2704 } 2705 try: 2706 for key in variables: 2707 variables[key] = os.environ.pop(key, None) 2708 os.environ["NO_COLOR"] = "1" 2709 _colorize.can_colorize = lambda: False 2710 return func(*args, **kwargs) 2711 finally: 2712 _colorize.can_colorize = original_fn 2713 del os.environ["NO_COLOR"] 2714 for key, value in variables.items(): 2715 if value is not None: 2716 os.environ[key] = value 2717 return wrapper 2718 2719 2720def initialized_with_pyrepl(): 2721 """Detect whether PyREPL was used during Python initialization.""" 2722 # If the main module has a __file__ attribute it's a Python module, which means PyREPL. 2723 return hasattr(sys.modules["__main__"], "__file__") 2724 2725 2726class BrokenIter: 2727 def __init__(self, init_raises=False, next_raises=False, iter_raises=False): 2728 if init_raises: 2729 1/0 2730 self.next_raises = next_raises 2731 self.iter_raises = iter_raises 2732 2733 def __next__(self): 2734 if self.next_raises: 2735 1/0 2736 2737 def __iter__(self): 2738 if self.iter_raises: 2739 1/0 2740 return self 2741