1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import contextlib 7import functools 8import os 9import re 10import stat 11import sys 12import sysconfig 13import time 14import types 15import unittest 16import warnings 17 18from .testresult import get_test_runner 19 20 21try: 22 from _testcapi import unicode_legacy_string 23except ImportError: 24 unicode_legacy_string = None 25 26__all__ = [ 27 # globals 28 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 29 # exceptions 30 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 31 # io 32 "record_original_stdout", "get_original_stdout", "captured_stdout", 33 "captured_stdin", "captured_stderr", 34 # unittest 35 "is_resource_enabled", "requires", "requires_freebsd_version", 36 "requires_linux_version", "requires_mac_ver", 37 "check_syntax_error", 38 "BasicTestRunner", "run_unittest", "run_doctest", 39 "requires_gzip", "requires_bz2", "requires_lzma", 40 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 41 "requires_IEEE_754", "requires_zlib", 42 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 43 "check__all__", "skip_if_buggy_ucrt_strfptime", 44 "check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer", 45 # sys 46 "is_jython", "is_android", "check_impl_detail", "unix_shell", 47 "setswitchinterval", 48 # network 49 "open_urlresource", 50 # processes 51 "reap_children", 52 # miscellaneous 53 "run_with_locale", "swap_item", "findfile", 54 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 55 "run_with_tz", "PGO", "missing_compiler_executable", 56 "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST", 57 "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT", 58 ] 59 60 61# Timeout in seconds for tests using a network server listening on the network 62# local loopback interface like 127.0.0.1. 63# 64# The timeout is long enough to prevent test failure: it takes into account 65# that the client and the server can run in different threads or even different 66# processes. 67# 68# The timeout should be long enough for connect(), recv() and send() methods 69# of socket.socket. 70LOOPBACK_TIMEOUT = 5.0 71if sys.platform == 'win32' and ' 32 bit (ARM)' in sys.version: 72 # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2 73 # seconds on Windows ARM32 buildbot 74 LOOPBACK_TIMEOUT = 10 75elif sys.platform == 'vxworks': 76 LOOPBACK_TIMEOUT = 10 77 78# Timeout in seconds for network requests going to the internet. The timeout is 79# short enough to prevent a test to wait for too long if the internet request 80# is blocked for whatever reason. 81# 82# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed, 83# but skip the test instead: see transient_internet(). 84INTERNET_TIMEOUT = 60.0 85 86# Timeout in seconds to mark a test as failed if the test takes "too long". 87# 88# The timeout value depends on the regrtest --timeout command line option. 89# 90# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use 91# LONG_TIMEOUT instead. 92SHORT_TIMEOUT = 30.0 93 94# Timeout in seconds to detect when a test hangs. 95# 96# It is long enough to reduce the risk of test failure on the slowest Python 97# buildbots. It should not be used to mark a test as failed if the test takes 98# "too long". The timeout value depends on the regrtest --timeout command line 99# option. 100LONG_TIMEOUT = 5 * 60.0 101 102 103class Error(Exception): 104 """Base class for regression test exceptions.""" 105 106class TestFailed(Error): 107 """Test failed.""" 108 109class TestFailedWithDetails(TestFailed): 110 """Test failed.""" 111 def __init__(self, msg, errors, failures): 112 self.msg = msg 113 self.errors = errors 114 self.failures = failures 115 super().__init__(msg, errors, failures) 116 117 def __str__(self): 118 return self.msg 119 120class TestDidNotRun(Error): 121 """Test did not run any subtests.""" 122 123class ResourceDenied(unittest.SkipTest): 124 """Test skipped because it requested a disallowed resource. 125 126 This is raised when a test calls requires() for a resource that 127 has not be enabled. It is used to distinguish between expected 128 and unexpected skips. 129 """ 130 131def anticipate_failure(condition): 132 """Decorator to mark a test that is known to be broken in some cases 133 134 Any use of this decorator should have a comment identifying the 135 associated tracker issue. 136 """ 137 if condition: 138 return unittest.expectedFailure 139 return lambda f: f 140 141def load_package_tests(pkg_dir, loader, standard_tests, pattern): 142 """Generic load_tests implementation for simple test packages. 143 144 Most packages can implement load_tests using this function as follows: 145 146 def load_tests(*args): 147 return load_package_tests(os.path.dirname(__file__), *args) 148 """ 149 if pattern is None: 150 pattern = "test*" 151 top_dir = os.path.dirname( # Lib 152 os.path.dirname( # test 153 os.path.dirname(__file__))) # support 154 package_tests = loader.discover(start_dir=pkg_dir, 155 top_level_dir=top_dir, 156 pattern=pattern) 157 standard_tests.addTests(package_tests) 158 return standard_tests 159 160 161def get_attribute(obj, name): 162 """Get an attribute, raising SkipTest if AttributeError is raised.""" 163 try: 164 attribute = getattr(obj, name) 165 except AttributeError: 166 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 167 else: 168 return attribute 169 170verbose = 1 # Flag set to 0 by regrtest.py 171use_resources = None # Flag set to [] by regrtest.py 172max_memuse = 0 # Disable bigmem tests (they will still be run with 173 # small sizes, to make sure they work.) 174real_max_memuse = 0 175junit_xml_list = None # list of testsuite XML elements 176failfast = False 177 178# _original_stdout is meant to hold stdout at the time regrtest began. 179# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 180# The point is to have some flavor of stdout the user can actually see. 181_original_stdout = None 182def record_original_stdout(stdout): 183 global _original_stdout 184 _original_stdout = stdout 185 186def get_original_stdout(): 187 return _original_stdout or sys.stdout 188 189 190def _force_run(path, func, *args): 191 try: 192 return func(*args) 193 except OSError as err: 194 if verbose >= 2: 195 print('%s: %s' % (err.__class__.__name__, err)) 196 print('re-run %s%r' % (func.__name__, args)) 197 os.chmod(path, stat.S_IRWXU) 198 return func(*args) 199 200 201# Check whether a gui is actually available 202def _is_gui_available(): 203 if hasattr(_is_gui_available, 'result'): 204 return _is_gui_available.result 205 import platform 206 reason = None 207 if sys.platform.startswith('win') and platform.win32_is_iot(): 208 reason = "gui is not available on Windows IoT Core" 209 elif sys.platform.startswith('win'): 210 # if Python is running as a service (such as the buildbot service), 211 # gui interaction may be disallowed 212 import ctypes 213 import ctypes.wintypes 214 UOI_FLAGS = 1 215 WSF_VISIBLE = 0x0001 216 class USEROBJECTFLAGS(ctypes.Structure): 217 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 218 ("fReserved", ctypes.wintypes.BOOL), 219 ("dwFlags", ctypes.wintypes.DWORD)] 220 dll = ctypes.windll.user32 221 h = dll.GetProcessWindowStation() 222 if not h: 223 raise ctypes.WinError() 224 uof = USEROBJECTFLAGS() 225 needed = ctypes.wintypes.DWORD() 226 res = dll.GetUserObjectInformationW(h, 227 UOI_FLAGS, 228 ctypes.byref(uof), 229 ctypes.sizeof(uof), 230 ctypes.byref(needed)) 231 if not res: 232 raise ctypes.WinError() 233 if not bool(uof.dwFlags & WSF_VISIBLE): 234 reason = "gui not available (WSF_VISIBLE flag not set)" 235 elif sys.platform == 'darwin': 236 # The Aqua Tk implementations on OS X can abort the process if 237 # being called in an environment where a window server connection 238 # cannot be made, for instance when invoked by a buildbot or ssh 239 # process not running under the same user id as the current console 240 # user. To avoid that, raise an exception if the window manager 241 # connection is not available. 242 from ctypes import cdll, c_int, pointer, Structure 243 from ctypes.util import find_library 244 245 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 246 247 if app_services.CGMainDisplayID() == 0: 248 reason = "gui tests cannot run without OS X window manager" 249 else: 250 class ProcessSerialNumber(Structure): 251 _fields_ = [("highLongOfPSN", c_int), 252 ("lowLongOfPSN", c_int)] 253 psn = ProcessSerialNumber() 254 psn_p = pointer(psn) 255 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 256 (app_services.SetFrontProcess(psn_p) < 0) ): 257 reason = "cannot run without OS X gui process" 258 259 # check on every platform whether tkinter can actually do anything 260 if not reason: 261 try: 262 from tkinter import Tk 263 root = Tk() 264 root.withdraw() 265 root.update() 266 root.destroy() 267 except Exception as e: 268 err_string = str(e) 269 if len(err_string) > 50: 270 err_string = err_string[:50] + ' [...]' 271 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 272 err_string) 273 274 _is_gui_available.reason = reason 275 _is_gui_available.result = not reason 276 277 return _is_gui_available.result 278 279def is_resource_enabled(resource): 280 """Test whether a resource is enabled. 281 282 Known resources are set by regrtest.py. If not running under regrtest.py, 283 all resources are assumed enabled unless use_resources has been set. 284 """ 285 return use_resources is None or resource in use_resources 286 287def requires(resource, msg=None): 288 """Raise ResourceDenied if the specified resource is not available.""" 289 if not is_resource_enabled(resource): 290 if msg is None: 291 msg = "Use of the %r resource not enabled" % resource 292 raise ResourceDenied(msg) 293 if resource == 'gui' and not _is_gui_available(): 294 raise ResourceDenied(_is_gui_available.reason) 295 296def _requires_unix_version(sysname, min_version): 297 """Decorator raising SkipTest if the OS is `sysname` and the version is less 298 than `min_version`. 299 300 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 301 the FreeBSD version is less than 7.2. 302 """ 303 import platform 304 min_version_txt = '.'.join(map(str, min_version)) 305 version_txt = platform.release().split('-', 1)[0] 306 if platform.system() == sysname: 307 try: 308 version = tuple(map(int, version_txt.split('.'))) 309 except ValueError: 310 skip = False 311 else: 312 skip = version < min_version 313 else: 314 skip = False 315 316 return unittest.skipIf( 317 skip, 318 f"{sysname} version {min_version_txt} or higher required, not " 319 f"{version_txt}" 320 ) 321 322 323def requires_freebsd_version(*min_version): 324 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 325 less than `min_version`. 326 327 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 328 version is less than 7.2. 329 """ 330 return _requires_unix_version('FreeBSD', min_version) 331 332def requires_linux_version(*min_version): 333 """Decorator raising SkipTest if the OS is Linux and the Linux version is 334 less than `min_version`. 335 336 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 337 version is less than 2.6.32. 338 """ 339 return _requires_unix_version('Linux', min_version) 340 341def requires_mac_ver(*min_version): 342 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 343 version if less than min_version. 344 345 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 346 is lesser than 10.5. 347 """ 348 def decorator(func): 349 @functools.wraps(func) 350 def wrapper(*args, **kw): 351 if sys.platform == 'darwin': 352 import platform 353 version_txt = platform.mac_ver()[0] 354 try: 355 version = tuple(map(int, version_txt.split('.'))) 356 except ValueError: 357 pass 358 else: 359 if version < min_version: 360 min_version_txt = '.'.join(map(str, min_version)) 361 raise unittest.SkipTest( 362 "Mac OS X %s or higher required, not %s" 363 % (min_version_txt, version_txt)) 364 return func(*args, **kw) 365 wrapper.min_version = min_version 366 return wrapper 367 return decorator 368 369 370def check_sanitizer(*, address=False, memory=False, ub=False): 371 """Returns True if Python is compiled with sanitizer support""" 372 if not (address or memory or ub): 373 raise ValueError('At least one of address, memory, or ub must be True') 374 375 376 _cflags = sysconfig.get_config_var('CFLAGS') or '' 377 _config_args = sysconfig.get_config_var('CONFIG_ARGS') or '' 378 memory_sanitizer = ( 379 '-fsanitize=memory' in _cflags or 380 '--with-memory-sanitizer' in _config_args 381 ) 382 address_sanitizer = ( 383 '-fsanitize=address' in _cflags or 384 '--with-memory-sanitizer' in _config_args 385 ) 386 ub_sanitizer = ( 387 '-fsanitize=undefined' in _cflags or 388 '--with-undefined-behavior-sanitizer' in _config_args 389 ) 390 return ( 391 (memory and memory_sanitizer) or 392 (address and address_sanitizer) or 393 (ub and ub_sanitizer) 394 ) 395 396 397def skip_if_sanitizer(reason=None, *, address=False, memory=False, ub=False): 398 """Decorator raising SkipTest if running with a sanitizer active.""" 399 if not reason: 400 reason = 'not working with sanitizers active' 401 skip = check_sanitizer(address=address, memory=memory, ub=ub) 402 return unittest.skipIf(skip, reason) 403 404 405def system_must_validate_cert(f): 406 """Skip the test on TLS certificate validation failures.""" 407 @functools.wraps(f) 408 def dec(*args, **kwargs): 409 try: 410 f(*args, **kwargs) 411 except OSError as e: 412 if "CERTIFICATE_VERIFY_FAILED" in str(e): 413 raise unittest.SkipTest("system does not contain " 414 "necessary certificates") 415 raise 416 return dec 417 418# A constant likely larger than the underlying OS pipe buffer size, to 419# make writes blocking. 420# Windows limit seems to be around 512 B, and many Unix kernels have a 421# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 422# (see issue #17835 for a discussion of this number). 423PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 424 425# A constant likely larger than the underlying OS socket buffer size, to make 426# writes blocking. 427# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 428# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 429# for a discussion of this number. 430SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 431 432# decorator for skipping tests on non-IEEE 754 platforms 433requires_IEEE_754 = unittest.skipUnless( 434 float.__getformat__("double").startswith("IEEE"), 435 "test requires IEEE 754 doubles") 436 437def requires_zlib(reason='requires zlib'): 438 try: 439 import zlib 440 except ImportError: 441 zlib = None 442 return unittest.skipUnless(zlib, reason) 443 444def requires_gzip(reason='requires gzip'): 445 try: 446 import gzip 447 except ImportError: 448 gzip = None 449 return unittest.skipUnless(gzip, reason) 450 451def requires_bz2(reason='requires bz2'): 452 try: 453 import bz2 454 except ImportError: 455 bz2 = None 456 return unittest.skipUnless(bz2, reason) 457 458def requires_lzma(reason='requires lzma'): 459 try: 460 import lzma 461 except ImportError: 462 lzma = None 463 return unittest.skipUnless(lzma, reason) 464 465requires_legacy_unicode_capi = unittest.skipUnless(unicode_legacy_string, 466 'requires legacy Unicode C API') 467 468is_jython = sys.platform.startswith('java') 469 470is_android = hasattr(sys, 'getandroidapilevel') 471 472if sys.platform not in ('win32', 'vxworks'): 473 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 474else: 475 unix_shell = None 476 477# Define the URL of a dedicated HTTP server for the network tests. 478# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. 479TEST_HTTP_URL = "http://www.pythontest.net" 480 481# Set by libregrtest/main.py so we can skip tests that are not 482# useful for PGO 483PGO = False 484 485# Set by libregrtest/main.py if we are running the extended (time consuming) 486# PGO task. If this is True, PGO is also True. 487PGO_EXTENDED = False 488 489# TEST_HOME_DIR refers to the top level directory of the "test" package 490# that contains Python's regression test suite 491TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 492TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 493 494# TEST_DATA_DIR is used as a target download location for remote resources 495TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 496 497 498def darwin_malloc_err_warning(test_name): 499 """Assure user that loud errors generated by macOS libc's malloc are 500 expected.""" 501 if sys.platform != 'darwin': 502 return 503 504 import shutil 505 msg = ' NOTICE ' 506 detail = (f'{test_name} may generate "malloc can\'t allocate region"\n' 507 'warnings on macOS systems. This behavior is known. Do not\n' 508 'report a bug unless tests are also failing. See bpo-40928.') 509 510 padding, _ = shutil.get_terminal_size() 511 print(msg.center(padding, '-')) 512 print(detail) 513 print('-' * padding) 514 515 516def findfile(filename, subdir=None): 517 """Try to find a file on sys.path or in the test directory. If it is not 518 found the argument passed to the function is returned (this does not 519 necessarily signal failure; could still be the legitimate path). 520 521 Setting *subdir* indicates a relative path to use to find the file 522 rather than looking directly in the path directories. 523 """ 524 if os.path.isabs(filename): 525 return filename 526 if subdir is not None: 527 filename = os.path.join(subdir, filename) 528 path = [TEST_HOME_DIR] + sys.path 529 for dn in path: 530 fn = os.path.join(dn, filename) 531 if os.path.exists(fn): return fn 532 return filename 533 534 535def sortdict(dict): 536 "Like repr(dict), but in sorted order." 537 items = sorted(dict.items()) 538 reprpairs = ["%r: %r" % pair for pair in items] 539 withcommas = ", ".join(reprpairs) 540 return "{%s}" % withcommas 541 542def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): 543 with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: 544 compile(statement, '<test string>', 'exec') 545 err = cm.exception 546 testcase.assertIsNotNone(err.lineno) 547 if lineno is not None: 548 testcase.assertEqual(err.lineno, lineno) 549 testcase.assertIsNotNone(err.offset) 550 if offset is not None: 551 testcase.assertEqual(err.offset, offset) 552 553 554def open_urlresource(url, *args, **kw): 555 import urllib.request, urllib.parse 556 from .os_helper import unlink 557 try: 558 import gzip 559 except ImportError: 560 gzip = None 561 562 check = kw.pop('check', None) 563 564 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 565 566 fn = os.path.join(TEST_DATA_DIR, filename) 567 568 def check_valid_file(fn): 569 f = open(fn, *args, **kw) 570 if check is None: 571 return f 572 elif check(f): 573 f.seek(0) 574 return f 575 f.close() 576 577 if os.path.exists(fn): 578 f = check_valid_file(fn) 579 if f is not None: 580 return f 581 unlink(fn) 582 583 # Verify the requirement before downloading the file 584 requires('urlfetch') 585 586 if verbose: 587 print('\tfetching %s ...' % url, file=get_original_stdout()) 588 opener = urllib.request.build_opener() 589 if gzip: 590 opener.addheaders.append(('Accept-Encoding', 'gzip')) 591 f = opener.open(url, timeout=INTERNET_TIMEOUT) 592 if gzip and f.headers.get('Content-Encoding') == 'gzip': 593 f = gzip.GzipFile(fileobj=f) 594 try: 595 with open(fn, "wb") as out: 596 s = f.read() 597 while s: 598 out.write(s) 599 s = f.read() 600 finally: 601 f.close() 602 603 f = check_valid_file(fn) 604 if f is not None: 605 return f 606 raise TestFailed('invalid resource %r' % fn) 607 608 609@contextlib.contextmanager 610def captured_output(stream_name): 611 """Return a context manager used by captured_stdout/stdin/stderr 612 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 613 import io 614 orig_stdout = getattr(sys, stream_name) 615 setattr(sys, stream_name, io.StringIO()) 616 try: 617 yield getattr(sys, stream_name) 618 finally: 619 setattr(sys, stream_name, orig_stdout) 620 621def captured_stdout(): 622 """Capture the output of sys.stdout: 623 624 with captured_stdout() as stdout: 625 print("hello") 626 self.assertEqual(stdout.getvalue(), "hello\\n") 627 """ 628 return captured_output("stdout") 629 630def captured_stderr(): 631 """Capture the output of sys.stderr: 632 633 with captured_stderr() as stderr: 634 print("hello", file=sys.stderr) 635 self.assertEqual(stderr.getvalue(), "hello\\n") 636 """ 637 return captured_output("stderr") 638 639def captured_stdin(): 640 """Capture the input to sys.stdin: 641 642 with captured_stdin() as stdin: 643 stdin.write('hello\\n') 644 stdin.seek(0) 645 # call test code that consumes from sys.stdin 646 captured = input() 647 self.assertEqual(captured, "hello") 648 """ 649 return captured_output("stdin") 650 651 652def gc_collect(): 653 """Force as many objects as possible to be collected. 654 655 In non-CPython implementations of Python, this is needed because timely 656 deallocation is not guaranteed by the garbage collector. (Even in CPython 657 this can be the case in case of reference cycles.) This means that __del__ 658 methods may be called later than expected and weakrefs may remain alive for 659 longer than expected. This function tries its best to force all garbage 660 objects to disappear. 661 """ 662 import gc 663 gc.collect() 664 if is_jython: 665 time.sleep(0.1) 666 gc.collect() 667 gc.collect() 668 669@contextlib.contextmanager 670def disable_gc(): 671 import gc 672 have_gc = gc.isenabled() 673 gc.disable() 674 try: 675 yield 676 finally: 677 if have_gc: 678 gc.enable() 679 680 681def python_is_optimized(): 682 """Find if Python was built with optimizations.""" 683 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 684 final_opt = "" 685 for opt in cflags.split(): 686 if opt.startswith('-O'): 687 final_opt = opt 688 return final_opt not in ('', '-O0', '-Og') 689 690 691_header = 'nP' 692_align = '0n' 693if hasattr(sys, "getobjects"): 694 _header = '2P' + _header 695 _align = '0P' 696_vheader = _header + 'n' 697 698def calcobjsize(fmt): 699 import struct 700 return struct.calcsize(_header + fmt + _align) 701 702def calcvobjsize(fmt): 703 import struct 704 return struct.calcsize(_vheader + fmt + _align) 705 706 707_TPFLAGS_HAVE_GC = 1<<14 708_TPFLAGS_HEAPTYPE = 1<<9 709 710def check_sizeof(test, o, size): 711 import _testinternalcapi 712 result = sys.getsizeof(o) 713 # add GC header size 714 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 715 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 716 size += _testinternalcapi.SIZEOF_PYGC_HEAD 717 msg = 'wrong size for %s: got %d, expected %d' \ 718 % (type(o), result, size) 719 test.assertEqual(result, size, msg) 720 721#======================================================================= 722# Decorator for running a function in a different locale, correctly resetting 723# it afterwards. 724 725@contextlib.contextmanager 726def run_with_locale(catstr, *locales): 727 try: 728 import locale 729 category = getattr(locale, catstr) 730 orig_locale = locale.setlocale(category) 731 except AttributeError: 732 # if the test author gives us an invalid category string 733 raise 734 except: 735 # cannot retrieve original locale, so do nothing 736 locale = orig_locale = None 737 else: 738 for loc in locales: 739 try: 740 locale.setlocale(category, loc) 741 break 742 except: 743 pass 744 745 try: 746 yield 747 finally: 748 if locale and orig_locale: 749 locale.setlocale(category, orig_locale) 750 751#======================================================================= 752# Decorator for running a function in a specific timezone, correctly 753# resetting it afterwards. 754 755def run_with_tz(tz): 756 def decorator(func): 757 def inner(*args, **kwds): 758 try: 759 tzset = time.tzset 760 except AttributeError: 761 raise unittest.SkipTest("tzset required") 762 if 'TZ' in os.environ: 763 orig_tz = os.environ['TZ'] 764 else: 765 orig_tz = None 766 os.environ['TZ'] = tz 767 tzset() 768 769 # now run the function, resetting the tz on exceptions 770 try: 771 return func(*args, **kwds) 772 finally: 773 if orig_tz is None: 774 del os.environ['TZ'] 775 else: 776 os.environ['TZ'] = orig_tz 777 time.tzset() 778 779 inner.__name__ = func.__name__ 780 inner.__doc__ = func.__doc__ 781 return inner 782 return decorator 783 784#======================================================================= 785# Big-memory-test support. Separate from 'resources' because memory use 786# should be configurable. 787 788# Some handy shorthands. Note that these are used for byte-limits as well 789# as size-limits, in the various bigmem tests 790_1M = 1024*1024 791_1G = 1024 * _1M 792_2G = 2 * _1G 793_4G = 4 * _1G 794 795MAX_Py_ssize_t = sys.maxsize 796 797def set_memlimit(limit): 798 global max_memuse 799 global real_max_memuse 800 sizes = { 801 'k': 1024, 802 'm': _1M, 803 'g': _1G, 804 't': 1024*_1G, 805 } 806 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 807 re.IGNORECASE | re.VERBOSE) 808 if m is None: 809 raise ValueError('Invalid memory limit %r' % (limit,)) 810 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 811 real_max_memuse = memlimit 812 if memlimit > MAX_Py_ssize_t: 813 memlimit = MAX_Py_ssize_t 814 if memlimit < _2G - 1: 815 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 816 max_memuse = memlimit 817 818class _MemoryWatchdog: 819 """An object which periodically watches the process' memory consumption 820 and prints it out. 821 """ 822 823 def __init__(self): 824 self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) 825 self.started = False 826 827 def start(self): 828 import warnings 829 try: 830 f = open(self.procfile, 'r') 831 except OSError as e: 832 warnings.warn('/proc not available for stats: {}'.format(e), 833 RuntimeWarning) 834 sys.stderr.flush() 835 return 836 837 import subprocess 838 with f: 839 watchdog_script = findfile("memory_watchdog.py") 840 self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], 841 stdin=f, 842 stderr=subprocess.DEVNULL) 843 self.started = True 844 845 def stop(self): 846 if self.started: 847 self.mem_watchdog.terminate() 848 self.mem_watchdog.wait() 849 850 851def bigmemtest(size, memuse, dry_run=True): 852 """Decorator for bigmem tests. 853 854 'size' is a requested size for the test (in arbitrary, test-interpreted 855 units.) 'memuse' is the number of bytes per unit for the test, or a good 856 estimate of it. For example, a test that needs two byte buffers, of 4 GiB 857 each, could be decorated with @bigmemtest(size=_4G, memuse=2). 858 859 The 'size' argument is normally passed to the decorated test method as an 860 extra argument. If 'dry_run' is true, the value passed to the test method 861 may be less than the requested value. If 'dry_run' is false, it means the 862 test doesn't support dummy runs when -M is not specified. 863 """ 864 def decorator(f): 865 def wrapper(self): 866 size = wrapper.size 867 memuse = wrapper.memuse 868 if not real_max_memuse: 869 maxsize = 5147 870 else: 871 maxsize = size 872 873 if ((real_max_memuse or not dry_run) 874 and real_max_memuse < maxsize * memuse): 875 raise unittest.SkipTest( 876 "not enough memory: %.1fG minimum needed" 877 % (size * memuse / (1024 ** 3))) 878 879 if real_max_memuse and verbose: 880 print() 881 print(" ... expected peak memory use: {peak:.1f}G" 882 .format(peak=size * memuse / (1024 ** 3))) 883 watchdog = _MemoryWatchdog() 884 watchdog.start() 885 else: 886 watchdog = None 887 888 try: 889 return f(self, maxsize) 890 finally: 891 if watchdog: 892 watchdog.stop() 893 894 wrapper.size = size 895 wrapper.memuse = memuse 896 return wrapper 897 return decorator 898 899def bigaddrspacetest(f): 900 """Decorator for tests that fill the address space.""" 901 def wrapper(self): 902 if max_memuse < MAX_Py_ssize_t: 903 if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: 904 raise unittest.SkipTest( 905 "not enough memory: try a 32-bit build instead") 906 else: 907 raise unittest.SkipTest( 908 "not enough memory: %.1fG minimum needed" 909 % (MAX_Py_ssize_t / (1024 ** 3))) 910 else: 911 return f(self) 912 return wrapper 913 914#======================================================================= 915# unittest integration. 916 917class BasicTestRunner: 918 def run(self, test): 919 result = unittest.TestResult() 920 test(result) 921 return result 922 923def _id(obj): 924 return obj 925 926def requires_resource(resource): 927 if resource == 'gui' and not _is_gui_available(): 928 return unittest.skip(_is_gui_available.reason) 929 if is_resource_enabled(resource): 930 return _id 931 else: 932 return unittest.skip("resource {0!r} is not enabled".format(resource)) 933 934def cpython_only(test): 935 """ 936 Decorator for tests only applicable on CPython. 937 """ 938 return impl_detail(cpython=True)(test) 939 940def impl_detail(msg=None, **guards): 941 if check_impl_detail(**guards): 942 return _id 943 if msg is None: 944 guardnames, default = _parse_guards(guards) 945 if default: 946 msg = "implementation detail not available on {0}" 947 else: 948 msg = "implementation detail specific to {0}" 949 guardnames = sorted(guardnames.keys()) 950 msg = msg.format(' or '.join(guardnames)) 951 return unittest.skip(msg) 952 953def _parse_guards(guards): 954 # Returns a tuple ({platform_name: run_me}, default_value) 955 if not guards: 956 return ({'cpython': True}, False) 957 is_true = list(guards.values())[0] 958 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 959 return (guards, not is_true) 960 961# Use the following check to guard CPython's implementation-specific tests -- 962# or to run them only on the implementation(s) guarded by the arguments. 963def check_impl_detail(**guards): 964 """This function returns True or False depending on the host platform. 965 Examples: 966 if check_impl_detail(): # only on CPython (default) 967 if check_impl_detail(jython=True): # only on Jython 968 if check_impl_detail(cpython=False): # everywhere except on CPython 969 """ 970 guards, default = _parse_guards(guards) 971 return guards.get(sys.implementation.name, default) 972 973 974def no_tracing(func): 975 """Decorator to temporarily turn off tracing for the duration of a test.""" 976 if not hasattr(sys, 'gettrace'): 977 return func 978 else: 979 @functools.wraps(func) 980 def wrapper(*args, **kwargs): 981 original_trace = sys.gettrace() 982 try: 983 sys.settrace(None) 984 return func(*args, **kwargs) 985 finally: 986 sys.settrace(original_trace) 987 return wrapper 988 989 990def refcount_test(test): 991 """Decorator for tests which involve reference counting. 992 993 To start, the decorator does not run the test if is not run by CPython. 994 After that, any trace function is unset during the test to prevent 995 unexpected refcounts caused by the trace function. 996 997 """ 998 return no_tracing(cpython_only(test)) 999 1000 1001def _filter_suite(suite, pred): 1002 """Recursively filter test cases in a suite based on a predicate.""" 1003 newtests = [] 1004 for test in suite._tests: 1005 if isinstance(test, unittest.TestSuite): 1006 _filter_suite(test, pred) 1007 newtests.append(test) 1008 else: 1009 if pred(test): 1010 newtests.append(test) 1011 suite._tests = newtests 1012 1013def _run_suite(suite): 1014 """Run tests from a unittest.TestSuite-derived class.""" 1015 runner = get_test_runner(sys.stdout, 1016 verbosity=verbose, 1017 capture_output=(junit_xml_list is not None)) 1018 1019 result = runner.run(suite) 1020 1021 if junit_xml_list is not None: 1022 junit_xml_list.append(result.get_xml_element()) 1023 1024 if not result.testsRun and not result.skipped: 1025 raise TestDidNotRun 1026 if not result.wasSuccessful(): 1027 if len(result.errors) == 1 and not result.failures: 1028 err = result.errors[0][1] 1029 elif len(result.failures) == 1 and not result.errors: 1030 err = result.failures[0][1] 1031 else: 1032 err = "multiple errors occurred" 1033 if not verbose: err += "; run in verbose mode for details" 1034 errors = [(str(tc), exc_str) for tc, exc_str in result.errors] 1035 failures = [(str(tc), exc_str) for tc, exc_str in result.failures] 1036 raise TestFailedWithDetails(err, errors, failures) 1037 1038 1039# By default, don't filter tests 1040_match_test_func = None 1041 1042_accept_test_patterns = None 1043_ignore_test_patterns = None 1044 1045 1046def match_test(test): 1047 # Function used by support.run_unittest() and regrtest --list-cases 1048 if _match_test_func is None: 1049 return True 1050 else: 1051 return _match_test_func(test.id()) 1052 1053 1054def _is_full_match_test(pattern): 1055 # If a pattern contains at least one dot, it's considered 1056 # as a full test identifier. 1057 # Example: 'test.test_os.FileTests.test_access'. 1058 # 1059 # ignore patterns which contain fnmatch patterns: '*', '?', '[...]' 1060 # or '[!...]'. For example, ignore 'test_access*'. 1061 return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) 1062 1063 1064def set_match_tests(accept_patterns=None, ignore_patterns=None): 1065 global _match_test_func, _accept_test_patterns, _ignore_test_patterns 1066 1067 1068 if accept_patterns is None: 1069 accept_patterns = () 1070 if ignore_patterns is None: 1071 ignore_patterns = () 1072 1073 accept_func = ignore_func = None 1074 1075 if accept_patterns != _accept_test_patterns: 1076 accept_patterns, accept_func = _compile_match_function(accept_patterns) 1077 if ignore_patterns != _ignore_test_patterns: 1078 ignore_patterns, ignore_func = _compile_match_function(ignore_patterns) 1079 1080 # Create a copy since patterns can be mutable and so modified later 1081 _accept_test_patterns = tuple(accept_patterns) 1082 _ignore_test_patterns = tuple(ignore_patterns) 1083 1084 if accept_func is not None or ignore_func is not None: 1085 def match_function(test_id): 1086 accept = True 1087 ignore = False 1088 if accept_func: 1089 accept = accept_func(test_id) 1090 if ignore_func: 1091 ignore = ignore_func(test_id) 1092 return accept and not ignore 1093 1094 _match_test_func = match_function 1095 1096 1097def _compile_match_function(patterns): 1098 if not patterns: 1099 func = None 1100 # set_match_tests(None) behaves as set_match_tests(()) 1101 patterns = () 1102 elif all(map(_is_full_match_test, patterns)): 1103 # Simple case: all patterns are full test identifier. 1104 # The test.bisect_cmd utility only uses such full test identifiers. 1105 func = set(patterns).__contains__ 1106 else: 1107 import fnmatch 1108 regex = '|'.join(map(fnmatch.translate, patterns)) 1109 # The search *is* case sensitive on purpose: 1110 # don't use flags=re.IGNORECASE 1111 regex_match = re.compile(regex).match 1112 1113 def match_test_regex(test_id): 1114 if regex_match(test_id): 1115 # The regex matches the whole identifier, for example 1116 # 'test.test_os.FileTests.test_access'. 1117 return True 1118 else: 1119 # Try to match parts of the test identifier. 1120 # For example, split 'test.test_os.FileTests.test_access' 1121 # into: 'test', 'test_os', 'FileTests' and 'test_access'. 1122 return any(map(regex_match, test_id.split("."))) 1123 1124 func = match_test_regex 1125 1126 return patterns, func 1127 1128 1129def run_unittest(*classes): 1130 """Run tests from unittest.TestCase-derived classes.""" 1131 valid_types = (unittest.TestSuite, unittest.TestCase) 1132 suite = unittest.TestSuite() 1133 for cls in classes: 1134 if isinstance(cls, str): 1135 if cls in sys.modules: 1136 suite.addTest(unittest.findTestCases(sys.modules[cls])) 1137 else: 1138 raise ValueError("str arguments must be keys in sys.modules") 1139 elif isinstance(cls, valid_types): 1140 suite.addTest(cls) 1141 else: 1142 suite.addTest(unittest.makeSuite(cls)) 1143 _filter_suite(suite, match_test) 1144 _run_suite(suite) 1145 1146#======================================================================= 1147# Check for the presence of docstrings. 1148 1149# Rather than trying to enumerate all the cases where docstrings may be 1150# disabled, we just check for that directly 1151 1152def _check_docstrings(): 1153 """Just used to check if docstrings are enabled""" 1154 1155MISSING_C_DOCSTRINGS = (check_impl_detail() and 1156 sys.platform != 'win32' and 1157 not sysconfig.get_config_var('WITH_DOC_STRINGS')) 1158 1159HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and 1160 not MISSING_C_DOCSTRINGS) 1161 1162requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 1163 "test requires docstrings") 1164 1165 1166#======================================================================= 1167# doctest driver. 1168 1169def run_doctest(module, verbosity=None, optionflags=0): 1170 """Run doctest on the given module. Return (#failures, #tests). 1171 1172 If optional argument verbosity is not specified (or is None), pass 1173 support's belief about verbosity on to doctest. Else doctest's 1174 usual behavior is used (it searches sys.argv for -v). 1175 """ 1176 1177 import doctest 1178 1179 if verbosity is None: 1180 verbosity = verbose 1181 else: 1182 verbosity = None 1183 1184 f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) 1185 if f: 1186 raise TestFailed("%d of %d doctests failed" % (f, t)) 1187 if verbose: 1188 print('doctest (%s) ... %d tests with zero failures' % 1189 (module.__name__, t)) 1190 return f, t 1191 1192 1193#======================================================================= 1194# Support for saving and restoring the imported modules. 1195 1196def print_warning(msg): 1197 # bpo-39983: Print into sys.__stderr__ to display the warning even 1198 # when sys.stderr is captured temporarily by a test 1199 for line in msg.splitlines(): 1200 print(f"Warning -- {line}", file=sys.__stderr__, flush=True) 1201 1202 1203# Flag used by saved_test_environment of test.libregrtest.save_env, 1204# to check if a test modified the environment. The flag should be set to False 1205# before running a new test. 1206# 1207# For example, threading_helper.threading_cleanup() sets the flag is the function fails 1208# to cleanup threads. 1209environment_altered = False 1210 1211def reap_children(): 1212 """Use this function at the end of test_main() whenever sub-processes 1213 are started. This will help ensure that no extra children (zombies) 1214 stick around to hog resources and create problems when looking 1215 for refleaks. 1216 """ 1217 global environment_altered 1218 1219 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 1220 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 1221 return 1222 1223 # Reap all our dead child processes so we don't leave zombies around. 1224 # These hog resources and might be causing some of the buildbots to die. 1225 while True: 1226 try: 1227 # Read the exit status of any child process which already completed 1228 pid, status = os.waitpid(-1, os.WNOHANG) 1229 except OSError: 1230 break 1231 1232 if pid == 0: 1233 break 1234 1235 print_warning(f"reap_children() reaped child process {pid}") 1236 environment_altered = True 1237 1238 1239@contextlib.contextmanager 1240def swap_attr(obj, attr, new_val): 1241 """Temporary swap out an attribute with a new object. 1242 1243 Usage: 1244 with swap_attr(obj, "attr", 5): 1245 ... 1246 1247 This will set obj.attr to 5 for the duration of the with: block, 1248 restoring the old value at the end of the block. If `attr` doesn't 1249 exist on `obj`, it will be created and then deleted at the end of the 1250 block. 1251 1252 The old value (or None if it doesn't exist) will be assigned to the 1253 target of the "as" clause, if there is one. 1254 """ 1255 if hasattr(obj, attr): 1256 real_val = getattr(obj, attr) 1257 setattr(obj, attr, new_val) 1258 try: 1259 yield real_val 1260 finally: 1261 setattr(obj, attr, real_val) 1262 else: 1263 setattr(obj, attr, new_val) 1264 try: 1265 yield 1266 finally: 1267 if hasattr(obj, attr): 1268 delattr(obj, attr) 1269 1270@contextlib.contextmanager 1271def swap_item(obj, item, new_val): 1272 """Temporary swap out an item with a new object. 1273 1274 Usage: 1275 with swap_item(obj, "item", 5): 1276 ... 1277 1278 This will set obj["item"] to 5 for the duration of the with: block, 1279 restoring the old value at the end of the block. If `item` doesn't 1280 exist on `obj`, it will be created and then deleted at the end of the 1281 block. 1282 1283 The old value (or None if it doesn't exist) will be assigned to the 1284 target of the "as" clause, if there is one. 1285 """ 1286 if item in obj: 1287 real_val = obj[item] 1288 obj[item] = new_val 1289 try: 1290 yield real_val 1291 finally: 1292 obj[item] = real_val 1293 else: 1294 obj[item] = new_val 1295 try: 1296 yield 1297 finally: 1298 if item in obj: 1299 del obj[item] 1300 1301def args_from_interpreter_flags(): 1302 """Return a list of command-line arguments reproducing the current 1303 settings in sys.flags and sys.warnoptions.""" 1304 import subprocess 1305 return subprocess._args_from_interpreter_flags() 1306 1307def optim_args_from_interpreter_flags(): 1308 """Return a list of command-line arguments reproducing the current 1309 optimization settings in sys.flags.""" 1310 import subprocess 1311 return subprocess._optim_args_from_interpreter_flags() 1312 1313 1314class Matcher(object): 1315 1316 _partial_matches = ('msg', 'message') 1317 1318 def matches(self, d, **kwargs): 1319 """ 1320 Try to match a single dict with the supplied arguments. 1321 1322 Keys whose values are strings and which are in self._partial_matches 1323 will be checked for partial (i.e. substring) matches. You can extend 1324 this scheme to (for example) do regular expression matching, etc. 1325 """ 1326 result = True 1327 for k in kwargs: 1328 v = kwargs[k] 1329 dv = d.get(k) 1330 if not self.match_value(k, dv, v): 1331 result = False 1332 break 1333 return result 1334 1335 def match_value(self, k, dv, v): 1336 """ 1337 Try to match a single stored value (dv) with a supplied value (v). 1338 """ 1339 if type(v) != type(dv): 1340 result = False 1341 elif type(dv) is not str or k not in self._partial_matches: 1342 result = (v == dv) 1343 else: 1344 result = dv.find(v) >= 0 1345 return result 1346 1347 1348_buggy_ucrt = None 1349def skip_if_buggy_ucrt_strfptime(test): 1350 """ 1351 Skip decorator for tests that use buggy strptime/strftime 1352 1353 If the UCRT bugs are present time.localtime().tm_zone will be 1354 an empty string, otherwise we assume the UCRT bugs are fixed 1355 1356 See bpo-37552 [Windows] strptime/strftime return invalid 1357 results with UCRT version 17763.615 1358 """ 1359 import locale 1360 global _buggy_ucrt 1361 if _buggy_ucrt is None: 1362 if(sys.platform == 'win32' and 1363 locale.getdefaultlocale()[1] == 'cp65001' and 1364 time.localtime().tm_zone == ''): 1365 _buggy_ucrt = True 1366 else: 1367 _buggy_ucrt = False 1368 return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test 1369 1370class PythonSymlink: 1371 """Creates a symlink for the current Python executable""" 1372 def __init__(self, link=None): 1373 from .os_helper import TESTFN 1374 1375 self.link = link or os.path.abspath(TESTFN) 1376 self._linked = [] 1377 self.real = os.path.realpath(sys.executable) 1378 self._also_link = [] 1379 1380 self._env = None 1381 1382 self._platform_specific() 1383 1384 if sys.platform == "win32": 1385 def _platform_specific(self): 1386 import glob 1387 import _winapi 1388 1389 if os.path.lexists(self.real) and not os.path.exists(self.real): 1390 # App symlink appears to not exist, but we want the 1391 # real executable here anyway 1392 self.real = _winapi.GetModuleFileName(0) 1393 1394 dll = _winapi.GetModuleFileName(sys.dllhandle) 1395 src_dir = os.path.dirname(dll) 1396 dest_dir = os.path.dirname(self.link) 1397 self._also_link.append(( 1398 dll, 1399 os.path.join(dest_dir, os.path.basename(dll)) 1400 )) 1401 for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): 1402 self._also_link.append(( 1403 runtime, 1404 os.path.join(dest_dir, os.path.basename(runtime)) 1405 )) 1406 1407 self._env = {k.upper(): os.getenv(k) for k in os.environ} 1408 self._env["PYTHONHOME"] = os.path.dirname(self.real) 1409 if sysconfig.is_python_build(True): 1410 self._env["PYTHONPATH"] = os.path.dirname(os.__file__) 1411 else: 1412 def _platform_specific(self): 1413 pass 1414 1415 def __enter__(self): 1416 os.symlink(self.real, self.link) 1417 self._linked.append(self.link) 1418 for real, link in self._also_link: 1419 os.symlink(real, link) 1420 self._linked.append(link) 1421 return self 1422 1423 def __exit__(self, exc_type, exc_value, exc_tb): 1424 for link in self._linked: 1425 try: 1426 os.remove(link) 1427 except IOError as ex: 1428 if verbose: 1429 print("failed to clean up {}: {}".format(link, ex)) 1430 1431 def _call(self, python, args, env, returncode): 1432 import subprocess 1433 cmd = [python, *args] 1434 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 1435 stderr=subprocess.PIPE, env=env) 1436 r = p.communicate() 1437 if p.returncode != returncode: 1438 if verbose: 1439 print(repr(r[0])) 1440 print(repr(r[1]), file=sys.stderr) 1441 raise RuntimeError( 1442 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) 1443 return r 1444 1445 def call_real(self, *args, returncode=0): 1446 return self._call(self.real, args, None, returncode) 1447 1448 def call_link(self, *args, returncode=0): 1449 return self._call(self.link, args, self._env, returncode) 1450 1451 1452def skip_if_pgo_task(test): 1453 """Skip decorator for tests not run in (non-extended) PGO task""" 1454 ok = not PGO or PGO_EXTENDED 1455 msg = "Not run for (non-extended) PGO task" 1456 return test if ok else unittest.skip(msg)(test) 1457 1458 1459def detect_api_mismatch(ref_api, other_api, *, ignore=()): 1460 """Returns the set of items in ref_api not in other_api, except for a 1461 defined list of items to be ignored in this check. 1462 1463 By default this skips private attributes beginning with '_' but 1464 includes all magic methods, i.e. those starting and ending in '__'. 1465 """ 1466 missing_items = set(dir(ref_api)) - set(dir(other_api)) 1467 if ignore: 1468 missing_items -= set(ignore) 1469 missing_items = set(m for m in missing_items 1470 if not m.startswith('_') or m.endswith('__')) 1471 return missing_items 1472 1473 1474def check__all__(test_case, module, name_of_module=None, extra=(), 1475 not_exported=()): 1476 """Assert that the __all__ variable of 'module' contains all public names. 1477 1478 The module's public names (its API) are detected automatically based on 1479 whether they match the public name convention and were defined in 1480 'module'. 1481 1482 The 'name_of_module' argument can specify (as a string or tuple thereof) 1483 what module(s) an API could be defined in in order to be detected as a 1484 public API. One case for this is when 'module' imports part of its public 1485 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 1486 1487 The 'extra' argument can be a set of names that wouldn't otherwise be 1488 automatically detected as "public", like objects without a proper 1489 '__module__' attribute. If provided, it will be added to the 1490 automatically detected ones. 1491 1492 The 'not_exported' argument can be a set of names that must not be treated 1493 as part of the public API even though their names indicate otherwise. 1494 1495 Usage: 1496 import bar 1497 import foo 1498 import unittest 1499 from test import support 1500 1501 class MiscTestCase(unittest.TestCase): 1502 def test__all__(self): 1503 support.check__all__(self, foo) 1504 1505 class OtherTestCase(unittest.TestCase): 1506 def test__all__(self): 1507 extra = {'BAR_CONST', 'FOO_CONST'} 1508 not_exported = {'baz'} # Undocumented name. 1509 # bar imports part of its API from _bar. 1510 support.check__all__(self, bar, ('bar', '_bar'), 1511 extra=extra, not_exported=not_exported) 1512 1513 """ 1514 1515 if name_of_module is None: 1516 name_of_module = (module.__name__, ) 1517 elif isinstance(name_of_module, str): 1518 name_of_module = (name_of_module, ) 1519 1520 expected = set(extra) 1521 1522 for name in dir(module): 1523 if name.startswith('_') or name in not_exported: 1524 continue 1525 obj = getattr(module, name) 1526 if (getattr(obj, '__module__', None) in name_of_module or 1527 (not hasattr(obj, '__module__') and 1528 not isinstance(obj, types.ModuleType))): 1529 expected.add(name) 1530 test_case.assertCountEqual(module.__all__, expected) 1531 1532 1533def suppress_msvcrt_asserts(verbose=False): 1534 try: 1535 import msvcrt 1536 except ImportError: 1537 return 1538 1539 msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS 1540 | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT 1541 | msvcrt.SEM_NOGPFAULTERRORBOX 1542 | msvcrt.SEM_NOOPENFILEERRORBOX) 1543 1544 # CrtSetReportMode() is only available in debug build 1545 if hasattr(msvcrt, 'CrtSetReportMode'): 1546 for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: 1547 if verbose: 1548 msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) 1549 msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) 1550 else: 1551 msvcrt.CrtSetReportMode(m, 0) 1552 1553 1554class SuppressCrashReport: 1555 """Try to prevent a crash report from popping up. 1556 1557 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 1558 disable the creation of coredump file. 1559 """ 1560 old_value = None 1561 old_modes = None 1562 1563 def __enter__(self): 1564 """On Windows, disable Windows Error Reporting dialogs using 1565 SetErrorMode() and CrtSetReportMode(). 1566 1567 On UNIX, try to save the previous core file size limit, then set 1568 soft limit to 0. 1569 """ 1570 if sys.platform.startswith('win'): 1571 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 1572 try: 1573 import msvcrt 1574 except ImportError: 1575 return 1576 1577 self.old_value = msvcrt.GetErrorMode() 1578 1579 msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) 1580 1581 # bpo-23314: Suppress assert dialogs in debug builds. 1582 # CrtSetReportMode() is only available in debug build. 1583 if hasattr(msvcrt, 'CrtSetReportMode'): 1584 self.old_modes = {} 1585 for report_type in [msvcrt.CRT_WARN, 1586 msvcrt.CRT_ERROR, 1587 msvcrt.CRT_ASSERT]: 1588 old_mode = msvcrt.CrtSetReportMode(report_type, 1589 msvcrt.CRTDBG_MODE_FILE) 1590 old_file = msvcrt.CrtSetReportFile(report_type, 1591 msvcrt.CRTDBG_FILE_STDERR) 1592 self.old_modes[report_type] = old_mode, old_file 1593 1594 else: 1595 try: 1596 import resource 1597 self.resource = resource 1598 except ImportError: 1599 self.resource = None 1600 if self.resource is not None: 1601 try: 1602 self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE) 1603 self.resource.setrlimit(self.resource.RLIMIT_CORE, 1604 (0, self.old_value[1])) 1605 except (ValueError, OSError): 1606 pass 1607 1608 if sys.platform == 'darwin': 1609 import subprocess 1610 # Check if the 'Crash Reporter' on OSX was configured 1611 # in 'Developer' mode and warn that it will get triggered 1612 # when it is. 1613 # 1614 # This assumes that this context manager is used in tests 1615 # that might trigger the next manager. 1616 cmd = ['/usr/bin/defaults', 'read', 1617 'com.apple.CrashReporter', 'DialogType'] 1618 proc = subprocess.Popen(cmd, 1619 stdout=subprocess.PIPE, 1620 stderr=subprocess.PIPE) 1621 with proc: 1622 stdout = proc.communicate()[0] 1623 if stdout.strip() == b'developer': 1624 print("this test triggers the Crash Reporter, " 1625 "that is intentional", end='', flush=True) 1626 1627 return self 1628 1629 def __exit__(self, *ignore_exc): 1630 """Restore Windows ErrorMode or core file behavior to initial value.""" 1631 if self.old_value is None: 1632 return 1633 1634 if sys.platform.startswith('win'): 1635 import msvcrt 1636 msvcrt.SetErrorMode(self.old_value) 1637 1638 if self.old_modes: 1639 for report_type, (old_mode, old_file) in self.old_modes.items(): 1640 msvcrt.CrtSetReportMode(report_type, old_mode) 1641 msvcrt.CrtSetReportFile(report_type, old_file) 1642 else: 1643 if self.resource is not None: 1644 try: 1645 self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value) 1646 except (ValueError, OSError): 1647 pass 1648 1649 1650def patch(test_instance, object_to_patch, attr_name, new_value): 1651 """Override 'object_to_patch'.'attr_name' with 'new_value'. 1652 1653 Also, add a cleanup procedure to 'test_instance' to restore 1654 'object_to_patch' value for 'attr_name'. 1655 The 'attr_name' should be a valid attribute for 'object_to_patch'. 1656 1657 """ 1658 # check that 'attr_name' is a real attribute for 'object_to_patch' 1659 # will raise AttributeError if it does not exist 1660 getattr(object_to_patch, attr_name) 1661 1662 # keep a copy of the old value 1663 attr_is_local = False 1664 try: 1665 old_value = object_to_patch.__dict__[attr_name] 1666 except (AttributeError, KeyError): 1667 old_value = getattr(object_to_patch, attr_name, None) 1668 else: 1669 attr_is_local = True 1670 1671 # restore the value when the test is done 1672 def cleanup(): 1673 if attr_is_local: 1674 setattr(object_to_patch, attr_name, old_value) 1675 else: 1676 delattr(object_to_patch, attr_name) 1677 1678 test_instance.addCleanup(cleanup) 1679 1680 # actually override the attribute 1681 setattr(object_to_patch, attr_name, new_value) 1682 1683 1684def run_in_subinterp(code): 1685 """ 1686 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 1687 module is enabled. 1688 """ 1689 # Issue #10915, #15751: PyGILState_*() functions don't work with 1690 # sub-interpreters, the tracemalloc module uses these functions internally 1691 try: 1692 import tracemalloc 1693 except ImportError: 1694 pass 1695 else: 1696 if tracemalloc.is_tracing(): 1697 raise unittest.SkipTest("run_in_subinterp() cannot be used " 1698 "if tracemalloc module is tracing " 1699 "memory allocations") 1700 import _testcapi 1701 return _testcapi.run_in_subinterp(code) 1702 1703 1704def check_free_after_iterating(test, iter, cls, args=()): 1705 class A(cls): 1706 def __del__(self): 1707 nonlocal done 1708 done = True 1709 try: 1710 next(it) 1711 except StopIteration: 1712 pass 1713 1714 done = False 1715 it = iter(A(*args)) 1716 # Issue 26494: Shouldn't crash 1717 test.assertRaises(StopIteration, next, it) 1718 # The sequence should be deallocated just after the end of iterating 1719 gc_collect() 1720 test.assertTrue(done) 1721 1722 1723def missing_compiler_executable(cmd_names=[]): 1724 """Check if the compiler components used to build the interpreter exist. 1725 1726 Check for the existence of the compiler executables whose names are listed 1727 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 1728 and return the first missing executable or None when none is found 1729 missing. 1730 1731 """ 1732 # TODO (PEP 632): alternate check without using distutils 1733 from distutils import ccompiler, sysconfig, spawn, errors 1734 compiler = ccompiler.new_compiler() 1735 sysconfig.customize_compiler(compiler) 1736 if compiler.compiler_type == "msvc": 1737 # MSVC has no executables, so check whether initialization succeeds 1738 try: 1739 compiler.initialize() 1740 except errors.DistutilsPlatformError: 1741 return "msvc" 1742 for name in compiler.executables: 1743 if cmd_names and name not in cmd_names: 1744 continue 1745 cmd = getattr(compiler, name) 1746 if cmd_names: 1747 assert cmd is not None, \ 1748 "the '%s' executable is not configured" % name 1749 elif not cmd: 1750 continue 1751 if spawn.find_executable(cmd[0]) is None: 1752 return cmd[0] 1753 1754 1755_is_android_emulator = None 1756def setswitchinterval(interval): 1757 # Setting a very low gil interval on the Android emulator causes python 1758 # to hang (issue #26939). 1759 minimum_interval = 1e-5 1760 if is_android and interval < minimum_interval: 1761 global _is_android_emulator 1762 if _is_android_emulator is None: 1763 import subprocess 1764 _is_android_emulator = (subprocess.check_output( 1765 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 1766 if _is_android_emulator: 1767 interval = minimum_interval 1768 return sys.setswitchinterval(interval) 1769 1770 1771@contextlib.contextmanager 1772def disable_faulthandler(): 1773 import faulthandler 1774 1775 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 1776 # sys.stderr with a StringIO which has no file descriptor when a test 1777 # is run with -W/--verbose3. 1778 fd = sys.__stderr__.fileno() 1779 1780 is_enabled = faulthandler.is_enabled() 1781 try: 1782 faulthandler.disable() 1783 yield 1784 finally: 1785 if is_enabled: 1786 faulthandler.enable(file=fd, all_threads=True) 1787 1788 1789class SaveSignals: 1790 """ 1791 Save and restore signal handlers. 1792 1793 This class is only able to save/restore signal handlers registered 1794 by the Python signal module: see bpo-13285 for "external" signal 1795 handlers. 1796 """ 1797 1798 def __init__(self): 1799 import signal 1800 self.signal = signal 1801 self.signals = signal.valid_signals() 1802 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 1803 for signame in ('SIGKILL', 'SIGSTOP'): 1804 try: 1805 signum = getattr(signal, signame) 1806 except AttributeError: 1807 continue 1808 self.signals.remove(signum) 1809 self.handlers = {} 1810 1811 def save(self): 1812 for signum in self.signals: 1813 handler = self.signal.getsignal(signum) 1814 if handler is None: 1815 # getsignal() returns None if a signal handler was not 1816 # registered by the Python signal module, 1817 # and the handler is not SIG_DFL nor SIG_IGN. 1818 # 1819 # Ignore the signal: we cannot restore the handler. 1820 continue 1821 self.handlers[signum] = handler 1822 1823 def restore(self): 1824 for signum, handler in self.handlers.items(): 1825 self.signal.signal(signum, handler) 1826 1827 1828def with_pymalloc(): 1829 import _testcapi 1830 return _testcapi.WITH_PYMALLOC 1831 1832 1833class _ALWAYS_EQ: 1834 """ 1835 Object that is equal to anything. 1836 """ 1837 def __eq__(self, other): 1838 return True 1839 def __ne__(self, other): 1840 return False 1841 1842ALWAYS_EQ = _ALWAYS_EQ() 1843 1844class _NEVER_EQ: 1845 """ 1846 Object that is not equal to anything. 1847 """ 1848 def __eq__(self, other): 1849 return False 1850 def __ne__(self, other): 1851 return True 1852 def __hash__(self): 1853 return 1 1854 1855NEVER_EQ = _NEVER_EQ() 1856 1857@functools.total_ordering 1858class _LARGEST: 1859 """ 1860 Object that is greater than anything (except itself). 1861 """ 1862 def __eq__(self, other): 1863 return isinstance(other, _LARGEST) 1864 def __lt__(self, other): 1865 return False 1866 1867LARGEST = _LARGEST() 1868 1869@functools.total_ordering 1870class _SMALLEST: 1871 """ 1872 Object that is less than anything (except itself). 1873 """ 1874 def __eq__(self, other): 1875 return isinstance(other, _SMALLEST) 1876 def __gt__(self, other): 1877 return False 1878 1879SMALLEST = _SMALLEST() 1880 1881def maybe_get_event_loop_policy(): 1882 """Return the global event loop policy if one is set, else return None.""" 1883 import asyncio.events 1884 return asyncio.events._event_loop_policy 1885 1886# Helpers for testing hashing. 1887NHASHBITS = sys.hash_info.width # number of bits in hash() result 1888assert NHASHBITS in (32, 64) 1889 1890# Return mean and sdev of number of collisions when tossing nballs balls 1891# uniformly at random into nbins bins. By definition, the number of 1892# collisions is the number of balls minus the number of occupied bins at 1893# the end. 1894def collision_stats(nbins, nballs): 1895 n, k = nbins, nballs 1896 # prob a bin empty after k trials = (1 - 1/n)**k 1897 # mean # empty is then n * (1 - 1/n)**k 1898 # so mean # occupied is n - n * (1 - 1/n)**k 1899 # so collisions = k - (n - n*(1 - 1/n)**k) 1900 # 1901 # For the variance: 1902 # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = 1903 # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) 1904 # 1905 # Massive cancellation occurs, and, e.g., for a 64-bit hash code 1906 # 1-1/2**64 rounds uselessly to 1.0. Rather than make heroic (and 1907 # error-prone) efforts to rework the naive formulas to avoid those, 1908 # we use the `decimal` module to get plenty of extra precision. 1909 # 1910 # Note: the exact values are straightforward to compute with 1911 # rationals, but in context that's unbearably slow, requiring 1912 # multi-million bit arithmetic. 1913 import decimal 1914 with decimal.localcontext() as ctx: 1915 bits = n.bit_length() * 2 # bits in n**2 1916 # At least that many bits will likely cancel out. 1917 # Use that many decimal digits instead. 1918 ctx.prec = max(bits, 30) 1919 dn = decimal.Decimal(n) 1920 p1empty = ((dn - 1) / dn) ** k 1921 meanempty = n * p1empty 1922 occupied = n - meanempty 1923 collisions = k - occupied 1924 var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) 1925 return float(collisions), float(var.sqrt()) 1926 1927 1928class catch_unraisable_exception: 1929 """ 1930 Context manager catching unraisable exception using sys.unraisablehook. 1931 1932 Storing the exception value (cm.unraisable.exc_value) creates a reference 1933 cycle. The reference cycle is broken explicitly when the context manager 1934 exits. 1935 1936 Storing the object (cm.unraisable.object) can resurrect it if it is set to 1937 an object which is being finalized. Exiting the context manager clears the 1938 stored object. 1939 1940 Usage: 1941 1942 with support.catch_unraisable_exception() as cm: 1943 # code creating an "unraisable exception" 1944 ... 1945 1946 # check the unraisable exception: use cm.unraisable 1947 ... 1948 1949 # cm.unraisable attribute no longer exists at this point 1950 # (to break a reference cycle) 1951 """ 1952 1953 def __init__(self): 1954 self.unraisable = None 1955 self._old_hook = None 1956 1957 def _hook(self, unraisable): 1958 # Storing unraisable.object can resurrect an object which is being 1959 # finalized. Storing unraisable.exc_value creates a reference cycle. 1960 self.unraisable = unraisable 1961 1962 def __enter__(self): 1963 self._old_hook = sys.unraisablehook 1964 sys.unraisablehook = self._hook 1965 return self 1966 1967 def __exit__(self, *exc_info): 1968 sys.unraisablehook = self._old_hook 1969 del self.unraisable 1970 1971 1972def wait_process(pid, *, exitcode, timeout=None): 1973 """ 1974 Wait until process pid completes and check that the process exit code is 1975 exitcode. 1976 1977 Raise an AssertionError if the process exit code is not equal to exitcode. 1978 1979 If the process runs longer than timeout seconds (SHORT_TIMEOUT by default), 1980 kill the process (if signal.SIGKILL is available) and raise an 1981 AssertionError. The timeout feature is not available on Windows. 1982 """ 1983 if os.name != "nt": 1984 import signal 1985 1986 if timeout is None: 1987 timeout = SHORT_TIMEOUT 1988 t0 = time.monotonic() 1989 sleep = 0.001 1990 max_sleep = 0.1 1991 while True: 1992 pid2, status = os.waitpid(pid, os.WNOHANG) 1993 if pid2 != 0: 1994 break 1995 # process is still running 1996 1997 dt = time.monotonic() - t0 1998 if dt > SHORT_TIMEOUT: 1999 try: 2000 os.kill(pid, signal.SIGKILL) 2001 os.waitpid(pid, 0) 2002 except OSError: 2003 # Ignore errors like ChildProcessError or PermissionError 2004 pass 2005 2006 raise AssertionError(f"process {pid} is still running " 2007 f"after {dt:.1f} seconds") 2008 2009 sleep = min(sleep * 2, max_sleep) 2010 time.sleep(sleep) 2011 else: 2012 # Windows implementation 2013 pid2, status = os.waitpid(pid, 0) 2014 2015 exitcode2 = os.waitstatus_to_exitcode(status) 2016 if exitcode2 != exitcode: 2017 raise AssertionError(f"process {pid} exited with code {exitcode2}, " 2018 f"but exit code {exitcode} is expected") 2019 2020 # sanity check: it should not fail in practice 2021 if pid2 != pid: 2022 raise AssertionError(f"pid {pid2} != pid {pid}") 2023 2024def skip_if_broken_multiprocessing_synchronize(): 2025 """ 2026 Skip tests if the multiprocessing.synchronize module is missing, if there 2027 is no available semaphore implementation, or if creating a lock raises an 2028 OSError (on Linux only). 2029 """ 2030 from .import_helper import import_module 2031 2032 # Skip tests if the _multiprocessing extension is missing. 2033 import_module('_multiprocessing') 2034 2035 # Skip tests if there is no available semaphore implementation: 2036 # multiprocessing.synchronize requires _multiprocessing.SemLock. 2037 synchronize = import_module('multiprocessing.synchronize') 2038 2039 if sys.platform == "linux": 2040 try: 2041 # bpo-38377: On Linux, creating a semaphore fails with OSError 2042 # if the current user does not have the permission to create 2043 # a file in /dev/shm/ directory. 2044 synchronize.Lock(ctx=None) 2045 except OSError as exc: 2046 raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}") 2047 2048 2049@contextlib.contextmanager 2050def infinite_recursion(max_depth=75): 2051 original_depth = sys.getrecursionlimit() 2052 try: 2053 sys.setrecursionlimit(max_depth) 2054 yield 2055 finally: 2056 sys.setrecursionlimit(original_depth) 2057 2058 2059def check_disallow_instantiation(testcase, tp, *args, **kwds): 2060 """ 2061 Check that given type cannot be instantiated using *args and **kwds. 2062 2063 See bpo-43916: Add Py_TPFLAGS_DISALLOW_INSTANTIATION type flag. 2064 """ 2065 mod = tp.__module__ 2066 name = tp.__name__ 2067 if mod != 'builtins': 2068 qualname = f"{mod}.{name}" 2069 else: 2070 qualname = f"{name}" 2071 msg = f"cannot create '{re.escape(qualname)}' instances" 2072 testcase.assertRaisesRegex(TypeError, msg, tp, *args, **kwds) 2073 2074 2075def ignore_deprecations_from(module: str, *, like: str) -> object: 2076 token = object() 2077 warnings.filterwarnings( 2078 "ignore", 2079 category=DeprecationWarning, 2080 module=module, 2081 message=like + fr"(?#support{id(token)})", 2082 ) 2083 return token 2084 2085 2086def clear_ignored_deprecations(*tokens: object) -> None: 2087 if not tokens: 2088 raise ValueError("Provide token or tokens returned by ignore_deprecations_from") 2089 2090 new_filters = [] 2091 endswith = tuple(rf"(?#support{id(token)})" for token in tokens) 2092 for action, message, category, module, lineno in warnings.filters: 2093 if action == "ignore" and category is DeprecationWarning: 2094 if isinstance(message, re.Pattern): 2095 msg = message.pattern 2096 else: 2097 msg = message or "" 2098 if msg.endswith(endswith): 2099 continue 2100 new_filters.append((action, message, category, module, lineno)) 2101 if warnings.filters != new_filters: 2102 warnings.filters[:] = new_filters 2103 warnings._filters_mutated() 2104