1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import contextlib 7import functools 8import os 9import re 10import stat 11import sys 12import sysconfig 13import time 14import types 15import unittest 16import warnings 17 18from .testresult import get_test_runner 19 20 21try: 22 from _testcapi import unicode_legacy_string 23except ImportError: 24 unicode_legacy_string = None 25 26__all__ = [ 27 # globals 28 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 29 # exceptions 30 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 31 # io 32 "record_original_stdout", "get_original_stdout", "captured_stdout", 33 "captured_stdin", "captured_stderr", 34 # unittest 35 "is_resource_enabled", "requires", "requires_freebsd_version", 36 "requires_linux_version", "requires_mac_ver", 37 "check_syntax_error", 38 "BasicTestRunner", "run_unittest", "run_doctest", 39 "requires_gzip", "requires_bz2", "requires_lzma", 40 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 41 "requires_IEEE_754", "requires_zlib", 42 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 43 "check__all__", "skip_if_buggy_ucrt_strfptime", 44 "check_disallow_instantiation", 45 # sys 46 "is_jython", "is_android", "check_impl_detail", "unix_shell", 47 "setswitchinterval", 48 # network 49 "open_urlresource", 50 # processes 51 "reap_children", 52 # miscellaneous 53 "run_with_locale", "swap_item", "findfile", 54 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 55 "run_with_tz", "PGO", "missing_compiler_executable", 56 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", 57 "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", 58 ] 59 60 61# Timeout in seconds for tests using a network server listening on the network 62# local loopback interface like 127.0.0.1. 
63# 64# The timeout is long enough to prevent test failure: it takes into account 65# that the client and the server can run in different threads or even different 66# processes. 67# 68# The timeout should be long enough for connect(), recv() and send() methods 69# of socket.socket. 70LOOPBACK_TIMEOUT = 5.0 71if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version: 72 # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2 73 # seconds on Windows ARM32 buildbot 74 LOOPBACK_TIMEOUT = 10 75elif sys.platform == 'vxworks': 76 LOOPBACK_TIMEOUT = 10 77 78# Timeout in seconds for network requests going to the internet. The timeout is 79# short enough to prevent a test to wait for too long if the internet request 80# is blocked for whatever reason. 81# 82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed, 83# but skip the test instead: see transient_internet(). 84INTERNET_TIMEOUT = 60.0 85 86# Timeout in seconds to mark a test as failed if the test takes "too long". 87# 88# The timeout value depends on the regrtest --timeout command line option. 89# 90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use 91# LONG_TIMEOUT instead. 92SHORT_TIMEOUT = 30.0 93 94# Timeout in seconds to detect when a test hangs. 95# 96# It is long enough to reduce the risk of test failure on the slowest Python 97# buildbots. It should not be used to mark a test as failed if the test takes 98# "too long". The timeout value depends on the regrtest --timeout command line 99# option. 
LONG_TIMEOUT = 5 * 60.0


class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""


class TestFailed(Error):
    """Raised when a test run ends in failure."""


class TestFailedWithDetails(TestFailed):
    """Test failure that also carries the error and failure records."""

    def __init__(self, msg, errors, failures):
        super().__init__(msg, errors, failures)
        self.msg, self.errors, self.failures = msg, errors, failures

    def __str__(self):
        # Only the summary message is shown; details stay on the attributes.
        return self.msg


class TestDidNotRun(Error):
    """Raised when a test module executed no subtests at all."""


class ResourceDenied(unittest.SkipTest):
    """Skip raised for a test that asked for a disallowed resource.

    It is raised when a test calls requires() for a resource that has not
    been enabled, which lets expected skips be told apart from unexpected
    ones.
    """


def anticipate_failure(condition):
    """Return a decorator for a test known to be broken when *condition* holds.

    A true *condition* yields unittest.expectedFailure; otherwise the test is
    returned untouched.  Every use of this decorator should carry a comment
    identifying the associated tracker issue.
    """
    def unchanged(f):
        return f

    return unittest.expectedFailure if condition else unchanged


def load_package_tests(pkg_dir, loader, standard_tests, pattern):
    """Generic load_tests implementation for simple test packages.

    Discover the tests below *pkg_dir* with *loader*, add them to
    *standard_tests* and return that suite.  A *pattern* of None means
    "test*".

    Most packages can implement load_tests using this function as follows:

       def load_tests(*args):
           return load_package_tests(os.path.dirname(__file__), *args)
    """
    # Climb three directory levels from this file: support -> test -> Lib.
    lib_dir = __file__
    for _ in range(3):
        lib_dir = os.path.dirname(lib_dir)

    discovered = loader.discover(
        start_dir=pkg_dir,
        top_level_dir=lib_dir,
        pattern="test*" if pattern is None else pattern,
    )
    standard_tests.addTests(discovered)
    return standard_tests


def get_attribute(obj, name):
    """Return getattr(obj, name), raising SkipTest if AttributeError is raised."""
    try:
        return getattr(obj, name)
    except AttributeError:
        raise unittest.SkipTest("object %r has no attribute %r" % (obj, name))


verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0
junit_xml_list = None    # list of testsuite XML elements
failfast = False

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None


def record_original_stdout(stdout):
    """Store *stdout* as the stream later returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout


def get_original_stdout():
    """Return the recorded stdout, falling back to the current sys.stdout."""
    return _original_stdout or sys.stdout


def _force_run(path, func, *args):
    """Call func(*args); on OSError make *path* rwx for the owner and retry.

    The second call is not guarded: an OSError raised by the retry (or by
    os.chmod) propagates to the caller.
    """
    try:
        return func(*args)
    except OSError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)


# Check whether a gui is actually available
def _is_gui_available():
    """Return True if a GUI can be used; the answer is computed only once.

    The outcome is cached on the function object itself: ``result`` holds the
    boolean and ``reason`` holds the explanation (None when a GUI is
    available).
    """
    if hasattr(_is_gui_available, 'result'):
        return _is_gui_available.result
    import platform
    reason = None
    if sys.platform.startswith('win') and platform.win32_is_iot():
        reason = "gui is not available on Windows IoT Core"
    elif sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        dll = ctypes.windll.user32
        h = dll.GetProcessWindowStation()
        if not h:
            raise ctypes.WinError()
        uof = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        res = dll.GetUserObjectInformationW(h,
            UOI_FLAGS,
            ctypes.byref(uof),
            ctypes.sizeof(uof),
            ctypes.byref(needed))
        if not res:
            raise ctypes.WinError()
        if not bool(uof.dwFlags & WSF_VISIBLE):
            reason = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, record a reason (so that Tk is never touched
        # below) if the window manager connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            reason = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_p = pointer(psn)
            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
                  (app_services.SetFrontProcess(psn_p) < 0) ):
                reason = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not reason:
        try:
            from tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            err_string = str(e)
            if len(err_string) > 50:
                err_string = err_string[:50] + ' [...]'
            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                           err_string)

    _is_gui_available.reason = reason
    _is_gui_available.result = not reason

    return _is_gui_available.result


def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py.  If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    return use_resources is None or resource in use_resources


def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    *msg* overrides the default message used when the resource is not
    enabled.  The 'gui' resource is additionally denied, with the recorded
    reason, when no usable GUI is detected.
    """
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the %r resource not enabled" % resource
        raise ResourceDenied(msg)
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)


def _requires_unix_version(sysname, min_version):
    """Decorator raising SkipTest if the OS is `sysname` and the version is less
    than `min_version`.

    For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if
    the FreeBSD version is less than 7.2.
    """
    import platform
    min_version_txt = '.'.join(map(str, min_version))
    # Drop any '-suffix' from the release string before parsing it.
    version_txt = platform.release().split('-', 1)[0]
    if platform.system() == sysname:
        try:
            version = tuple(map(int, version_txt.split('.')))
        except ValueError:
            # Unparsable release string: run the test rather than skip it.
            skip = False
        else:
            skip = version < min_version
    else:
        skip = False

    return unittest.skipIf(
        skip,
        f"{sysname} version {min_version_txt} or higher required, not "
        f"{version_txt}"
    )


def requires_freebsd_version(*min_version):
    """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is
    less than `min_version`.

    For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD
    version is less than 7.2.
    """
    return _requires_unix_version('FreeBSD', min_version)


def requires_linux_version(*min_version):
    """Decorator raising SkipTest if the OS is Linux and the Linux version is
    less than `min_version`.

    For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux
    version is less than 2.6.32.
    """
    return _requires_unix_version('Linux', min_version)


def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is lower than 10.5.  The check happens each time the decorated function
    is called; a version string that cannot be parsed lets the test run.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                import platform
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "Mac OS X %s or higher required, not %s"
                            % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator


def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    An OSError whose text contains "CERTIFICATE_VERIFY_FAILED" is turned into
    SkipTest; any other OSError is re-raised unchanged.  The wrapped
    function's return value is passed through to the caller.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except OSError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec


# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF).  See issue #18643
# for a discussion of this number.
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# decorator for skipping tests on non-IEEE 754 platforms
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles",
)


def _skip_unless_importable(module_name, reason):
    """Return a decorator that skips with *reason* if the import fails."""
    try:
        module = __import__(module_name)
    except ImportError:
        module = None
    return unittest.skipUnless(module, reason)


def requires_zlib(reason='requires zlib'):
    """Skip the decorated test with *reason* when zlib cannot be imported."""
    return _skip_unless_importable('zlib', reason)


def requires_gzip(reason='requires gzip'):
    """Skip the decorated test with *reason* when gzip cannot be imported."""
    return _skip_unless_importable('gzip', reason)


def requires_bz2(reason='requires bz2'):
    """Skip the decorated test with *reason* when bz2 cannot be imported."""
    return _skip_unless_importable('bz2', reason)


def requires_lzma(reason='requires lzma'):
    """Skip the decorated test with *reason* when lzma cannot be imported."""
    return _skip_unless_importable('lzma', reason)


requires_legacy_unicode_capi = unittest.skipUnless(
    unicode_legacy_string,
    'requires legacy Unicode C API',
)

is_jython = sys.platform.startswith('java')

is_android = hasattr(sys, 'getandroidapilevel')

# No Unix shell is assumed on Windows and VxWorks; Android keeps its shell
# under /system/bin.
if sys.platform in ('win32', 'vxworks'):
    unix_shell = None
elif is_android:
    unix_shell = '/system/bin/sh'
else:
    unix_shell = '/bin/sh'

# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task.  If this is True, PGO is also True.
PGO_EXTENDED = False

# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")


def darwin_malloc_err_warning(test_name):
    """Assure user that loud errors generated by macOS libc's malloc are
    expected.

    Prints a notice naming *test_name*; does nothing on other platforms.
    """
    if sys.platform != 'darwin':
        return

    import shutil
    msg = ' NOTICE '
    detail = (f'{test_name} may generate "malloc can\'t allocate region"\n'
              'warnings on macOS systems. This behavior is known. Do not\n'
              'report a bug unless tests are also failing. See bpo-40928.')

    # Only the terminal width is needed, to size the banner lines.
    padding, _ = shutil.get_terminal_size()
    print(msg.center(padding, '-'))
    print(detail)
    print('-' * padding)


def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    # The test directory is searched before the sys.path entries.
    path = [TEST_HOME_DIR] + sys.path
    for dn in path:
        fn = os.path.join(dn, filename)
        if os.path.exists(fn):
            return fn
    return filename


def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas


def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises SyntaxError matching *errtext*.

    The error must carry a line number and an offset; when *lineno* or
    *offset* is given, the corresponding value must also be equal to it.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    testcase.assertIsNotNone(err.lineno)
    if lineno is not None:
        testcase.assertEqual(err.lineno, lineno)
    testcase.assertIsNotNone(err.offset)
    if offset is not None:
        testcase.assertEqual(err.offset, offset)


def open_urlresource(url, *args, **kw):
    """Open the locally cached copy of *url*, downloading it first if needed.

    The file lives in TEST_DATA_DIR under the last path component of the
    URL and is opened with open(fn, *args, **kw).  An optional keyword
    argument *check* is a callable taking the open file and returning true
    if the content is acceptable; a cached file that fails the check is
    deleted and fetched again.  Downloading requires the 'urlfetch'
    resource.  Raises TestFailed if the downloaded file fails the check.
    """
    import urllib.request, urllib.parse
    from .os_helper import unlink
    try:
        import gzip
    except ImportError:
        gzip = None

    check = kw.pop('check', None)

    filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return the open file if it is acceptable, else None (file closed).
        f = open(fn, *args, **kw)
        if check is None:
            return f
        try:
            if check(f):
                f.seek(0)
                return f
        except BaseException:
            # Don't leak the file handle when the check itself blows up.
            f.close()
            raise
        f.close()
        return None

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    if verbose:
        print('\tfetching %s ...' % url, file=get_original_stdout())
    opener = urllib.request.build_opener()
    if gzip:
        opener.addheaders.append(('Accept-Encoding', 'gzip'))
    response = opener.open(url, timeout=INTERNET_TIMEOUT)
    try:
        f = response
        if gzip and response.headers.get('Content-Encoding') == 'gzip':
            f = gzip.GzipFile(fileobj=response)
        try:
            with open(fn, "wb") as out:
                s = f.read()
                while s:
                    out.write(s)
                    s = f.read()
        finally:
            f.close()
    finally:
        # GzipFile.close() does not close the file object it wraps, so the
        # response is always closed explicitly (a second close is harmless).
        response.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource %r' % fn)


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO."""
    import io
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)


def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")


def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")


def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Force as many objects as possible to be collected.

    In non-CPython implementations of Python, this is needed because timely
    deallocation is not guaranteed by the garbage collector.  (Even in CPython
    this can be the case in case of reference cycles.)  This means that __del__
    methods may be called later than expected and weakrefs may remain alive for
    longer than expected.  This function tries its best to force all garbage
    objects to disappear.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    gc.collect()
    gc.collect()


@contextlib.contextmanager
def disable_gc():
    """Context manager that disables the garbage collector for its body.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    have_gc = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if have_gc:
            gc.enable()


def python_is_optimized():
    """Find if Python was built with optimizations."""
    cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
    final_opt = ""
    # The last -O flag found in PY_CFLAGS is the one that is examined.
    for opt in cflags.split():
        if opt.startswith('-O'):
            final_opt = opt
    return final_opt not in ('', '-O0', '-Og')


# struct format strings for the object header; two extra pointers are
# prepended when sys.getobjects exists.
_header = 'nP'
_align = '0n'
if hasattr(sys, "getobjects"):
    _header = '2P' + _header
    _align = '0P'
_vheader = _header + 'n'


def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*."""
    import struct
    return struct.calcsize(_header + fmt + _align)


def calcvobjsize(fmt):
    """Return struct.calcsize() of the var-object header followed by *fmt*."""
    import struct
    return struct.calcsize(_vheader + fmt + _align)


_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9


def check_sizeof(test, o, size):
    """Assert through *test* that sys.getsizeof(o) equals *size*.

    _testinternalcapi.SIZEOF_PYGC_HEAD is added to *size* first when *o* is
    a heap type, or an instance of a type that has the HAVE_GC flag set.
    """
    import _testinternalcapi
    result = sys.getsizeof(o)
    # add GC header size
    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), result, size)
    test.assertEqual(result, size, msg)

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

@contextlib.contextmanager
def run_with_locale(catstr, *locales):
    """Run the managed block with the first usable locale from *locales*.

    *catstr* is the name of a locale category such as 'LC_ALL'; an unknown
    name raises AttributeError.  Each candidate in *locales* is tried in
    order and the first one accepted by locale.setlocale() stays active for
    the block; if none is accepted the locale is left unchanged.  The
    original locale is restored on exit.  If the original locale cannot be
    determined, nothing is changed at all.
    """
    try:
        import locale
        category = getattr(locale, catstr)
        orig_locale = locale.setlocale(category)
    except AttributeError:
        # if the test author gives us an invalid category string
        raise
    except Exception:
        # cannot retrieve original locale, so do nothing
        # (KeyboardInterrupt and SystemExit are deliberately not swallowed)
        locale = orig_locale = None
    else:
        for loc in locales:
            try:
                locale.setlocale(category, loc)
                break
            except Exception:
                # Best effort: this candidate is unusable, try the next one.
                pass

    try:
        yield
    finally:
        if locale and orig_locale:
            locale.setlocale(category, orig_locale)

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Return a decorator that runs the function with the TZ variable set to *tz*.

    The decorated function raises SkipTest when time.tzset is not available.
    The previous TZ value (or its absence) is restored afterwards, whether
    the function returns or raises.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            tzset = getattr(time, 'tzset', None)
            if tzset is None:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() rather than del: the function may have removed
                    # TZ itself, and a KeyError here would hide its outcome.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands.
# Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize


def set_memlimit(limit):
    """Parse *limit* (such as '2.5Gb') and store it as the bigmem limit.

    The text is a number followed by a K, M, G or T unit, case-insensitive,
    with an optional trailing 'b'.  ValueError is raised for text that does
    not parse and for limits below 2 GiB - 1.
    """
    global max_memuse
    global real_max_memuse
    unit_bytes = {'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G}
    parsed = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                      re.IGNORECASE | re.VERBOSE)
    if parsed is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    number, _fraction, unit = parsed.groups()
    requested = int(float(number) * unit_bytes[unit.lower()])
    # The unclamped request is stored before the minimum check below runs.
    real_max_memuse = requested
    usable = min(requested, MAX_Py_ssize_t)
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = usable


class _MemoryWatchdog:
    """Runs memory_watchdog.py in a child process fed with this process'
    /proc/<pid>/statm file, so that memory consumption gets reported.
    """

    def __init__(self):
        self.procfile = f'/proc/{os.getpid()}/statm'
        self.started = False

    def start(self):
        """Spawn the watchdog child; only warn if the statm file can't be opened."""
        import warnings
        try:
            statm = open(self.procfile, 'r')
        except OSError as exc:
            warnings.warn('/proc not available for stats: {}'.format(exc),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        import subprocess
        # The parent's handle is closed as soon as the child has been spawned.
        with statm:
            script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen(
                [sys.executable, script],
                stdin=statm,
                stderr=subprocess.DEVNULL,
            )
        self.started = True

    def stop(self):
        """Terminate the watchdog child and wait for it, if it was started."""
        if not self.started:
            return
        self.mem_watchdog.terminate()
        self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.
    """
    def decorator(f):
        def wrapper(self):
            # Read back from the wrapper's own attributes, so changes made to
            # them after decoration are honoured.
            requested = wrapper.size
            per_unit = wrapper.memuse
            # Without a memory limit the test only gets a small dummy size.
            maxsize = requested if real_max_memuse else 5147

            if real_max_memuse or not dry_run:
                if real_max_memuse < maxsize * per_unit:
                    raise unittest.SkipTest(
                        "not enough memory: %.1fG minimum needed"
                        % (requested * per_unit / (1024 ** 3)))

            watchdog = None
            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=requested * per_unit / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator


def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
            raise unittest.SkipTest(
                "not enough memory: try a 32-bit build instead")
        raise unittest.SkipTest(
            "not enough memory: %.1fG minimum needed"
            % (MAX_Py_ssize_t / (1024 ** 3)))
    return wrapper

#=======================================================================
# unittest integration.
881 882class BasicTestRunner: 883 def run(self, test): 884 result = unittest.TestResult() 885 test(result) 886 return result 887 888def _id(obj): 889 return obj 890 891def requires_resource(resource): 892 if resource == 'gui' and not _is_gui_available(): 893 return unittest.skip(_is_gui_available.reason) 894 if is_resource_enabled(resource): 895 return _id 896 else: 897 return unittest.skip("resource {0!r} is not enabled".format(resource)) 898 899def cpython_only(test): 900 """ 901 Decorator for tests only applicable on CPython. 902 """ 903 return impl_detail(cpython=True)(test) 904 905def impl_detail(msg=None, **guards): 906 if check_impl_detail(**guards): 907 return _id 908 if msg is None: 909 guardnames, default = _parse_guards(guards) 910 if default: 911 msg = "implementation detail not available on {0}" 912 else: 913 msg = "implementation detail specific to {0}" 914 guardnames = sorted(guardnames.keys()) 915 msg = msg.format(' or '.join(guardnames)) 916 return unittest.skip(msg) 917 918def _parse_guards(guards): 919 # Returns a tuple ({platform_name: run_me}, default_value) 920 if not guards: 921 return ({'cpython': True}, False) 922 is_true = list(guards.values())[0] 923 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 924 return (guards, not is_true) 925 926# Use the following check to guard CPython's implementation-specific tests -- 927# or to run them only on the implementation(s) guarded by the arguments. 928def check_impl_detail(**guards): 929 """This function returns True or False depending on the host platform. 
930 Examples: 931 if check_impl_detail(): # only on CPython (default) 932 if check_impl_detail(jython=True): # only on Jython 933 if check_impl_detail(cpython=False): # everywhere except on CPython 934 """ 935 guards, default = _parse_guards(guards) 936 return guards.get(sys.implementation.name, default) 937 938 939def no_tracing(func): 940 """Decorator to temporarily turn off tracing for the duration of a test.""" 941 if not hasattr(sys, 'gettrace'): 942 return func 943 else: 944 @functools.wraps(func) 945 def wrapper(*args, **kwargs): 946 original_trace = sys.gettrace() 947 try: 948 sys.settrace(None) 949 return func(*args, **kwargs) 950 finally: 951 sys.settrace(original_trace) 952 return wrapper 953 954 955def refcount_test(test): 956 """Decorator for tests which involve reference counting. 957 958 To start, the decorator does not run the test if is not run by CPython. 959 After that, any trace function is unset during the test to prevent 960 unexpected refcounts caused by the trace function. 
961 962 """ 963 return no_tracing(cpython_only(test)) 964 965 966def _filter_suite(suite, pred): 967 """Recursively filter test cases in a suite based on a predicate.""" 968 newtests = [] 969 for test in suite._tests: 970 if isinstance(test, unittest.TestSuite): 971 _filter_suite(test, pred) 972 newtests.append(test) 973 else: 974 if pred(test): 975 newtests.append(test) 976 suite._tests = newtests 977 978def _run_suite(suite): 979 """Run tests from a unittest.TestSuite-derived class.""" 980 runner = get_test_runner(sys.stdout, 981 verbosity=verbose, 982 capture_output=(junit_xml_list is not None)) 983 984 result = runner.run(suite) 985 986 if junit_xml_list is not None: 987 junit_xml_list.append(result.get_xml_element()) 988 989 if not result.testsRun and not result.skipped: 990 raise TestDidNotRun 991 if not result.wasSuccessful(): 992 if len(result.errors) == 1 and not result.failures: 993 err = result.errors[0][1] 994 elif len(result.failures) == 1 and not result.errors: 995 err = result.failures[0][1] 996 else: 997 err = "multiple errors occurred" 998 if not verbose: err += "; run in verbose mode for details" 999 errors = [(str(tc), exc_str) for tc, exc_str in result.errors] 1000 failures = [(str(tc), exc_str) for tc, exc_str in result.failures] 1001 raise TestFailedWithDetails(err, errors, failures) 1002 1003 1004# By default, don't filter tests 1005_match_test_func = None 1006 1007_accept_test_patterns = None 1008_ignore_test_patterns = None 1009 1010 1011def match_test(test): 1012 # Function used by support.run_unittest() and regrtest --list-cases 1013 if _match_test_func is None: 1014 return True 1015 else: 1016 return _match_test_func(test.id()) 1017 1018 1019def _is_full_match_test(pattern): 1020 # If a pattern contains at least one dot, it's considered 1021 # as a full test identifier. 1022 # Example: 'test.test_os.FileTests.test_access'. 1023 # 1024 # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' 1025 # or '[!...]'. 
def set_match_tests(accept_patterns=None, ignore_patterns=None):
    """Install the test filter consulted by match_test().

    accept_patterns and ignore_patterns are iterables of test identifiers
    or fnmatch-style patterns; None means "no pattern".  A test id is
    selected when it matches the accept patterns (if any) and does not
    match the ignore patterns (if any).

    Both matchers are rebuilt on every call.  The previous implementation
    only recompiled a pattern list when it differed from the stored one,
    which silently dropped the unchanged matcher and made it impossible to
    remove a filter once installed.
    """
    global _match_test_func, _accept_test_patterns, _ignore_test_patterns

    # Snapshot the inputs since patterns can be mutable and so modified later
    accept_patterns = tuple(accept_patterns or ())
    ignore_patterns = tuple(ignore_patterns or ())

    accept_patterns, accept_func = _compile_match_function(accept_patterns)
    ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)

    _accept_test_patterns = accept_patterns
    _ignore_test_patterns = ignore_patterns

    if accept_func is None and ignore_func is None:
        # No pattern at all: don't filter tests.
        _match_test_func = None
        return

    def match_function(test_id):
        accept = True
        ignore = False
        if accept_func:
            accept = accept_func(test_id)
        if ignore_func:
            ignore = ignore_func(test_id)
        return accept and not ignore

    _match_test_func = match_function


def _compile_match_function(patterns):
    """Return ``(patterns, func)`` where *func* matches a test identifier.

    *func* is None when *patterns* is empty (in which case the returned
    patterns is an empty tuple).
    """
    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        import fnmatch
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    return patterns, func
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a module name present in sys.modules, a
    TestSuite/TestCase instance, or a TestCase class.  The collected suite
    is filtered with match_test() and then executed with _run_suite().

    Raises ValueError for a str argument that is not a key of sys.modules.
    """
    valid_types = (unittest.TestSuite, unittest.TestCase)
    # unittest.makeSuite() and unittest.findTestCases() are deprecated;
    # a default TestLoader provides the same collection behaviour.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    _filter_suite(suite, match_test)
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.

# Rather than trying to enumerate all the cases where docstrings may be
# disabled, we just check for that directly

def _check_docstrings():
    """Just used to check if docstrings are enabled"""

MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if at least one doctest failed.
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        # Any explicit value means "let doctest decide".
        verbosity = None

    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
    if f:
        raise TestFailed("%d of %d doctests failed" % (f, t))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, t))
    return f, t
#=======================================================================
# Support for saving and restoring the imported modules.

def print_warning(msg):
    """Write each line of *msg* to sys.__stderr__ with a "Warning -- " prefix."""
    # bpo-39983: Print into sys.__stderr__ to display the warning even
    # when sys.stderr is captured temporarily by a test
    stream = sys.__stderr__
    for text in msg.splitlines():
        print(f"Warning -- {text}", file=stream, flush=True)


# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
# For example, threading_helper.threading_cleanup() sets the flag if the
# function fails to cleanup threads.
environment_altered = False

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.

    Every reaped child is reported with print_warning() and sets the
    module-level environment_altered flag.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    if not hasattr(os, 'waitpid') or not hasattr(os, 'WNOHANG'):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, _status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            return

        if pid == 0:
            # Children exist, but none of them has completed yet.
            return

        print_warning(f"reap_children() reaped child process {pid}")
        environment_altered = True
@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporary swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    existed = hasattr(obj, attr)
    previous = getattr(obj, attr) if existed else None
    setattr(obj, attr, new_val)
    try:
        yield previous
    finally:
        if existed:
            setattr(obj, attr, previous)
        elif hasattr(obj, attr):
            # The block may already have removed the temporary attribute.
            delattr(obj, attr)

@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporary swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    existed = item in obj
    previous = obj[item] if existed else None
    obj[item] = new_val
    try:
        yield previous
    finally:
        if existed:
            obj[item] = previous
        elif item in obj:
            # The block may already have removed the temporary item.
            del obj[item]
def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions."""
    from subprocess import _args_from_interpreter_flags
    return _args_from_interpreter_flags()

def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags."""
    from subprocess import _optim_args_from_interpreter_flags
    return _optim_args_from_interpreter_flags()


class Matcher(object):
    """Match a dict against expected key/value pairs."""

    # Keys whose string values are compared by substring, not equality.
    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        return all(self.match_value(key, d.get(key), wanted)
                   for key, wanted in kwargs.items())

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        if type(v) != type(dv):
            return False
        if type(dv) is str and k in self._partial_matches:
            return v in dv
        return v == dv
_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    If the UCRT bugs are present time.localtime().tm_zone will be
    an empty string, otherwise we assume the UCRT bugs are fixed

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    import locale
    global _buggy_ucrt
    if _buggy_ucrt is None:
        # The detection result is cached in the module-level flag.
        _buggy_ucrt = (sys.platform == 'win32' and
                       locale.getdefaultlocale()[1] == 'cp65001' and
                       time.localtime().tm_zone == '')
    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test

class PythonSymlink:
    """Creates a symlink for the current Python executable

    Used as a context manager: the link (plus, on Windows, links to the
    Python DLL and the vcruntime DLLs next to it) is created on entry and
    removed on exit.  call_real() and call_link() run the real executable
    and the link respectively.
    """
    def __init__(self, link=None):
        from .os_helper import TESTFN

        self.link = link or os.path.abspath(TESTFN)
        # Links created by __enter__ that still have to be removed.
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        # Extra (real, link) pairs to create alongside the main link.
        self._also_link = []

        # Environment passed to call_link(); None means "inherit".
        self._env = None

        self._platform_specific()

    def _platform_specific(self):
        pass

    if sys.platform == "win32":
        def _platform_specific(self):
            import glob
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)
            self._also_link.append((
                dll,
                os.path.join(dest_dir, os.path.basename(dll))
            ))
            for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")):
                self._also_link.append((
                    runtime,
                    os.path.join(dest_dir, os.path.basename(runtime))
                ))

            self._env = {k.upper(): os.getenv(k) for k in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build(True):
                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)

    def _remove_links(self):
        """Remove every link created so far and forget about them."""
        # Detach the list first so that a second call is a no-op.
        links, self._linked = self._linked, []
        for link in links:
            try:
                os.remove(link)
            except OSError as ex:
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def __enter__(self):
        try:
            os.symlink(self.real, self.link)
            self._linked.append(self.link)
            for real, link in self._also_link:
                os.symlink(real, link)
                self._linked.append(link)
        except BaseException:
            # __exit__ is not called when __enter__ raises: don't leak the
            # links which were already created.
            self._remove_links()
            raise
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._remove_links()

    def _call(self, python, args, env, returncode):
        """Run *python* with *args* and return its (stdout, stderr) bytes.

        Raises RuntimeError if the exit code is not *returncode*.
        """
        import subprocess
        cmd = [python, *args]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, env=env)
        r = p.communicate()
        if p.returncode != returncode:
            if verbose:
                print(repr(r[0]))
                print(repr(r[1]), file=sys.stderr)
            raise RuntimeError(
                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
        return r

    def call_real(self, *args, returncode=0):
        """Run the real executable with the inherited environment."""
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        """Run the symlink with the environment prepared for it."""
        return self._call(self.link, args, self._env, returncode)
def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    if PGO and not PGO_EXTENDED:
        return unittest.skip("Not run for (non-extended) PGO task")(test)
    return test


def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Returns the set of items in ref_api not in other_api, except for a
    defined list of items to be ignored in this check.

    By default this skips private attributes beginning with '_' but
    includes all magic methods, i.e. those starting and ending in '__'.
    """
    candidates = set(dir(ref_api)).difference(dir(other_api))
    if ignore:
        candidates.difference_update(ignore)
    return {name for name in candidates
            if name.endswith('__') or not name.startswith('_')}
def check__all__(test_case, module, name_of_module=None, extra=(),
                 not_exported=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The module's public names (its API) are detected automatically based on
    whether they match the public name convention and were defined in
    'module'.

    The 'name_of_module' argument can specify (as a string or tuple thereof)
    what module(s) an API could be defined in in order to be detected as a
    public API. One case for this is when 'module' imports part of its public
    API from other modules, possibly a C backend (like 'csv' and its '_csv').

    The 'extra' argument can be a set of names that wouldn't otherwise be
    automatically detected as "public", like objects without a proper
    '__module__' attribute. If provided, it will be added to the
    automatically detected ones.

    The 'not_exported' argument can be a set of names that must not be treated
    as part of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                not_exported = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, not_exported=not_exported)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    def defined_here(obj):
        # Either the object says it comes from one of the accepted modules,
        # or it carries no '__module__' at all and is not itself a module.
        if getattr(obj, '__module__', None) in name_of_module:
            return True
        return not (hasattr(obj, '__module__') or
                    isinstance(obj, types.ModuleType))

    expected = set(extra)
    for name in dir(module):
        if name.startswith('_') or name in not_exported:
            continue
        if defined_here(getattr(module, name)):
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)
def suppress_msvcrt_asserts(verbose=False):
    """Disable Windows error dialogs and CRT report pop-ups.

    Does nothing when the msvcrt module cannot be imported.  If *verbose*
    is true, CRT reports are redirected to stderr instead of being
    discarded.
    """
    try:
        import msvcrt
    except ImportError:
        return

    msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS
                        | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT
                        | msvcrt.SEM_NOGPFAULTERRORBOX
                        | msvcrt.SEM_NOOPENFILEERRORBOX)

    # CrtSetReportMode() is only available in debug build
    if hasattr(msvcrt, 'CrtSetReportMode'):
        for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]:
            if verbose:
                msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE)
                msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR)
            else:
                msvcrt.CrtSetReportMode(m, 0)


class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog.  On UNIX,
    disable the creation of coredump file.
    """
    # Saved error mode (Windows) or RLIMIT_CORE pair (elsewhere); None means
    # that there is nothing to restore.
    old_value = None
    # Windows debug builds only: {report_type: (old_mode, old_file)}.
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode() and CrtSetReportMode().

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.

        Always returns self.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            try:
                import msvcrt
            except ImportError:
                # Nothing can be suppressed, but "with ... as cm" must still
                # bind the context manager rather than None.
                return self

            self.old_value = msvcrt.GetErrorMode()

            msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX)

            # bpo-23314: Suppress assert dialogs in debug builds.
            # CrtSetReportMode() is only available in debug build.
            if hasattr(msvcrt, 'CrtSetReportMode'):
                self.old_modes = {}
                for report_type in [msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT]:
                    old_mode = msvcrt.CrtSetReportMode(report_type,
                            msvcrt.CRTDBG_MODE_FILE)
                    old_file = msvcrt.CrtSetReportFile(report_type,
                            msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = old_mode, old_file

        else:
            try:
                import resource
                self.resource = resource
            except ImportError:
                self.resource = None
            if self.resource is not None:
                try:
                    self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE)
                    self.resource.setrlimit(self.resource.RLIMIT_CORE,
                                            (0, self.old_value[1]))
                except (ValueError, OSError):
                    pass

            if sys.platform == 'darwin':
                import subprocess
                # Check if the 'Crash Reporter' on OSX was configured
                # in 'Developer' mode and warn that it will get triggered
                # when it is.
                #
                # This assumes that this context manager is used in tests
                # that might trigger the next manager.
                cmd = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
                proc = subprocess.Popen(cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                with proc:
                    stdout = proc.communicate()[0]
                if stdout.strip() == b'developer':
                    print("this test triggers the Crash Reporter, "
                          "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            return

        if sys.platform.startswith('win'):
            import msvcrt
            msvcrt.SetErrorMode(self.old_value)

            if self.old_modes:
                for report_type, (old_mode, old_file) in self.old_modes.items():
                    msvcrt.CrtSetReportMode(report_type, old_mode)
                    msvcrt.CrtSetReportFile(report_type, old_file)
        else:
            if self.resource is not None:
                try:
                    self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value)
                except (ValueError, OSError):
                    pass


def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch'.

    Raises AttributeError if 'attr_name' does not exist.
    """
    # check that 'attr_name' is a real attribute for 'object_to_patch'
    # will raise AttributeError if it does not exist
    getattr(object_to_patch, attr_name)

    # keep a copy of the old value
    attr_is_local = False
    try:
        old_value = object_to_patch.__dict__[attr_name]
    except (AttributeError, KeyError):
        # The attribute is inherited (or there is no __dict__ at all).
        old_value = getattr(object_to_patch, attr_name, None)
    else:
        attr_is_local = True

    # restore the value when the test is done
    def cleanup():
        if attr_is_local:
            setattr(object_to_patch, attr_name, old_value)
        else:
            # Removing the override makes the inherited value visible again.
            delattr(object_to_patch, attr_name)

    test_instance.addCleanup(cleanup)

    # actually override the attribute
    setattr(object_to_patch, attr_name, new_value)
def run_in_subinterp(code):
    """
    Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc
    module is enabled.
    """
    # Issue #10915, #15751: PyGILState_*() functions don't work with
    # sub-interpreters, the tracemalloc module uses these functions internally
    try:
        import tracemalloc
    except ImportError:
        pass
    else:
        if tracemalloc.is_tracing():
            raise unittest.SkipTest("run_in_subinterp() cannot be used "
                                     "if tracemalloc module is tracing "
                                     "memory allocations")
    import _testcapi
    return _testcapi.run_in_subinterp(code)


def check_free_after_iterating(test, iter, cls, args=()):
    """Check that the object being iterated is released once the iterator
    is exhausted, and that advancing the iterator from the object's
    finalizer does not crash.

    *iter* is the callable used to build the iterator (it shadows the
    builtin on purpose) and *cls* the class to instantiate with *args*.
    """
    class A(cls):
        def __del__(self):
            nonlocal done
            done = True
            try:
                next(it)
            except StopIteration:
                pass

    done = False
    it = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, it)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(done)


def missing_compiler_executable(cmd_names=()):
    """Check if the compiler components used to build the interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    """
    # The default is an immutable empty tuple rather than a shared list.
    # TODO (PEP 632): alternate check without using distutils
    from distutils import ccompiler, sysconfig, spawn, errors
    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    if compiler.compiler_type == "msvc":
        # MSVC has no executables, so check whether initialization succeeds
        try:
            compiler.initialize()
        except errors.DistutilsPlatformError:
            return "msvc"
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif not cmd:
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]
# Cached answer to "are we running on the Android emulator?"; None until
# setswitchinterval() has had to find out.
_is_android_emulator = None
def setswitchinterval(interval):
    """Call sys.setswitchinterval(), clamping very low values on the
    Android emulator."""
    global _is_android_emulator
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    floor = 1e-5
    if is_android and interval < floor:
        if _is_android_emulator is None:
            import subprocess
            qemu = subprocess.check_output(['getprop', 'ro.kernel.qemu'])
            _is_android_emulator = (qemu.strip() == b'1')
        if _is_android_emulator:
            interval = floor
    return sys.setswitchinterval(interval)


@contextlib.contextmanager
def disable_faulthandler():
    """Context manager disabling faulthandler for the duration of the block.

    faulthandler is enabled again on exit only if it was enabled on entry.
    """
    import faulthandler

    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    stderr_fd = sys.__stderr__.fileno()

    was_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if was_enabled:
            faulthandler.enable(file=stderr_fd, all_threads=True)
class SaveSignals:
    """
    Save and restore signal handlers.

    This class is only able to save/restore signal handlers registered
    by the Python signal module: see bpo-13285 for "external" signal
    handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
        for signame in ('SIGKILL', 'SIGSTOP'):
            signum = getattr(signal, signame, None)
            if signum is None:
                # The platform does not define this signal.
                continue
            self.signals.remove(signum)
        self.handlers = {}

    def save(self):
        """Record the current handler of every signal that has one."""
        getsignal = self.signal.getsignal
        for signum in self.signals:
            handler = getsignal(signum)
            # getsignal() returns None if a signal handler was not
            # registered by the Python signal module,
            # and the handler is not SIG_DFL nor SIG_IGN.
            #
            # Ignore the signal: we cannot restore the handler.
            if handler is not None:
                self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        for signum, handler in self.handlers.items():
            self.signal.signal(signum, handler)


def with_pymalloc():
    """Return _testcapi.WITH_PYMALLOC."""
    from _testcapi import WITH_PYMALLOC
    return WITH_PYMALLOC


class _ALWAYS_EQ:
    """
    Object that is equal to anything.
    """
    def __eq__(self, other):
        return True
    def __ne__(self, other):
        return False

ALWAYS_EQ = _ALWAYS_EQ()

class _NEVER_EQ:
    """
    Object that is not equal to anything.
    """
    def __eq__(self, other):
        return False
    def __ne__(self, other):
        return True
    def __hash__(self):
        return 1

NEVER_EQ = _NEVER_EQ()
@functools.total_ordering
class _LARGEST:
    """
    Object that is greater than anything (except itself).
    """
    def __eq__(self, rhs):
        return isinstance(rhs, _LARGEST)
    def __lt__(self, rhs):
        return False

LARGEST = _LARGEST()

@functools.total_ordering
class _SMALLEST:
    """
    Object that is less than anything (except itself).
    """
    def __eq__(self, rhs):
        return isinstance(rhs, _SMALLEST)
    def __gt__(self, rhs):
        return False

SMALLEST = _SMALLEST()

def maybe_get_event_loop_policy():
    """Return the global event loop policy if one is set, else return None."""
    from asyncio import events
    return events._event_loop_policy

# Helpers for testing hashing.
NHASHBITS = sys.hash_info.width # number of bits in hash() result
assert NHASHBITS in (32, 64)

# Return mean and sdev of number of collisions when tossing nballs balls
# uniformly at random into nbins bins.  By definition, the number of
# collisions is the number of balls minus the number of occupied bins at
# the end.
def collision_stats(nbins, nballs):
    n, k = nbins, nballs
    # prob a bin empty after k trials = (1 - 1/n)**k
    # mean # empty is then n * (1 - 1/n)**k
    # so mean # occupied is n - n * (1 - 1/n)**k
    # so collisions = k - (n - n*(1 - 1/n)**k)
    #
    # For the variance:
    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # we use the `decimal` module to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        # bits in n**2: at least that many bits will likely cancel out.
        # Use that many decimal digits instead.
        ctx.prec = max(n.bit_length() * 2, 30)
        bins = decimal.Decimal(n)
        prob_empty = ((bins - 1) / bins) ** k
        mean_empty = n * prob_empty
        mean_occupied = n - mean_empty
        mean_collisions = k - mean_occupied
        variance = (bins*(bins-1)*((bins-2)/bins)**k
                    + mean_empty * (1 - mean_empty))
        return float(mean_collisions), float(variance.sqrt())
class catch_unraisable_exception:
    """
    Context manager catching unraisable exception using sys.unraisablehook.

    Storing the exception value (cm.unraisable.exc_value) creates a reference
    cycle. The reference cycle is broken explicitly when the context manager
    exits.

    Storing the object (cm.unraisable.object) can resurrect it if it is set to
    an object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self._old_hook = None
        self.unraisable = None

    def _hook(self, unraisable):
        # Storing unraisable.object can resurrect an object which is being
        # finalized. Storing unraisable.exc_value creates a reference cycle.
        self.unraisable = unraisable

    def __enter__(self):
        # Remember the active hook and install ours in one step.
        self._old_hook, sys.unraisablehook = sys.unraisablehook, self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        # Drop the stored record (and with it the reference cycle).
        del self.unraisable
def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
    kill the process (if signal.SIGKILL is available) and raise an
    AssertionError. The timeout feature is not available on Windows.
    """
    if os.name != "nt":
        import signal

        if timeout is None:
            timeout = SHORT_TIMEOUT
        t0 = time.monotonic()
        # Poll with an exponentially growing delay, capped at max_sleep.
        sleep = 0.001
        max_sleep = 0.1
        while True:
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # process is still running

            dt = time.monotonic() - t0
            # Honour the caller's timeout: comparing against SHORT_TIMEOUT
            # here made the 'timeout' argument a no-op.
            if dt > timeout:
                try:
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                except OSError:
                    # Ignore errors like ChildProcessError or PermissionError
                    pass

                raise AssertionError(f"process {pid} is still running "
                                     f"after {dt:.1f} seconds")

            sleep = min(sleep * 2, max_sleep)
            time.sleep(sleep)
    else:
        # Windows implementation
        pid2, status = os.waitpid(pid, 0)

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")

def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests if the multiprocessing.synchronize module is missing, if there
    is no available semaphore implementation, or if creating a lock raises an
    OSError (on Linux only).
    """
    from .import_helper import import_module

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    if sys.platform == "linux":
        try:
            # bpo-38377: On Linux, creating a semaphore fails with OSError
            # if the current user does not have the permission to create
            # a file in /dev/shm/ directory.
            synchronize.Lock(ctx=None)
        except OSError as exc:
            raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")
@contextlib.contextmanager
def infinite_recursion(max_depth=75):
    """Temporarily set the recursion limit to *max_depth*.

    The previous limit is restored when the block exits.
    """
    saved_limit = sys.getrecursionlimit()
    try:
        sys.setrecursionlimit(max_depth)
        yield
    finally:
        sys.setrecursionlimit(saved_limit)


def check_disallow_instantiation(testcase, tp, *args, **kwds):
    """
    Check that given type cannot be instantiated using *args and **kwds.

    See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag.
    """
    mod, name = tp.__module__, tp.__name__
    # Builtin types are reported without a module prefix.
    qualname = f"{name}" if mod == 'builtins' else f"{mod}.{name}"
    msg = f"cannot create '{re.escape(qualname)}' instances"
    testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds)
2029 """ 2030 mod = tp.__module__ 2031 name = tp.__name__ 2032 if mod != 'builtins': 2033 qualname = f"{mod}.{name}" 2034 else: 2035 qualname = f"{name}" 2036 msg = f"cannot create '{re.escape(qualname)}' instances" 2037 testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) 2038 2039 2040def ignore_deprecations_from(module: str, *, like: str) -> object: 2041 token = object() 2042 warnings.filterwarnings( 2043 "ignore", 2044 category=DeprecationWarning, 2045 module=module, 2046 message=like + fr"(?#support{id(token)})", 2047 ) 2048 return token 2049 2050 2051def clear_ignored_deprecations(*tokens: object) -> None: 2052 if not tokens: 2053 raise ValueError("Provide token or tokens returned by ignore_deprecations_from") 2054 2055 new_filters = [] 2056 endswith = tuple(rf"(?#support{id(token)})" for token in tokens) 2057 for action, message, category, module, lineno in warnings.filters: 2058 if action == "ignore" and category is DeprecationWarning: 2059 if isinstance(message, re.Pattern): 2060 msg = message.pattern 2061 else: 2062 msg = message or "" 2063 if msg.endswith(endswith): 2064 continue 2065 new_filters.append((action, message, category, module, lineno)) 2066 if warnings.filters != new_filters: 2067 warnings.filters[:] = new_filters 2068 warnings._filters_mutated() 2069 2070 2071@contextlib.contextmanager 2072def adjust_int_max_str_digits(max_digits): 2073 """Temporarily change the integer string conversion length limit.""" 2074 current = sys.get_int_max_str_digits() 2075 try: 2076 sys.set_int_max_str_digits(max_digits) 2077 yield 2078 finally: 2079 sys.set_int_max_str_digits(current) 2080