"""Supporting definitions for the Python regression tests."""

# Refuse to load under any module name other than 'test.support'.
if __name__ != 'test.support':
    raise ImportError('support must be imported from the test package')

import collections.abc
import contextlib
import errno
import fnmatch
import functools
import glob
import importlib
import importlib.util
import os
import platform
import re
import stat
import struct
import subprocess
import sys
import sysconfig
import _thread
import threading
import time
import types
import unittest
import warnings

from .testresult import get_test_runner

# Names exported by "from test.support import *", grouped by topic.
__all__ = [
    # globals
    "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast",
    # exceptions
    "Error", "TestFailed", "TestDidNotRun", "ResourceDenied",
    # imports
    "import_module", "import_fresh_module", "CleanImport",
    # modules
    "unload", "forget",
    # io
    "record_original_stdout", "get_original_stdout", "captured_stdout",
    "captured_stdin", "captured_stderr",
    # filesystem
    "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile",
    "create_empty_file", "can_symlink", "fs_is_case_insensitive",
    # unittest
    "is_resource_enabled", "requires", "requires_freebsd_version",
    "requires_linux_version", "requires_mac_ver",
    "check_syntax_error", "check_syntax_warning",
    "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset",
    "BasicTestRunner", "run_unittest", "run_doctest",
    "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma",
    "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute",
    "requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
    "anticipate_failure", "load_package_tests", "detect_api_mismatch",
    "check__all__", "skip_if_buggy_ucrt_strfptime",
    "ignore_warnings",
    # sys
    "is_jython", "is_android", "check_impl_detail", "unix_shell",
    "setswitchinterval",
    # network
    "open_urlresource",
    # processes
    'temp_umask', "reap_children",
    # threads
    "threading_setup", "threading_cleanup", "reap_threads", "start_threads",
    # miscellaneous
    "check_warnings", "check_no_resource_warning", "check_no_warnings",
    "EnvironmentVarGuard",
    "run_with_locale", "swap_item",
    "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict",
    "run_with_tz", "PGO", "missing_compiler_executable", "fd_count",
    "ALWAYS_EQ", "NEVER_EQ", "LARGEST", "SMALLEST",
    "LOOPBACK_TIMEOUT", "INTERNET_TIMEOUT", "SHORT_TIMEOUT", "LONG_TIMEOUT",
    ]


# Timeout in seconds for tests using a network server listening on the network
# local loopback interface like 127.0.0.1.
#
# The timeout is long enough to prevent test failure: it takes into account
# that the client and the server can run in different threads or even different
# processes.
#
# The timeout should be long enough for connect(), recv() and send() methods
# of socket.socket.
LOOPBACK_TIMEOUT = 5.0
if sys.platform == 'win32' and platform.machine() == 'ARM':
    # bpo-37553: test_socket.SendfileUsingSendTest is taking longer than 2
    # seconds on Windows ARM32 buildbot
    LOOPBACK_TIMEOUT = 10

# Timeout in seconds for network requests going to the Internet. The timeout is
# short enough to prevent a test from waiting too long if the Internet request
# is blocked for whatever reason.
#
# Usually, a timeout using INTERNET_TIMEOUT should not mark a test as failed,
# but skip the test instead: see transient_internet().
INTERNET_TIMEOUT = 60.0

# Timeout in seconds to mark a test as failed if the test takes "too long".
#
# The timeout value depends on the regrtest --timeout command line option.
#
# If a test using SHORT_TIMEOUT starts to fail randomly on slow buildbots, use
# LONG_TIMEOUT instead.
SHORT_TIMEOUT = 30.0

# Timeout in seconds to detect when a test hangs.
#
# It is long enough to reduce the risk of test failure on the slowest Python
# buildbots.
It should not be used to mark a test as failed if the test takes 113# "too long". The timeout value depends on the regrtest --timeout command line 114# option. 115LONG_TIMEOUT = 5 * 60.0 116 117 118class Error(Exception): 119 """Base class for regression test exceptions.""" 120 121class TestFailed(Error): 122 """Test failed.""" 123 124class TestDidNotRun(Error): 125 """Test did not run any subtests.""" 126 127class ResourceDenied(unittest.SkipTest): 128 """Test skipped because it requested a disallowed resource. 129 130 This is raised when a test calls requires() for a resource that 131 has not be enabled. It is used to distinguish between expected 132 and unexpected skips. 133 """ 134 135@contextlib.contextmanager 136def _ignore_deprecated_imports(ignore=True): 137 """Context manager to suppress package and module deprecation 138 warnings when importing them. 139 140 If ignore is False, this context manager has no effect. 141 """ 142 if ignore: 143 with warnings.catch_warnings(): 144 warnings.filterwarnings("ignore", ".+ (module|package)", 145 DeprecationWarning) 146 yield 147 else: 148 yield 149 150 151def ignore_warnings(*, category): 152 """Decorator to suppress deprecation warnings. 153 154 Use of context managers to hide warnings make diffs 155 more noisy and tools like 'git blame' less useful. 156 """ 157 def decorator(test): 158 @functools.wraps(test) 159 def wrapper(self, *args, **kwargs): 160 with warnings.catch_warnings(): 161 warnings.simplefilter('ignore', category=category) 162 return test(self, *args, **kwargs) 163 return wrapper 164 return decorator 165 166 167def import_module(name, deprecated=False, *, required_on=()): 168 """Import and return the module to be tested, raising SkipTest if 169 it is not available. 170 171 If deprecated is True, any module or package deprecation messages 172 will be suppressed. 
If a module is required on a platform but optional for 173 others, set required_on to an iterable of platform prefixes which will be 174 compared against sys.platform. 175 """ 176 with _ignore_deprecated_imports(deprecated): 177 try: 178 return importlib.import_module(name) 179 except ImportError as msg: 180 if sys.platform.startswith(tuple(required_on)): 181 raise 182 raise unittest.SkipTest(str(msg)) 183 184 185def _save_and_remove_module(name, orig_modules): 186 """Helper function to save and remove a module from sys.modules 187 188 Raise ImportError if the module can't be imported. 189 """ 190 # try to import the module and raise an error if it can't be imported 191 if name not in sys.modules: 192 __import__(name) 193 del sys.modules[name] 194 for modname in list(sys.modules): 195 if modname == name or modname.startswith(name + '.'): 196 orig_modules[modname] = sys.modules[modname] 197 del sys.modules[modname] 198 199def _save_and_block_module(name, orig_modules): 200 """Helper function to save and block a module in sys.modules 201 202 Return True if the module was in sys.modules, False otherwise. 203 """ 204 saved = True 205 try: 206 orig_modules[name] = sys.modules[name] 207 except KeyError: 208 saved = False 209 sys.modules[name] = None 210 return saved 211 212 213def anticipate_failure(condition): 214 """Decorator to mark a test that is known to be broken in some cases 215 216 Any use of this decorator should have a comment identifying the 217 associated tracker issue. 218 """ 219 if condition: 220 return unittest.expectedFailure 221 return lambda f: f 222 223def load_package_tests(pkg_dir, loader, standard_tests, pattern): 224 """Generic load_tests implementation for simple test packages. 
225 226 Most packages can implement load_tests using this function as follows: 227 228 def load_tests(*args): 229 return load_package_tests(os.path.dirname(__file__), *args) 230 """ 231 if pattern is None: 232 pattern = "test*" 233 top_dir = os.path.dirname( # Lib 234 os.path.dirname( # test 235 os.path.dirname(__file__))) # support 236 package_tests = loader.discover(start_dir=pkg_dir, 237 top_level_dir=top_dir, 238 pattern=pattern) 239 standard_tests.addTests(package_tests) 240 return standard_tests 241 242 243def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 244 """Import and return a module, deliberately bypassing sys.modules. 245 246 This function imports and returns a fresh copy of the named Python module 247 by removing the named module from sys.modules before doing the import. 248 Note that unlike reload, the original module is not affected by 249 this operation. 250 251 *fresh* is an iterable of additional module names that are also removed 252 from the sys.modules cache before doing the import. 253 254 *blocked* is an iterable of module names that are replaced with None 255 in the module cache during the import to ensure that attempts to import 256 them raise ImportError. 257 258 The named module and any modules named in the *fresh* and *blocked* 259 parameters are saved before starting the import and then reinserted into 260 sys.modules when the fresh import is complete. 261 262 Module and package deprecation messages are suppressed during this import 263 if *deprecated* is True. 264 265 This function will raise ImportError if the named module cannot be 266 imported. 
267 """ 268 # NOTE: test_heapq, test_json and test_warnings include extra sanity checks 269 # to make sure that this utility function is working as expected 270 with _ignore_deprecated_imports(deprecated): 271 # Keep track of modules saved for later restoration as well 272 # as those which just need a blocking entry removed 273 orig_modules = {} 274 names_to_remove = [] 275 _save_and_remove_module(name, orig_modules) 276 try: 277 for fresh_name in fresh: 278 _save_and_remove_module(fresh_name, orig_modules) 279 for blocked_name in blocked: 280 if not _save_and_block_module(blocked_name, orig_modules): 281 names_to_remove.append(blocked_name) 282 fresh_module = importlib.import_module(name) 283 except ImportError: 284 fresh_module = None 285 finally: 286 for orig_name, module in orig_modules.items(): 287 sys.modules[orig_name] = module 288 for name_to_remove in names_to_remove: 289 del sys.modules[name_to_remove] 290 return fresh_module 291 292 293def get_attribute(obj, name): 294 """Get an attribute, raising SkipTest if AttributeError is raised.""" 295 try: 296 attribute = getattr(obj, name) 297 except AttributeError: 298 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 299 else: 300 return attribute 301 302verbose = 1 # Flag set to 0 by regrtest.py 303use_resources = None # Flag set to [] by regrtest.py 304max_memuse = 0 # Disable bigmem tests (they will still be run with 305 # small sizes, to make sure they work.) 306real_max_memuse = 0 307junit_xml_list = None # list of testsuite XML elements 308failfast = False 309 310# _original_stdout is meant to hold stdout at the time regrtest began. 311# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 312# The point is to have some flavor of stdout the user can actually see. 
313_original_stdout = None 314def record_original_stdout(stdout): 315 global _original_stdout 316 _original_stdout = stdout 317 318def get_original_stdout(): 319 return _original_stdout or sys.stdout 320 321def unload(name): 322 try: 323 del sys.modules[name] 324 except KeyError: 325 pass 326 327def _force_run(path, func, *args): 328 try: 329 return func(*args) 330 except OSError as err: 331 if verbose >= 2: 332 print('%s: %s' % (err.__class__.__name__, err)) 333 print('re-run %s%r' % (func.__name__, args)) 334 os.chmod(path, stat.S_IRWXU) 335 return func(*args) 336 337if sys.platform.startswith("win"): 338 def _waitfor(func, pathname, waitall=False): 339 # Perform the operation 340 func(pathname) 341 # Now setup the wait loop 342 if waitall: 343 dirname = pathname 344 else: 345 dirname, name = os.path.split(pathname) 346 dirname = dirname or '.' 347 # Check for `pathname` to be removed from the filesystem. 348 # The exponential backoff of the timeout amounts to a total 349 # of ~1 second after which the deletion is probably an error 350 # anyway. 351 # Testing on an i7@4.3GHz shows that usually only 1 iteration is 352 # required when contention occurs. 353 timeout = 0.001 354 while timeout < 1.0: 355 # Note we are only testing for the existence of the file(s) in 356 # the contents of the directory regardless of any security or 357 # access rights. If we have made it this far, we have sufficient 358 # permissions to do that much using Python's equivalent of the 359 # Windows API FindFirstFile. 360 # Other Windows APIs can fail or give incorrect results when 361 # dealing with files that are pending deletion. 
362 L = os.listdir(dirname) 363 if not (L if waitall else name in L): 364 return 365 # Increase the timeout and try again 366 time.sleep(timeout) 367 timeout *= 2 368 warnings.warn('tests may fail, delete still pending for ' + pathname, 369 RuntimeWarning, stacklevel=4) 370 371 def _unlink(filename): 372 _waitfor(os.unlink, filename) 373 374 def _rmdir(dirname): 375 _waitfor(os.rmdir, dirname) 376 377 def _rmtree(path): 378 def _rmtree_inner(path): 379 for name in _force_run(path, os.listdir, path): 380 fullname = os.path.join(path, name) 381 try: 382 mode = os.lstat(fullname).st_mode 383 except OSError as exc: 384 print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), 385 file=sys.__stderr__) 386 mode = 0 387 if stat.S_ISDIR(mode): 388 _waitfor(_rmtree_inner, fullname, waitall=True) 389 _force_run(fullname, os.rmdir, fullname) 390 else: 391 _force_run(fullname, os.unlink, fullname) 392 _waitfor(_rmtree_inner, path, waitall=True) 393 _waitfor(lambda p: _force_run(p, os.rmdir, p), path) 394 395 def _longpath(path): 396 try: 397 import ctypes 398 except ImportError: 399 # No ctypes means we can't expands paths. 
400 pass 401 else: 402 buffer = ctypes.create_unicode_buffer(len(path) * 2) 403 length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, 404 len(buffer)) 405 if length: 406 return buffer[:length] 407 return path 408else: 409 _unlink = os.unlink 410 _rmdir = os.rmdir 411 412 def _rmtree(path): 413 import shutil 414 try: 415 shutil.rmtree(path) 416 return 417 except OSError: 418 pass 419 420 def _rmtree_inner(path): 421 for name in _force_run(path, os.listdir, path): 422 fullname = os.path.join(path, name) 423 try: 424 mode = os.lstat(fullname).st_mode 425 except OSError: 426 mode = 0 427 if stat.S_ISDIR(mode): 428 _rmtree_inner(fullname) 429 _force_run(path, os.rmdir, fullname) 430 else: 431 _force_run(path, os.unlink, fullname) 432 _rmtree_inner(path) 433 os.rmdir(path) 434 435 def _longpath(path): 436 return path 437 438def unlink(filename): 439 try: 440 _unlink(filename) 441 except (FileNotFoundError, NotADirectoryError): 442 pass 443 444def rmdir(dirname): 445 try: 446 _rmdir(dirname) 447 except FileNotFoundError: 448 pass 449 450def rmtree(path): 451 try: 452 _rmtree(path) 453 except FileNotFoundError: 454 pass 455 456def make_legacy_pyc(source): 457 """Move a PEP 3147/488 pyc file to its legacy pyc location. 458 459 :param source: The file system path to the source file. The source file 460 does not need to exist, however the PEP 3147/488 pyc file must exist. 461 :return: The file system path to the legacy pyc file. 462 """ 463 pyc_file = importlib.util.cache_from_source(source) 464 up_one = os.path.dirname(os.path.abspath(source)) 465 legacy_pyc = os.path.join(up_one, source + 'c') 466 os.rename(pyc_file, legacy_pyc) 467 return legacy_pyc 468 469def forget(modname): 470 """'Forget' a module was ever imported. 471 472 This removes the module from sys.modules and deletes any PEP 3147/488 or 473 legacy .pyc files. 
474 """ 475 unload(modname) 476 for dirname in sys.path: 477 source = os.path.join(dirname, modname + '.py') 478 # It doesn't matter if they exist or not, unlink all possible 479 # combinations of PEP 3147/488 and legacy pyc files. 480 unlink(source + 'c') 481 for opt in ('', 1, 2): 482 unlink(importlib.util.cache_from_source(source, optimization=opt)) 483 484# Check whether a gui is actually available 485def _is_gui_available(): 486 if hasattr(_is_gui_available, 'result'): 487 return _is_gui_available.result 488 reason = None 489 if sys.platform.startswith('win') and platform.win32_is_iot(): 490 reason = "gui is not available on Windows IoT Core" 491 elif sys.platform.startswith('win'): 492 # if Python is running as a service (such as the buildbot service), 493 # gui interaction may be disallowed 494 import ctypes 495 import ctypes.wintypes 496 UOI_FLAGS = 1 497 WSF_VISIBLE = 0x0001 498 class USEROBJECTFLAGS(ctypes.Structure): 499 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 500 ("fReserved", ctypes.wintypes.BOOL), 501 ("dwFlags", ctypes.wintypes.DWORD)] 502 dll = ctypes.windll.user32 503 h = dll.GetProcessWindowStation() 504 if not h: 505 raise ctypes.WinError() 506 uof = USEROBJECTFLAGS() 507 needed = ctypes.wintypes.DWORD() 508 res = dll.GetUserObjectInformationW(h, 509 UOI_FLAGS, 510 ctypes.byref(uof), 511 ctypes.sizeof(uof), 512 ctypes.byref(needed)) 513 if not res: 514 raise ctypes.WinError() 515 if not bool(uof.dwFlags & WSF_VISIBLE): 516 reason = "gui not available (WSF_VISIBLE flag not set)" 517 elif sys.platform == 'darwin': 518 # The Aqua Tk implementations on OS X can abort the process if 519 # being called in an environment where a window server connection 520 # cannot be made, for instance when invoked by a buildbot or ssh 521 # process not running under the same user id as the current console 522 # user. To avoid that, raise an exception if the window manager 523 # connection is not available. 
524 from ctypes import cdll, c_int, pointer, Structure 525 from ctypes.util import find_library 526 527 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 528 529 if app_services.CGMainDisplayID() == 0: 530 reason = "gui tests cannot run without OS X window manager" 531 else: 532 class ProcessSerialNumber(Structure): 533 _fields_ = [("highLongOfPSN", c_int), 534 ("lowLongOfPSN", c_int)] 535 psn = ProcessSerialNumber() 536 psn_p = pointer(psn) 537 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 538 (app_services.SetFrontProcess(psn_p) < 0) ): 539 reason = "cannot run without OS X gui process" 540 541 # check on every platform whether tkinter can actually do anything 542 if not reason: 543 try: 544 from tkinter import Tk 545 root = Tk() 546 root.withdraw() 547 root.update() 548 root.destroy() 549 except Exception as e: 550 err_string = str(e) 551 if len(err_string) > 50: 552 err_string = err_string[:50] + ' [...]' 553 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 554 err_string) 555 556 _is_gui_available.reason = reason 557 _is_gui_available.result = not reason 558 559 return _is_gui_available.result 560 561def is_resource_enabled(resource): 562 """Test whether a resource is enabled. 563 564 Known resources are set by regrtest.py. If not running under regrtest.py, 565 all resources are assumed enabled unless use_resources has been set. 566 """ 567 return use_resources is None or resource in use_resources 568 569def requires(resource, msg=None): 570 """Raise ResourceDenied if the specified resource is not available.""" 571 if not is_resource_enabled(resource): 572 if msg is None: 573 msg = "Use of the %r resource not enabled" % resource 574 raise ResourceDenied(msg) 575 if resource == 'gui' and not _is_gui_available(): 576 raise ResourceDenied(_is_gui_available.reason) 577 578def _requires_unix_version(sysname, min_version): 579 """Decorator raising SkipTest if the OS is `sysname` and the version is less 580 than `min_version`. 
581 582 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 583 the FreeBSD version is less than 7.2. 584 """ 585 import platform 586 min_version_txt = '.'.join(map(str, min_version)) 587 version_txt = platform.release().split('-', 1)[0] 588 if platform.system() == sysname: 589 try: 590 version = tuple(map(int, version_txt.split('.'))) 591 except ValueError: 592 skip = False 593 else: 594 skip = version < min_version 595 else: 596 skip = False 597 598 return unittest.skipIf( 599 skip, 600 f"{sysname} version {min_version_txt} or higher required, not " 601 f"{version_txt}" 602 ) 603 604 605def requires_freebsd_version(*min_version): 606 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 607 less than `min_version`. 608 609 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 610 version is less than 7.2. 611 """ 612 return _requires_unix_version('FreeBSD', min_version) 613 614def requires_linux_version(*min_version): 615 """Decorator raising SkipTest if the OS is Linux and the Linux version is 616 less than `min_version`. 617 618 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 619 version is less than 2.6.32. 620 """ 621 return _requires_unix_version('Linux', min_version) 622 623def requires_mac_ver(*min_version): 624 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 625 version if less than min_version. 626 627 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 628 is lesser than 10.5. 
629 """ 630 def decorator(func): 631 @functools.wraps(func) 632 def wrapper(*args, **kw): 633 if sys.platform == 'darwin': 634 version_txt = platform.mac_ver()[0] 635 try: 636 version = tuple(map(int, version_txt.split('.'))) 637 except ValueError: 638 pass 639 else: 640 if version < min_version: 641 min_version_txt = '.'.join(map(str, min_version)) 642 raise unittest.SkipTest( 643 "Mac OS X %s or higher required, not %s" 644 % (min_version_txt, version_txt)) 645 return func(*args, **kw) 646 wrapper.min_version = min_version 647 return wrapper 648 return decorator 649 650 651def system_must_validate_cert(f): 652 """Skip the test on TLS certificate validation failures.""" 653 @functools.wraps(f) 654 def dec(*args, **kwargs): 655 try: 656 f(*args, **kwargs) 657 except OSError as e: 658 if "CERTIFICATE_VERIFY_FAILED" in str(e): 659 raise unittest.SkipTest("system does not contain " 660 "necessary certificates") 661 raise 662 return dec 663 664# A constant likely larger than the underlying OS pipe buffer size, to 665# make writes blocking. 666# Windows limit seems to be around 512 B, and many Unix kernels have a 667# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 668# (see issue #17835 for a discussion of this number). 669PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 670 671# A constant likely larger than the underlying OS socket buffer size, to make 672# writes blocking. 673# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 674# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 675# for a discussion of this number). 
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

# Skip decorator for platforms whose C double is not an IEEE 754 format.
requires_IEEE_754 = unittest.skipUnless(
    float.__getformat__("double").startswith("IEEE"),
    "test requires IEEE 754 doubles",
)

def requires_zlib(reason='requires zlib'):
    """Return a decorator skipping the test when zlib cannot be imported."""
    try:
        import zlib as module
    except ImportError:
        module = None
    return unittest.skipUnless(module, reason)

def requires_gzip(reason='requires gzip'):
    """Return a decorator skipping the test when gzip cannot be imported."""
    try:
        import gzip as module
    except ImportError:
        module = None
    return unittest.skipUnless(module, reason)

def requires_bz2(reason='requires bz2'):
    """Return a decorator skipping the test when bz2 cannot be imported."""
    try:
        import bz2 as module
    except ImportError:
        module = None
    return unittest.skipUnless(module, reason)

def requires_lzma(reason='requires lzma'):
    """Return a decorator skipping the test when lzma cannot be imported."""
    try:
        import lzma as module
    except ImportError:
        module = None
    return unittest.skipUnless(module, reason)

is_jython = sys.platform.startswith('java')

is_android = hasattr(sys, 'getandroidapilevel')

# Path of a POSIX shell, or None on Windows.
if sys.platform == 'win32':
    unix_shell = None
elif is_android:
    unix_shell = '/system/bin/sh'
else:
    unix_shell = '/bin/sh'

# Filename used for testing (Jython disallows @ in module names).
TESTFN_ASCII = '$test' if os.name == 'java' else '@test'

# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = f"{TESTFN_ASCII}_{os.getpid()}_tmp"

# Define the URL of a dedicated HTTP server for the network tests.
# The URL must use clear-text HTTP: no redirection to encrypted HTTPS.
TEST_HTTP_URL = "http://www.pythontest.net"

# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
    # Printable, common characters come first so that the filename stays
    # readable.  The encodings named next to each character are only
    # examples of encodings able to encode it (not an exhaustive list).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then more "special" characters: they may be interpreted or displayed
    # differently depending on the exact locale encoding and the font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage, hence the
        # round-trip comparison instead of relying on an exception alone.
        if os.fsdecode(os.fsencode(character)) == character:
            FS_NONASCII = character
            break
    except UnicodeError:
        continue

# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
    # In Mac OS X's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
TESTFN_ENCODING = sys.getfilesystemencoding()

# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = (
            TESTFN_ASCII + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape'))
    # Otherwise the file system encoding (eg. ISO-8859-* encodings) can
    # decode the byte 0xff: TESTFN_UNENCODABLE stays None and some unicode
    # filename tests are skipped.

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence).
# On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or doesn't accept entering
    # such a directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # (The trailing comma matters: without it this literal is concatenated
    # with the next one and neither candidate is tried on its own.)
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # First candidate the filesystem encoding rejects in strict mode.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
    TESTFN_NONASCII = None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII

# Save the initial cwd
SAVEDCWD = os.getcwd()

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task.  If this is True, PGO is also True.
PGO_EXTENDED = False

@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.

    The context manager yields the directory path.  On exit the directory
    is removed only if this call created it.
    """
    import tempfile
    dir_created = False
    if path is None:
        path = tempfile.mkdtemp()
        dir_created = True
        path = os.path.realpath(path)
    else:
        try:
            os.mkdir(path)
            dir_created = True
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
    if dir_created:
        pid = os.getpid()
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if dir_created and pid == os.getpid():
            rmtree(path)

@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, it issues only a warning and keeps the current
        working directory the same.

    The context manager yields os.getcwd() as seen after the change
    attempt and restores the previous working directory on exit.
    """
    saved_dir = os.getcwd()
    try:
        os.chdir(os.path.realpath(path))
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)


@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    The function temporarily changes the current working directory
    after creating a temporary directory in the current directory with
    name *name*.  If *name* is None, the temporary directory is
    created using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path:
        with change_cwd(temp_path, quiet=quiet) as cwd_dir:
            yield cwd_dir

if hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask."""
        oldmask = os.umask(umask)
        try:
            yield
        finally:
            os.umask(oldmask)

# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)

# TEST_DATA_DIR is used as a target download location for remote resources
TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")

def findfile(filename, subdir=None):
    """Try to find a file on sys.path or in the test directory.  If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path).

    Setting *subdir* indicates a relative path to use to find the file
    rather than looking directly in the path directories.

    An absolute *filename* is returned unchanged without any lookup.
    """
    if os.path.isabs(filename):
        return filename
    if subdir is not None:
        filename = os.path.join(subdir, filename)
    # The test directory is searched before the sys.path entries.
    path = [TEST_HOME_DIR] + sys.path
    for dn in path:
        fn = os.path.join(dn, filename)
        if os.path.exists(fn): return fn
    return filename

def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
    os.close(fd)

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        # The fd is captured before the file is closed, so the returned
        # number refers to a descriptor that is no longer open.
        file.close()
        unlink(TESTFN)


def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None):
    """Assert that compiling *statement* raises a matching SyntaxError.

    The error must match the regex *errtext* and carry a non-None lineno
    and offset; *lineno* and *offset*, when given, must equal the values
    reported by the exception.
    """
    with testcase.assertRaisesRegex(SyntaxError, errtext) as cm:
        compile(statement, '<test string>', 'exec')
    err = cm.exception
    testcase.assertIsNotNone(err.lineno)
    if lineno is not None:
        testcase.assertEqual(err.lineno, lineno)
    testcase.assertIsNotNone(err.offset)
    if offset is not None:
        testcase.assertEqual(err.offset, offset)

def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None):
    # Test also that a warning is emitted only once.
    with warnings.catch_warnings(record=True) as warns:
        warnings.simplefilter('always', SyntaxWarning)
        compile(statement, '<testcase>', 'exec')
    testcase.assertEqual(len(warns), 1, warns)

    warn, = warns
    testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category)
    if errtext:
        testcase.assertRegex(str(warn.message), errtext)
    testcase.assertEqual(warn.filename, '<testcase>')
    testcase.assertIsNotNone(warn.lineno)
    if lineno is not None:
        testcase.assertEqual(warn.lineno, lineno)

    # SyntaxWarning should be converted to SyntaxError when raised,
    # since the latter contains more information and provides better
    # error report.
1051 with warnings.catch_warnings(record=True) as warns: 1052 warnings.simplefilter('error', SyntaxWarning) 1053 check_syntax_error(testcase, statement, errtext, 1054 lineno=lineno, offset=offset) 1055 # No warnings are leaked when a SyntaxError is raised. 1056 testcase.assertEqual(warns, []) 1057 1058 1059def open_urlresource(url, *args, **kw): 1060 import urllib.request, urllib.parse 1061 try: 1062 import gzip 1063 except ImportError: 1064 gzip = None 1065 1066 check = kw.pop('check', None) 1067 1068 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 1069 1070 fn = os.path.join(TEST_DATA_DIR, filename) 1071 1072 def check_valid_file(fn): 1073 f = open(fn, *args, **kw) 1074 if check is None: 1075 return f 1076 elif check(f): 1077 f.seek(0) 1078 return f 1079 f.close() 1080 1081 if os.path.exists(fn): 1082 f = check_valid_file(fn) 1083 if f is not None: 1084 return f 1085 unlink(fn) 1086 1087 # Verify the requirement before downloading the file 1088 requires('urlfetch') 1089 1090 if verbose: 1091 print('\tfetching %s ...' % url, file=get_original_stdout()) 1092 opener = urllib.request.build_opener() 1093 if gzip: 1094 opener.addheaders.append(('Accept-Encoding', 'gzip')) 1095 f = opener.open(url, timeout=INTERNET_TIMEOUT) 1096 if gzip and f.headers.get('Content-Encoding') == 'gzip': 1097 f = gzip.GzipFile(fileobj=f) 1098 try: 1099 with open(fn, "wb") as out: 1100 s = f.read() 1101 while s: 1102 out.write(s) 1103 s = f.read() 1104 finally: 1105 f.close() 1106 1107 f = check_valid_file(fn) 1108 if f is not None: 1109 return f 1110 raise TestFailed('invalid resource %r' % fn) 1111 1112 1113class WarningsRecorder(object): 1114 """Convenience wrapper for the warnings list returned on 1115 entry to the warnings.catch_warnings() context manager. 
1116 """ 1117 def __init__(self, warnings_list): 1118 self._warnings = warnings_list 1119 self._last = 0 1120 1121 def __getattr__(self, attr): 1122 if len(self._warnings) > self._last: 1123 return getattr(self._warnings[-1], attr) 1124 elif attr in warnings.WarningMessage._WARNING_DETAILS: 1125 return None 1126 raise AttributeError("%r has no attribute %r" % (self, attr)) 1127 1128 @property 1129 def warnings(self): 1130 return self._warnings[self._last:] 1131 1132 def reset(self): 1133 self._last = len(self._warnings) 1134 1135 1136def _filterwarnings(filters, quiet=False): 1137 """Catch the warnings, then check if all the expected 1138 warnings have been raised and re-raise unexpected warnings. 1139 If 'quiet' is True, only re-raise the unexpected warnings. 1140 """ 1141 # Clear the warning registry of the calling module 1142 # in order to re-raise the warnings. 1143 frame = sys._getframe(2) 1144 registry = frame.f_globals.get('__warningregistry__') 1145 if registry: 1146 registry.clear() 1147 with warnings.catch_warnings(record=True) as w: 1148 # Set filter "always" to record all warnings. Because 1149 # test_warnings swap the module, we need to look up in 1150 # the sys.modules dictionary. 
1151 sys.modules['warnings'].simplefilter("always") 1152 yield WarningsRecorder(w) 1153 # Filter the recorded warnings 1154 reraise = list(w) 1155 missing = [] 1156 for msg, cat in filters: 1157 seen = False 1158 for w in reraise[:]: 1159 warning = w.message 1160 # Filter out the matching messages 1161 if (re.match(msg, str(warning), re.I) and 1162 issubclass(warning.__class__, cat)): 1163 seen = True 1164 reraise.remove(w) 1165 if not seen and not quiet: 1166 # This filter caught nothing 1167 missing.append((msg, cat.__name__)) 1168 if reraise: 1169 raise AssertionError("unhandled warning %s" % reraise[0]) 1170 if missing: 1171 raise AssertionError("filter (%r, %s) did not catch any warning" % 1172 missing[0]) 1173 1174 1175@contextlib.contextmanager 1176def check_warnings(*filters, **kwargs): 1177 """Context manager to silence warnings. 1178 1179 Accept 2-tuples as positional arguments: 1180 ("message regexp", WarningCategory) 1181 1182 Optional argument: 1183 - if 'quiet' is True, it does not fail if a filter catches nothing 1184 (default True without argument, 1185 default False if some filters are defined) 1186 1187 Without argument, it defaults to: 1188 check_warnings(("", Warning), quiet=True) 1189 """ 1190 quiet = kwargs.get('quiet') 1191 if not filters: 1192 filters = (("", Warning),) 1193 # Preserve backward compatibility 1194 if quiet is None: 1195 quiet = True 1196 return _filterwarnings(filters, quiet) 1197 1198 1199@contextlib.contextmanager 1200def check_no_warnings(testcase, message='', category=Warning, force_gc=False): 1201 """Context manager to check that no warnings are emitted. 1202 1203 This context manager enables a given warning within its scope 1204 and checks that no warnings are emitted even with that warning 1205 enabled. 1206 1207 If force_gc is True, a garbage collection is attempted before checking 1208 for warnings. This may help to catch warnings emitted when objects 1209 are deleted, such as ResourceWarning. 
1210 1211 Other keyword arguments are passed to warnings.filterwarnings(). 1212 """ 1213 with warnings.catch_warnings(record=True) as warns: 1214 warnings.filterwarnings('always', 1215 message=message, 1216 category=category) 1217 yield 1218 if force_gc: 1219 gc_collect() 1220 testcase.assertEqual(warns, []) 1221 1222 1223@contextlib.contextmanager 1224def check_no_resource_warning(testcase): 1225 """Context manager to check that no ResourceWarning is emitted. 1226 1227 Usage: 1228 1229 with check_no_resource_warning(self): 1230 f = open(...) 1231 ... 1232 del f 1233 1234 You must remove the object which may emit ResourceWarning before 1235 the end of the context manager. 1236 """ 1237 with check_no_warnings(testcase, category=ResourceWarning, force_gc=True): 1238 yield 1239 1240 1241class CleanImport(object): 1242 """Context manager to force import to return a new module reference. 1243 1244 This is useful for testing module-level behaviours, such as 1245 the emission of a DeprecationWarning on import. 1246 1247 Use like this: 1248 1249 with CleanImport("foo"): 1250 importlib.import_module("foo") # new reference 1251 """ 1252 1253 def __init__(self, *module_names): 1254 self.original_modules = sys.modules.copy() 1255 for module_name in module_names: 1256 if module_name in sys.modules: 1257 module = sys.modules[module_name] 1258 # It is possible that module_name is just an alias for 1259 # another module (e.g. stub for modules renamed in 3.x). 1260 # In that case, we also need delete the real module to clear 1261 # the import cache. 1262 if module.__name__ != module_name: 1263 del sys.modules[module.__name__] 1264 del sys.modules[module_name] 1265 1266 def __enter__(self): 1267 return self 1268 1269 def __exit__(self, *ignore_exc): 1270 sys.modules.update(self.original_modules) 1271 1272 1273class EnvironmentVarGuard(collections.abc.MutableMapping): 1274 1275 """Class to help protect the environment variable properly. 
Can be used as 1276 a context manager.""" 1277 1278 def __init__(self): 1279 self._environ = os.environ 1280 self._changed = {} 1281 1282 def __getitem__(self, envvar): 1283 return self._environ[envvar] 1284 1285 def __setitem__(self, envvar, value): 1286 # Remember the initial value on the first access 1287 if envvar not in self._changed: 1288 self._changed[envvar] = self._environ.get(envvar) 1289 self._environ[envvar] = value 1290 1291 def __delitem__(self, envvar): 1292 # Remember the initial value on the first access 1293 if envvar not in self._changed: 1294 self._changed[envvar] = self._environ.get(envvar) 1295 if envvar in self._environ: 1296 del self._environ[envvar] 1297 1298 def keys(self): 1299 return self._environ.keys() 1300 1301 def __iter__(self): 1302 return iter(self._environ) 1303 1304 def __len__(self): 1305 return len(self._environ) 1306 1307 def set(self, envvar, value): 1308 self[envvar] = value 1309 1310 def unset(self, envvar): 1311 del self[envvar] 1312 1313 def __enter__(self): 1314 return self 1315 1316 def __exit__(self, *ignore_exc): 1317 for (k, v) in self._changed.items(): 1318 if v is None: 1319 if k in self._environ: 1320 del self._environ[k] 1321 else: 1322 self._environ[k] = v 1323 os.environ = self._environ 1324 1325 1326class DirsOnSysPath(object): 1327 """Context manager to temporarily add directories to sys.path. 1328 1329 This makes a copy of sys.path, appends any directories given 1330 as positional arguments, then reverts sys.path to the copied 1331 settings when the context ends. 1332 1333 Note that *all* sys.path modifications in the body of the 1334 context manager, including replacement of the object, 1335 will be reverted at the end of the block. 
1336 """ 1337 1338 def __init__(self, *paths): 1339 self.original_value = sys.path[:] 1340 self.original_object = sys.path 1341 sys.path.extend(paths) 1342 1343 def __enter__(self): 1344 return self 1345 1346 def __exit__(self, *ignore_exc): 1347 sys.path = self.original_object 1348 sys.path[:] = self.original_value 1349 1350 1351class TransientResource(object): 1352 1353 """Raise ResourceDenied if an exception is raised while the context manager 1354 is in effect that matches the specified exception and attributes.""" 1355 1356 def __init__(self, exc, **kwargs): 1357 self.exc = exc 1358 self.attrs = kwargs 1359 1360 def __enter__(self): 1361 return self 1362 1363 def __exit__(self, type_=None, value=None, traceback=None): 1364 """If type_ is a subclass of self.exc and value has attributes matching 1365 self.attrs, raise ResourceDenied. Otherwise let the exception 1366 propagate (if any).""" 1367 if type_ is not None and issubclass(self.exc, type_): 1368 for attr, attr_value in self.attrs.items(): 1369 if not hasattr(value, attr): 1370 break 1371 if getattr(value, attr) != attr_value: 1372 break 1373 else: 1374 raise ResourceDenied("an optional resource is not available") 1375 1376# Context managers that raise ResourceDenied when various issues 1377# with the Internet connection manifest themselves as exceptions. 
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)


@contextlib.contextmanager
def captured_output(stream_name):
    """Temporarily swap the sys stream named *stream_name* for a StringIO.

    Shared implementation behind captured_stdout/stdin/stderr: the new
    StringIO is yielded and the previous stream is put back on exit.
    """
    import io
    previous_stream = getattr(sys, stream_name)
    replacement = io.StringIO()
    setattr(sys, stream_name, replacement)
    try:
        yield replacement
    finally:
        setattr(sys, stream_name, previous_stream)

def captured_stdout():
    """Return a context manager capturing what is written to sys.stdout.

    Example:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Return a context manager capturing what is written to sys.stderr.

    Example:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Return a context manager replacing sys.stdin with a StringIO.

    Example:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Run the garbage collector three times to reclaim as much as possible.

    Outside CPython, timely deallocation is not guaranteed (and even in
    CPython reference cycles delay it), so __del__ methods and weakref
    callbacks may fire late. On Jython a short sleep follows the first
    pass before the remaining two.
    """
    import gc
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()

@contextlib.contextmanager
def disable_gc():
    """Context manager that disables the cyclic GC for the block.

    The collector is re-enabled on exit only if it was enabled on entry.
    """
    import gc
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        yield
    finally:
        if was_enabled:
            gc.enable()


def python_is_optimized():
    """Find if Python was built with optimizations."""
    compiler_flags = (sysconfig.get_config_var('PY_CFLAGS') or '').split()
    # Only the last -O* flag on the command line is considered.
    opt_flags = [flag for flag in compiler_flags if flag.startswith('-O')]
    last_opt = opt_flags[-1] if opt_flags else ""
    return last_opt not in ('', '-O0', '-Og')


# struct format fragments used by calcobjsize()/calcvobjsize(). They differ
# when the interpreter provides sys.getobjects.
if hasattr(sys, "getobjects"):
    _header = '2P' + 'nP'
    _align = '0P'
else:
    _header = 'nP'
    _align = '0n'
_vheader = _header + 'n'

def calcobjsize(fmt):
    """Return struct.calcsize() of the object header followed by *fmt*."""
    layout = _header + fmt + _align
    return struct.calcsize(layout)

def calcvobjsize(fmt):
    """Return struct.calcsize() of the var-object header followed by *fmt*."""
    layout = _vheader + fmt + _align
    return struct.calcsize(layout)


_TPFLAGS_HAVE_GC = 1<<14
_TPFLAGS_HEAPTYPE = 1<<9

def check_sizeof(test, o, size):
    """Assert via *test* that sys.getsizeof(o) equals *size*.

    SIZEOF_PYGC_HEAD is added to *size* when *o* is a heap type, or an
    instance of a type whose flags include _TPFLAGS_HAVE_GC.
    """
    import _testinternalcapi
    actual = sys.getsizeof(o)
    # add GC header size
    if type(o) == type:
        has_gc_head = o.__flags__ & _TPFLAGS_HEAPTYPE
    else:
        has_gc_head = type(o).__flags__ & _TPFLAGS_HAVE_GC
    if has_gc_head:
        size += _testinternalcapi.SIZEOF_PYGC_HEAD
    msg = 'wrong size for %s: got %d, expected %d' \
            % (type(o), actual, size)
    test.assertEqual(actual, size, msg)

#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
def run_with_locale(catstr, *locales):
    """Decorator running a function with the first usable locale of *locales*.

    *catstr* is the name of a locale category attribute such as 'LC_ALL';
    an unknown name raises AttributeError when the decorated function is
    called. If the current locale cannot be queried, the function runs
    unchanged. The original locale is restored afterwards, even when the
    function raises.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # This locale is unavailable; try the next one.
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Decorator running a function with the TZ environment variable set to *tz*.

    Raise unittest.SkipTest at call time when time.tzset is unavailable.
    The previous TZ value (or its absence) is restored afterwards, even
    when the function raises.
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            orig_tz = os.environ.get('TZ')
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    del os.environ['TZ']
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands.
# Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the memory limit used by the bigmem tests from a string.

    *limit* is a number followed by a K, M, G or T unit (case-insensitive,
    optional trailing 'b'), for example '4G' or '2.5Gb'. On success the
    module globals real_max_memuse (the requested byte count) and
    max_memuse (the same value capped at MAX_Py_ssize_t) are updated.

    Raise ValueError if *limit* cannot be parsed or is too low; in that
    case neither global is modified.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    capped = min(memlimit, MAX_Py_ssize_t)
    # Validate before touching the globals so that a rejected limit does
    # not leave real_max_memuse changed.
    if capped < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    real_max_memuse = memlimit
    max_memuse = capped

class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.
    """

    def __init__(self):
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        """Launch the memory_watchdog.py helper with the statm file as stdin.

        Only emit a RuntimeWarning if the /proc file cannot be opened.
        """
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        with f:
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        """Terminate the helper process, if start() launched one."""
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it. For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument. If 'dry_run' is true, the value passed to the test method
    may be less than the requested value. If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.
    """
    def decorator(f):
        def wrapper(self):
            # Read through the wrapper attributes so that callers may
            # adjust them after decoration.
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # No memory limit configured: dummy run with a small size.
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                    and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
class BasicTestRunner:
    """Minimal runner: run a test into a fresh unittest.TestResult."""

    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    """Identity decorator, returned when a test needs no skipping."""
    return obj

def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is enabled."""
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    """Return a decorator skipping the test when check_impl_detail(**guards)
    is false. *msg* overrides the generated skip message."""
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        return ({'cpython': True}, False)
    is_true = list(guards.values())[0]
    assert list(guards.values()) == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)


def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test."""
    if not hasattr(sys, 'gettrace'):
        return func
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            original_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                sys.settrace(original_trace)
        return wrapper


def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if is not run by CPython.
    After that, any trace function is unset during the test to prevent
    unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))


def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate."""
    newtests = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            # Nested suites are filtered in place and always kept.
            _filter_suite(test, pred)
            newtests.append(test)
        else:
            if pred(test):
                newtests.append(test)
    suite._tests = newtests

def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=(junit_xml_list is not None))

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not result.testsRun and not result.skipped:
        raise TestDidNotRun
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose: err += "; run in verbose mode for details"
        raise TestFailed(err)


# By default, don't filter tests
_match_test_func = None

_accept_test_patterns = None
_ignore_test_patterns = None


def match_test(test):
    # Function used by support.run_unittest() and regrtest --list-cases
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
    #
    # ignore patterns which contain fnmatch patterns: '*', '?', '[...]'
    # or '[!...]'. For example, ignore 'test_access*'.
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(accept_patterns=None, ignore_patterns=None):
    """Install the filter used by match_test().

    A test identifier is selected when it matches *accept_patterns* (or no
    accept pattern is given) and does not match *ignore_patterns*. Passing
    no patterns at all removes the filter, so every test matches again.
    """
    global _match_test_func, _accept_test_patterns, _ignore_test_patterns

    # Both matchers are rebuilt on every call. Reusing the previous state
    # only when a pattern list changed dropped the unchanged half of the
    # filter and made it impossible to clear an installed filter.
    accept_patterns, accept_func = _compile_match_function(accept_patterns)
    ignore_patterns, ignore_func = _compile_match_function(ignore_patterns)

    # Create a copy since patterns can be mutable and so modified later
    _accept_test_patterns = tuple(accept_patterns)
    _ignore_test_patterns = tuple(ignore_patterns)

    if accept_func is None and ignore_func is None:
        _match_test_func = None
        return

    def match_function(test_id):
        accept = True
        ignore = False
        if accept_func:
            accept = accept_func(test_id)
        if ignore_func:
            ignore = ignore_func(test_id)
        return accept and not ignore

    _match_test_func = match_function


def _compile_match_function(patterns):
    """Return (patterns, func) where func tests a test identifier.

    func is None when *patterns* is empty or None.
    """
    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    return patterns, func


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    # Use a TestLoader directly: unittest.makeSuite() and
    # unittest.findTestCases() were deprecated in 3.11 and removed in 3.13.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(loader.loadTestsFromModule(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(loader.loadTestsFromTestCase(cls))
    _filter_suite(suite, match_test)
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.

# Rather than trying to enumerate all the cases where docstrings may be
# disabled, we just check for that directly

def _check_docstrings():
    """Just used to check if docstrings are enabled"""

MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module. Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest. Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        verbosity = None

    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
    if f:
        raise TestFailed("%d of %d doctests failed" % (f, t))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, t))
    return f, t


#=======================================================================
# Support for saving and restoring the imported modules.

def print_warning(msg):
    # bpo-39983: Print into sys.__stderr__ to display the warning even
    # when sys.stderr is captured temporarily by a test
    for line in msg.splitlines():
        print(f"Warning -- {line}", file=sys.__stderr__, flush=True)

def modules_setup():
    # Returns a 1-tuple holding a copy of sys.modules.
    return sys.modules.copy(),

def modules_cleanup(oldmodules):
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    encodings = [(k, v) for k, v in sys.modules.items()
                 if k.startswith('encodings.')]
    sys.modules.clear()
    sys.modules.update(encodings)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
1993 sys.modules.update(oldmodules) 1994 1995#======================================================================= 1996# Threading support to prevent reporting refleaks when running regrtest.py -R 1997 1998# Flag used by saved_test_environment of test.libregrtest.save_env, 1999# to check if a test modified the environment. The flag should be set to False 2000# before running a new test. 2001# 2002# For example, threading_cleanup() sets the flag is the function fails 2003# to cleanup threads. 2004environment_altered = False 2005 2006# NOTE: we use thread._count() rather than threading.enumerate() (or the 2007# moral equivalent thereof) because a threading.Thread object is still alive 2008# until its __bootstrap() method has returned, even after it has been 2009# unregistered from the threading module. 2010# thread._count(), on the other hand, only gets decremented *after* the 2011# __bootstrap() method has returned, which gives us reliable reference counts 2012# at the end of a test run. 2013 2014def threading_setup(): 2015 return _thread._count(), threading._dangling.copy() 2016 2017def threading_cleanup(*original_values): 2018 global environment_altered 2019 2020 _MAX_COUNT = 100 2021 2022 for count in range(_MAX_COUNT): 2023 values = _thread._count(), threading._dangling 2024 if values == original_values: 2025 break 2026 2027 if not count: 2028 # Display a warning at the first iteration 2029 environment_altered = True 2030 dangling_threads = values[1] 2031 print_warning(f"threading_cleanup() failed to cleanup " 2032 f"{values[0] - original_values[0]} threads " 2033 f"(count: {values[0]}, " 2034 f"dangling: {len(dangling_threads)})") 2035 for thread in dangling_threads: 2036 print_warning(f"Dangling thread: {thread!r}") 2037 2038 # Don't hold references to threads 2039 dangling_threads = None 2040 values = None 2041 2042 time.sleep(0.01) 2043 gc_collect() 2044 2045 2046def reap_threads(func): 2047 """Use this function when threads are being used. 
This will 2048 ensure that the threads are cleaned up even when the test fails. 2049 """ 2050 @functools.wraps(func) 2051 def decorator(*args): 2052 key = threading_setup() 2053 try: 2054 return func(*args) 2055 finally: 2056 threading_cleanup(*key) 2057 return decorator 2058 2059 2060@contextlib.contextmanager 2061def wait_threads_exit(timeout=None): 2062 """ 2063 bpo-31234: Context manager to wait until all threads created in the with 2064 statement exit. 2065 2066 Use _thread.count() to check if threads exited. Indirectly, wait until 2067 threads exit the internal t_bootstrap() C function of the _thread module. 2068 2069 threading_setup() and threading_cleanup() are designed to emit a warning 2070 if a test leaves running threads in the background. This context manager 2071 is designed to cleanup threads started by the _thread.start_new_thread() 2072 which doesn't allow to wait for thread exit, whereas thread.Thread has a 2073 join() method. 2074 """ 2075 if timeout is None: 2076 timeout = SHORT_TIMEOUT 2077 old_count = _thread._count() 2078 try: 2079 yield 2080 finally: 2081 start_time = time.monotonic() 2082 deadline = start_time + timeout 2083 while True: 2084 count = _thread._count() 2085 if count <= old_count: 2086 break 2087 if time.monotonic() > deadline: 2088 dt = time.monotonic() - start_time 2089 msg = (f"wait_threads() failed to cleanup {count - old_count} " 2090 f"threads after {dt:.1f} seconds " 2091 f"(count: {count}, old count: {old_count})") 2092 raise AssertionError(msg) 2093 time.sleep(0.010) 2094 gc_collect() 2095 2096 2097def join_thread(thread, timeout=None): 2098 """Join a thread. Raise an AssertionError if the thread is still alive 2099 after timeout seconds. 
2100 """ 2101 if timeout is None: 2102 timeout = SHORT_TIMEOUT 2103 thread.join(timeout) 2104 if thread.is_alive(): 2105 msg = f"failed to join the thread in {timeout:.1f} seconds" 2106 raise AssertionError(msg) 2107 2108 2109def reap_children(): 2110 """Use this function at the end of test_main() whenever sub-processes 2111 are started. This will help ensure that no extra children (zombies) 2112 stick around to hog resources and create problems when looking 2113 for refleaks. 2114 """ 2115 global environment_altered 2116 2117 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 2118 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 2119 return 2120 2121 # Reap all our dead child processes so we don't leave zombies around. 2122 # These hog resources and might be causing some of the buildbots to die. 2123 while True: 2124 try: 2125 # Read the exit status of any child process which already completed 2126 pid, status = os.waitpid(-1, os.WNOHANG) 2127 except OSError: 2128 break 2129 2130 if pid == 0: 2131 break 2132 2133 print_warning(f"reap_children() reaped child process {pid}") 2134 environment_altered = True 2135 2136 2137@contextlib.contextmanager 2138def start_threads(threads, unlock=None): 2139 import faulthandler 2140 threads = list(threads) 2141 started = [] 2142 try: 2143 try: 2144 for t in threads: 2145 t.start() 2146 started.append(t) 2147 except: 2148 if verbose: 2149 print("Can't start %d threads, only %d threads started" % 2150 (len(threads), len(started))) 2151 raise 2152 yield 2153 finally: 2154 try: 2155 if unlock: 2156 unlock() 2157 endtime = starttime = time.monotonic() 2158 for timeout in range(1, 16): 2159 endtime += 60 2160 for t in started: 2161 t.join(max(endtime - time.monotonic(), 0.01)) 2162 started = [t for t in started if t.is_alive()] 2163 if not started: 2164 break 2165 if verbose: 2166 print('Unable to join %d threads during a period of ' 2167 '%d minutes' % (len(started), timeout)) 2168 finally: 2169 started = [t for t 
in started if t.is_alive()] 2170 if started: 2171 faulthandler.dump_traceback(sys.stdout) 2172 raise AssertionError('Unable to join %d threads' % len(started)) 2173 2174@contextlib.contextmanager 2175def swap_attr(obj, attr, new_val): 2176 """Temporary swap out an attribute with a new object. 2177 2178 Usage: 2179 with swap_attr(obj, "attr", 5): 2180 ... 2181 2182 This will set obj.attr to 5 for the duration of the with: block, 2183 restoring the old value at the end of the block. If `attr` doesn't 2184 exist on `obj`, it will be created and then deleted at the end of the 2185 block. 2186 2187 The old value (or None if it doesn't exist) will be assigned to the 2188 target of the "as" clause, if there is one. 2189 """ 2190 if hasattr(obj, attr): 2191 real_val = getattr(obj, attr) 2192 setattr(obj, attr, new_val) 2193 try: 2194 yield real_val 2195 finally: 2196 setattr(obj, attr, real_val) 2197 else: 2198 setattr(obj, attr, new_val) 2199 try: 2200 yield 2201 finally: 2202 if hasattr(obj, attr): 2203 delattr(obj, attr) 2204 2205@contextlib.contextmanager 2206def swap_item(obj, item, new_val): 2207 """Temporary swap out an item with a new object. 2208 2209 Usage: 2210 with swap_item(obj, "item", 5): 2211 ... 2212 2213 This will set obj["item"] to 5 for the duration of the with: block, 2214 restoring the old value at the end of the block. If `item` doesn't 2215 exist on `obj`, it will be created and then deleted at the end of the 2216 block. 2217 2218 The old value (or None if it doesn't exist) will be assigned to the 2219 target of the "as" clause, if there is one. 
2220 """ 2221 if item in obj: 2222 real_val = obj[item] 2223 obj[item] = new_val 2224 try: 2225 yield real_val 2226 finally: 2227 obj[item] = real_val 2228 else: 2229 obj[item] = new_val 2230 try: 2231 yield 2232 finally: 2233 if item in obj: 2234 del obj[item] 2235 2236def args_from_interpreter_flags(): 2237 """Return a list of command-line arguments reproducing the current 2238 settings in sys.flags and sys.warnoptions.""" 2239 return subprocess._args_from_interpreter_flags() 2240 2241def optim_args_from_interpreter_flags(): 2242 """Return a list of command-line arguments reproducing the current 2243 optimization settings in sys.flags.""" 2244 return subprocess._optim_args_from_interpreter_flags() 2245 2246 2247class Matcher(object): 2248 2249 _partial_matches = ('msg', 'message') 2250 2251 def matches(self, d, **kwargs): 2252 """ 2253 Try to match a single dict with the supplied arguments. 2254 2255 Keys whose values are strings and which are in self._partial_matches 2256 will be checked for partial (i.e. substring) matches. You can extend 2257 this scheme to (for example) do regular expression matching, etc. 2258 """ 2259 result = True 2260 for k in kwargs: 2261 v = kwargs[k] 2262 dv = d.get(k) 2263 if not self.match_value(k, dv, v): 2264 result = False 2265 break 2266 return result 2267 2268 def match_value(self, k, dv, v): 2269 """ 2270 Try to match a single stored value (dv) with a supplied value (v). 
2271 """ 2272 if type(v) != type(dv): 2273 result = False 2274 elif type(dv) is not str or k not in self._partial_matches: 2275 result = (v == dv) 2276 else: 2277 result = dv.find(v) >= 0 2278 return result 2279 2280 2281_can_symlink = None 2282def can_symlink(): 2283 global _can_symlink 2284 if _can_symlink is not None: 2285 return _can_symlink 2286 symlink_path = TESTFN + "can_symlink" 2287 try: 2288 os.symlink(TESTFN, symlink_path) 2289 can = True 2290 except (OSError, NotImplementedError, AttributeError): 2291 can = False 2292 else: 2293 os.remove(symlink_path) 2294 _can_symlink = can 2295 return can 2296 2297def skip_unless_symlink(test): 2298 """Skip decorator for tests that require functional symlink""" 2299 ok = can_symlink() 2300 msg = "Requires functional symlink implementation" 2301 return test if ok else unittest.skip(msg)(test) 2302 2303_buggy_ucrt = None 2304def skip_if_buggy_ucrt_strfptime(test): 2305 """ 2306 Skip decorator for tests that use buggy strptime/strftime 2307 2308 If the UCRT bugs are present time.localtime().tm_zone will be 2309 an empty string, otherwise we assume the UCRT bugs are fixed 2310 2311 See bpo-37552 [Windows] strptime/strftime return invalid 2312 results with UCRT version 17763.615 2313 """ 2314 import locale 2315 global _buggy_ucrt 2316 if _buggy_ucrt is None: 2317 if(sys.platform == 'win32' and 2318 locale.getdefaultlocale()[1] == 'cp65001' and 2319 time.localtime().tm_zone == ''): 2320 _buggy_ucrt = True 2321 else: 2322 _buggy_ucrt = False 2323 return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test 2324 2325class PythonSymlink: 2326 """Creates a symlink for the current Python executable""" 2327 def __init__(self, link=None): 2328 self.link = link or os.path.abspath(TESTFN) 2329 self._linked = [] 2330 self.real = os.path.realpath(sys.executable) 2331 self._also_link = [] 2332 2333 self._env = None 2334 2335 self._platform_specific() 2336 2337 def _platform_specific(self): 2338 pass 2339 
2340 if sys.platform == "win32": 2341 def _platform_specific(self): 2342 import _winapi 2343 2344 if os.path.lexists(self.real) and not os.path.exists(self.real): 2345 # App symlink appears to not exist, but we want the 2346 # real executable here anyway 2347 self.real = _winapi.GetModuleFileName(0) 2348 2349 dll = _winapi.GetModuleFileName(sys.dllhandle) 2350 src_dir = os.path.dirname(dll) 2351 dest_dir = os.path.dirname(self.link) 2352 self._also_link.append(( 2353 dll, 2354 os.path.join(dest_dir, os.path.basename(dll)) 2355 )) 2356 for runtime in glob.glob(os.path.join(glob.escape(src_dir), "vcruntime*.dll")): 2357 self._also_link.append(( 2358 runtime, 2359 os.path.join(dest_dir, os.path.basename(runtime)) 2360 )) 2361 2362 self._env = {k.upper(): os.getenv(k) for k in os.environ} 2363 self._env["PYTHONHOME"] = os.path.dirname(self.real) 2364 if sysconfig.is_python_build(True): 2365 self._env["PYTHONPATH"] = os.path.dirname(os.__file__) 2366 2367 def __enter__(self): 2368 os.symlink(self.real, self.link) 2369 self._linked.append(self.link) 2370 for real, link in self._also_link: 2371 os.symlink(real, link) 2372 self._linked.append(link) 2373 return self 2374 2375 def __exit__(self, exc_type, exc_value, exc_tb): 2376 for link in self._linked: 2377 try: 2378 os.remove(link) 2379 except IOError as ex: 2380 if verbose: 2381 print("failed to clean up {}: {}".format(link, ex)) 2382 2383 def _call(self, python, args, env, returncode): 2384 cmd = [python, *args] 2385 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, 2386 stderr=subprocess.PIPE, env=env) 2387 r = p.communicate() 2388 if p.returncode != returncode: 2389 if verbose: 2390 print(repr(r[0])) 2391 print(repr(r[1]), file=sys.stderr) 2392 raise RuntimeError( 2393 'unexpected return code: {0} (0x{0:08X})'.format(p.returncode)) 2394 return r 2395 2396 def call_real(self, *args, returncode=0): 2397 return self._call(self.real, args, None, returncode) 2398 2399 def call_link(self, *args, returncode=0): 2400 return 
self._call(self.link, args, self._env, returncode) 2401 2402 2403_can_xattr = None 2404def can_xattr(): 2405 import tempfile 2406 global _can_xattr 2407 if _can_xattr is not None: 2408 return _can_xattr 2409 if not hasattr(os, "setxattr"): 2410 can = False 2411 else: 2412 tmp_dir = tempfile.mkdtemp() 2413 tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) 2414 try: 2415 with open(TESTFN, "wb") as fp: 2416 try: 2417 # TESTFN & tempfile may use different file systems with 2418 # different capabilities 2419 os.setxattr(tmp_fp, b"user.test", b"") 2420 os.setxattr(tmp_name, b"trusted.foo", b"42") 2421 os.setxattr(fp.fileno(), b"user.test", b"") 2422 # Kernels < 2.6.39 don't respect setxattr flags. 2423 kernel_version = platform.release() 2424 m = re.match(r"2.6.(\d{1,2})", kernel_version) 2425 can = m is None or int(m.group(1)) >= 39 2426 except OSError: 2427 can = False 2428 finally: 2429 unlink(TESTFN) 2430 unlink(tmp_name) 2431 rmdir(tmp_dir) 2432 _can_xattr = can 2433 return can 2434 2435def skip_unless_xattr(test): 2436 """Skip decorator for tests that require functional extended attributes""" 2437 ok = can_xattr() 2438 msg = "no non-broken extended attribute support" 2439 return test if ok else unittest.skip(msg)(test) 2440 2441def skip_if_pgo_task(test): 2442 """Skip decorator for tests not run in (non-extended) PGO task""" 2443 ok = not PGO or PGO_EXTENDED 2444 msg = "Not run for (non-extended) PGO task" 2445 return test if ok else unittest.skip(msg)(test) 2446 2447 2448def fs_is_case_insensitive(directory): 2449 """Detects if the file system for the specified directory is case-insensitive.""" 2450 import tempfile 2451 with tempfile.NamedTemporaryFile(dir=directory) as base: 2452 base_path = base.name 2453 case_path = base_path.upper() 2454 if case_path == base_path: 2455 case_path = base_path.lower() 2456 try: 2457 return os.path.samefile(base_path, case_path) 2458 except FileNotFoundError: 2459 return False 2460 2461 2462def detect_api_mismatch(ref_api, 
other_api, *, ignore=()): 2463 """Returns the set of items in ref_api not in other_api, except for a 2464 defined list of items to be ignored in this check. 2465 2466 By default this skips private attributes beginning with '_' but 2467 includes all magic methods, i.e. those starting and ending in '__'. 2468 """ 2469 missing_items = set(dir(ref_api)) - set(dir(other_api)) 2470 if ignore: 2471 missing_items -= set(ignore) 2472 missing_items = set(m for m in missing_items 2473 if not m.startswith('_') or m.endswith('__')) 2474 return missing_items 2475 2476 2477def check__all__(test_case, module, name_of_module=None, extra=(), 2478 blacklist=()): 2479 """Assert that the __all__ variable of 'module' contains all public names. 2480 2481 The module's public names (its API) are detected automatically based on 2482 whether they match the public name convention and were defined in 2483 'module'. 2484 2485 The 'name_of_module' argument can specify (as a string or tuple thereof) 2486 what module(s) an API could be defined in in order to be detected as a 2487 public API. One case for this is when 'module' imports part of its public 2488 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 2489 2490 The 'extra' argument can be a set of names that wouldn't otherwise be 2491 automatically detected as "public", like objects without a proper 2492 '__module__' attribute. If provided, it will be added to the 2493 automatically detected ones. 2494 2495 The 'blacklist' argument can be a set of names that must not be treated 2496 as part of the public API even though their names indicate otherwise. 
2497 2498 Usage: 2499 import bar 2500 import foo 2501 import unittest 2502 from test import support 2503 2504 class MiscTestCase(unittest.TestCase): 2505 def test__all__(self): 2506 support.check__all__(self, foo) 2507 2508 class OtherTestCase(unittest.TestCase): 2509 def test__all__(self): 2510 extra = {'BAR_CONST', 'FOO_CONST'} 2511 blacklist = {'baz'} # Undocumented name. 2512 # bar imports part of its API from _bar. 2513 support.check__all__(self, bar, ('bar', '_bar'), 2514 extra=extra, blacklist=blacklist) 2515 2516 """ 2517 2518 if name_of_module is None: 2519 name_of_module = (module.__name__, ) 2520 elif isinstance(name_of_module, str): 2521 name_of_module = (name_of_module, ) 2522 2523 expected = set(extra) 2524 2525 for name in dir(module): 2526 if name.startswith('_') or name in blacklist: 2527 continue 2528 obj = getattr(module, name) 2529 if (getattr(obj, '__module__', None) in name_of_module or 2530 (not hasattr(obj, '__module__') and 2531 not isinstance(obj, types.ModuleType))): 2532 expected.add(name) 2533 test_case.assertCountEqual(module.__all__, expected) 2534 2535 2536def suppress_msvcrt_asserts(verbose=False): 2537 try: 2538 import msvcrt 2539 except ImportError: 2540 return 2541 2542 msvcrt.SetErrorMode(msvcrt.SEM_FAILCRITICALERRORS 2543 | msvcrt.SEM_NOALIGNMENTFAULTEXCEPT 2544 | msvcrt.SEM_NOGPFAULTERRORBOX 2545 | msvcrt.SEM_NOOPENFILEERRORBOX) 2546 2547 # CrtSetReportMode() is only available in debug build 2548 if hasattr(msvcrt, 'CrtSetReportMode'): 2549 for m in [msvcrt.CRT_WARN, msvcrt.CRT_ERROR, msvcrt.CRT_ASSERT]: 2550 if verbose: 2551 msvcrt.CrtSetReportMode(m, msvcrt.CRTDBG_MODE_FILE) 2552 msvcrt.CrtSetReportFile(m, msvcrt.CRTDBG_FILE_STDERR) 2553 else: 2554 msvcrt.CrtSetReportMode(m, 0) 2555 2556 2557class SuppressCrashReport: 2558 """Try to prevent a crash report from popping up. 2559 2560 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 2561 disable the creation of coredump file. 
2562 """ 2563 old_value = None 2564 old_modes = None 2565 2566 def __enter__(self): 2567 """On Windows, disable Windows Error Reporting dialogs using 2568 SetErrorMode() and CrtSetReportMode(). 2569 2570 On UNIX, try to save the previous core file size limit, then set 2571 soft limit to 0. 2572 """ 2573 if sys.platform.startswith('win'): 2574 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 2575 # GetErrorMode is not available on Windows XP and Windows Server 2003, 2576 # but SetErrorMode returns the previous value, so we can use that 2577 try: 2578 import msvcrt 2579 except ImportError: 2580 return 2581 2582 self.old_value = msvcrt.SetErrorMode(msvcrt.SEM_NOGPFAULTERRORBOX) 2583 2584 msvcrt.SetErrorMode(self.old_value | msvcrt.SEM_NOGPFAULTERRORBOX) 2585 2586 # bpo-23314: Suppress assert dialogs in debug builds. 2587 # CrtSetReportMode() is only available in debug build. 2588 if hasattr(msvcrt, 'CrtSetReportMode'): 2589 self.old_modes = {} 2590 for report_type in [msvcrt.CRT_WARN, 2591 msvcrt.CRT_ERROR, 2592 msvcrt.CRT_ASSERT]: 2593 old_mode = msvcrt.CrtSetReportMode(report_type, 2594 msvcrt.CRTDBG_MODE_FILE) 2595 old_file = msvcrt.CrtSetReportFile(report_type, 2596 msvcrt.CRTDBG_FILE_STDERR) 2597 self.old_modes[report_type] = old_mode, old_file 2598 2599 else: 2600 try: 2601 import resource 2602 self.resource = resource 2603 except ImportError: 2604 self.resource = None 2605 if self.resource is not None: 2606 try: 2607 self.old_value = self.resource.getrlimit(self.resource.RLIMIT_CORE) 2608 self.resource.setrlimit(self.resource.RLIMIT_CORE, 2609 (0, self.old_value[1])) 2610 except (ValueError, OSError): 2611 pass 2612 2613 if sys.platform == 'darwin': 2614 # Check if the 'Crash Reporter' on OSX was configured 2615 # in 'Developer' mode and warn that it will get triggered 2616 # when it is. 2617 # 2618 # This assumes that this context manager is used in tests 2619 # that might trigger the next manager. 
2620 cmd = ['/usr/bin/defaults', 'read', 2621 'com.apple.CrashReporter', 'DialogType'] 2622 proc = subprocess.Popen(cmd, 2623 stdout=subprocess.PIPE, 2624 stderr=subprocess.PIPE) 2625 with proc: 2626 stdout = proc.communicate()[0] 2627 if stdout.strip() == b'developer': 2628 print("this test triggers the Crash Reporter, " 2629 "that is intentional", end='', flush=True) 2630 2631 return self 2632 2633 def __exit__(self, *ignore_exc): 2634 """Restore Windows ErrorMode or core file behavior to initial value.""" 2635 if self.old_value is None: 2636 return 2637 2638 if sys.platform.startswith('win'): 2639 import msvcrt 2640 msvcrt.SetErrorMode(self.old_value) 2641 2642 if self.old_modes: 2643 for report_type, (old_mode, old_file) in self.old_modes.items(): 2644 msvcrt.CrtSetReportMode(report_type, old_mode) 2645 msvcrt.CrtSetReportFile(report_type, old_file) 2646 else: 2647 if self.resource is not None: 2648 try: 2649 self.resource.setrlimit(self.resource.RLIMIT_CORE, self.old_value) 2650 except (ValueError, OSError): 2651 pass 2652 2653 2654def patch(test_instance, object_to_patch, attr_name, new_value): 2655 """Override 'object_to_patch'.'attr_name' with 'new_value'. 2656 2657 Also, add a cleanup procedure to 'test_instance' to restore 2658 'object_to_patch' value for 'attr_name'. 2659 The 'attr_name' should be a valid attribute for 'object_to_patch'. 
2660 2661 """ 2662 # check that 'attr_name' is a real attribute for 'object_to_patch' 2663 # will raise AttributeError if it does not exist 2664 getattr(object_to_patch, attr_name) 2665 2666 # keep a copy of the old value 2667 attr_is_local = False 2668 try: 2669 old_value = object_to_patch.__dict__[attr_name] 2670 except (AttributeError, KeyError): 2671 old_value = getattr(object_to_patch, attr_name, None) 2672 else: 2673 attr_is_local = True 2674 2675 # restore the value when the test is done 2676 def cleanup(): 2677 if attr_is_local: 2678 setattr(object_to_patch, attr_name, old_value) 2679 else: 2680 delattr(object_to_patch, attr_name) 2681 2682 test_instance.addCleanup(cleanup) 2683 2684 # actually override the attribute 2685 setattr(object_to_patch, attr_name, new_value) 2686 2687 2688def run_in_subinterp(code): 2689 """ 2690 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 2691 module is enabled. 2692 """ 2693 # Issue #10915, #15751: PyGILState_*() functions don't work with 2694 # sub-interpreters, the tracemalloc module uses these functions internally 2695 try: 2696 import tracemalloc 2697 except ImportError: 2698 pass 2699 else: 2700 if tracemalloc.is_tracing(): 2701 raise unittest.SkipTest("run_in_subinterp() cannot be used " 2702 "if tracemalloc module is tracing " 2703 "memory allocations") 2704 import _testcapi 2705 return _testcapi.run_in_subinterp(code) 2706 2707 2708def check_free_after_iterating(test, iter, cls, args=()): 2709 class A(cls): 2710 def __del__(self): 2711 nonlocal done 2712 done = True 2713 try: 2714 next(it) 2715 except StopIteration: 2716 pass 2717 2718 done = False 2719 it = iter(A(*args)) 2720 # Issue 26494: Shouldn't crash 2721 test.assertRaises(StopIteration, next, it) 2722 # The sequence should be deallocated just after the end of iterating 2723 gc_collect() 2724 test.assertTrue(done) 2725 2726 2727def missing_compiler_executable(cmd_names=[]): 2728 """Check if the compiler components used to build the 
interpreter exist. 2729 2730 Check for the existence of the compiler executables whose names are listed 2731 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 2732 and return the first missing executable or None when none is found 2733 missing. 2734 2735 """ 2736 from distutils import ccompiler, sysconfig, spawn, errors 2737 compiler = ccompiler.new_compiler() 2738 sysconfig.customize_compiler(compiler) 2739 if compiler.compiler_type == "msvc": 2740 # MSVC has no executables, so check whether initialization succeeds 2741 try: 2742 compiler.initialize() 2743 except errors.DistutilsPlatformError: 2744 return "msvc" 2745 for name in compiler.executables: 2746 if cmd_names and name not in cmd_names: 2747 continue 2748 cmd = getattr(compiler, name) 2749 if cmd_names: 2750 assert cmd is not None, \ 2751 "the '%s' executable is not configured" % name 2752 elif not cmd: 2753 continue 2754 if spawn.find_executable(cmd[0]) is None: 2755 return cmd[0] 2756 2757 2758_is_android_emulator = None 2759def setswitchinterval(interval): 2760 # Setting a very low gil interval on the Android emulator causes python 2761 # to hang (issue #26939). 2762 minimum_interval = 1e-5 2763 if is_android and interval < minimum_interval: 2764 global _is_android_emulator 2765 if _is_android_emulator is None: 2766 _is_android_emulator = (subprocess.check_output( 2767 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 2768 if _is_android_emulator: 2769 interval = minimum_interval 2770 return sys.setswitchinterval(interval) 2771 2772 2773@contextlib.contextmanager 2774def disable_faulthandler(): 2775 import faulthandler 2776 2777 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 2778 # sys.stderr with a StringIO which has no file descriptor when a test 2779 # is run with -W/--verbose3. 
2780 fd = sys.__stderr__.fileno() 2781 2782 is_enabled = faulthandler.is_enabled() 2783 try: 2784 faulthandler.disable() 2785 yield 2786 finally: 2787 if is_enabled: 2788 faulthandler.enable(file=fd, all_threads=True) 2789 2790 2791def fd_count(): 2792 """Count the number of open file descriptors. 2793 """ 2794 if sys.platform.startswith(('linux', 'freebsd')): 2795 try: 2796 names = os.listdir("/proc/self/fd") 2797 # Subtract one because listdir() internally opens a file 2798 # descriptor to list the content of the /proc/self/fd/ directory. 2799 return len(names) - 1 2800 except FileNotFoundError: 2801 pass 2802 2803 MAXFD = 256 2804 if hasattr(os, 'sysconf'): 2805 try: 2806 MAXFD = os.sysconf("SC_OPEN_MAX") 2807 except OSError: 2808 pass 2809 2810 old_modes = None 2811 if sys.platform == 'win32': 2812 # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process 2813 # on invalid file descriptor if Python is compiled in debug mode 2814 try: 2815 import msvcrt 2816 msvcrt.CrtSetReportMode 2817 except (AttributeError, ImportError): 2818 # no msvcrt or a release build 2819 pass 2820 else: 2821 old_modes = {} 2822 for report_type in (msvcrt.CRT_WARN, 2823 msvcrt.CRT_ERROR, 2824 msvcrt.CRT_ASSERT): 2825 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) 2826 2827 try: 2828 count = 0 2829 for fd in range(MAXFD): 2830 try: 2831 # Prefer dup() over fstat(). fstat() can require input/output 2832 # whereas dup() doesn't. 2833 fd2 = os.dup(fd) 2834 except OSError as e: 2835 if e.errno != errno.EBADF: 2836 raise 2837 else: 2838 os.close(fd2) 2839 count += 1 2840 finally: 2841 if old_modes is not None: 2842 for report_type in (msvcrt.CRT_WARN, 2843 msvcrt.CRT_ERROR, 2844 msvcrt.CRT_ASSERT): 2845 msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) 2846 2847 return count 2848 2849 2850class SaveSignals: 2851 """ 2852 Save and restore signal handlers. 
2853 2854 This class is only able to save/restore signal handlers registered 2855 by the Python signal module: see bpo-13285 for "external" signal 2856 handlers. 2857 """ 2858 2859 def __init__(self): 2860 import signal 2861 self.signal = signal 2862 self.signals = signal.valid_signals() 2863 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 2864 for signame in ('SIGKILL', 'SIGSTOP'): 2865 try: 2866 signum = getattr(signal, signame) 2867 except AttributeError: 2868 continue 2869 self.signals.remove(signum) 2870 self.handlers = {} 2871 2872 def save(self): 2873 for signum in self.signals: 2874 handler = self.signal.getsignal(signum) 2875 if handler is None: 2876 # getsignal() returns None if a signal handler was not 2877 # registered by the Python signal module, 2878 # and the handler is not SIG_DFL nor SIG_IGN. 2879 # 2880 # Ignore the signal: we cannot restore the handler. 2881 continue 2882 self.handlers[signum] = handler 2883 2884 def restore(self): 2885 for signum, handler in self.handlers.items(): 2886 self.signal.signal(signum, handler) 2887 2888 2889def with_pymalloc(): 2890 import _testcapi 2891 return _testcapi.WITH_PYMALLOC 2892 2893 2894class FakePath: 2895 """Simple implementing of the path protocol. 2896 """ 2897 def __init__(self, path): 2898 self.path = path 2899 2900 def __repr__(self): 2901 return f'<FakePath {self.path!r}>' 2902 2903 def __fspath__(self): 2904 if (isinstance(self.path, BaseException) or 2905 isinstance(self.path, type) and 2906 issubclass(self.path, BaseException)): 2907 raise self.path 2908 else: 2909 return self.path 2910 2911 2912class _ALWAYS_EQ: 2913 """ 2914 Object that is equal to anything. 2915 """ 2916 def __eq__(self, other): 2917 return True 2918 def __ne__(self, other): 2919 return False 2920 2921ALWAYS_EQ = _ALWAYS_EQ() 2922 2923class _NEVER_EQ: 2924 """ 2925 Object that is not equal to anything. 
2926 """ 2927 def __eq__(self, other): 2928 return False 2929 def __ne__(self, other): 2930 return True 2931 def __hash__(self): 2932 return 1 2933 2934NEVER_EQ = _NEVER_EQ() 2935 2936@functools.total_ordering 2937class _LARGEST: 2938 """ 2939 Object that is greater than anything (except itself). 2940 """ 2941 def __eq__(self, other): 2942 return isinstance(other, _LARGEST) 2943 def __lt__(self, other): 2944 return False 2945 2946LARGEST = _LARGEST() 2947 2948@functools.total_ordering 2949class _SMALLEST: 2950 """ 2951 Object that is less than anything (except itself). 2952 """ 2953 def __eq__(self, other): 2954 return isinstance(other, _SMALLEST) 2955 def __gt__(self, other): 2956 return False 2957 2958SMALLEST = _SMALLEST() 2959 2960def maybe_get_event_loop_policy(): 2961 """Return the global event loop policy if one is set, else return None.""" 2962 import asyncio.events 2963 return asyncio.events._event_loop_policy 2964 2965# Helpers for testing hashing. 2966NHASHBITS = sys.hash_info.width # number of bits in hash() result 2967assert NHASHBITS in (32, 64) 2968 2969# Return mean and sdev of number of collisions when tossing nballs balls 2970# uniformly at random into nbins bins. By definition, the number of 2971# collisions is the number of balls minus the number of occupied bins at 2972# the end. 2973def collision_stats(nbins, nballs): 2974 n, k = nbins, nballs 2975 # prob a bin empty after k trials = (1 - 1/n)**k 2976 # mean # empty is then n * (1 - 1/n)**k 2977 # so mean # occupied is n - n * (1 - 1/n)**k 2978 # so collisions = k - (n - n*(1 - 1/n)**k) 2979 # 2980 # For the variance: 2981 # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 = 2982 # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty) 2983 # 2984 # Massive cancellation occurs, and, e.g., for a 64-bit hash code 2985 # 1-1/2**64 rounds uselessly to 1.0. 
Rather than make heroic (and 2986 # error-prone) efforts to rework the naive formulas to avoid those, 2987 # we use the `decimal` module to get plenty of extra precision. 2988 # 2989 # Note: the exact values are straightforward to compute with 2990 # rationals, but in context that's unbearably slow, requiring 2991 # multi-million bit arithmetic. 2992 import decimal 2993 with decimal.localcontext() as ctx: 2994 bits = n.bit_length() * 2 # bits in n**2 2995 # At least that many bits will likely cancel out. 2996 # Use that many decimal digits instead. 2997 ctx.prec = max(bits, 30) 2998 dn = decimal.Decimal(n) 2999 p1empty = ((dn - 1) / dn) ** k 3000 meanempty = n * p1empty 3001 occupied = n - meanempty 3002 collisions = k - occupied 3003 var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty) 3004 return float(collisions), float(var.sqrt()) 3005 3006 3007class catch_unraisable_exception: 3008 """ 3009 Context manager catching unraisable exception using sys.unraisablehook. 3010 3011 Storing the exception value (cm.unraisable.exc_value) creates a reference 3012 cycle. The reference cycle is broken explicitly when the context manager 3013 exits. 3014 3015 Storing the object (cm.unraisable.object) can resurrect it if it is set to 3016 an object which is being finalized. Exiting the context manager clears the 3017 stored object. 3018 3019 Usage: 3020 3021 with support.catch_unraisable_exception() as cm: 3022 # code creating an "unraisable exception" 3023 ... 3024 3025 # check the unraisable exception: use cm.unraisable 3026 ... 3027 3028 # cm.unraisable attribute no longer exists at this point 3029 # (to break a reference cycle) 3030 """ 3031 3032 def __init__(self): 3033 self.unraisable = None 3034 self._old_hook = None 3035 3036 def _hook(self, unraisable): 3037 # Storing unraisable.object can resurrect an object which is being 3038 # finalized. Storing unraisable.exc_value creates a reference cycle. 
class catch_threading_exception:
    """
    Context manager catching threading.Thread exception using
    threading.excepthook.

    Attributes set when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    These attributes are deleted at the context manager exit.

    Usage:

        with support.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
        # exist at this point
        # (to avoid reference cycles)
    """

    def __init__(self):
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        self._old_hook = None

    def _hook(self, args):
        # Installed as threading.excepthook: copy the fields of the hook
        # argument onto the context manager.
        self.exc_type = args.exc_type
        self.exc_value = args.exc_value
        self.exc_traceback = args.exc_traceback
        self.thread = args.thread

    def __enter__(self):
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        threading.excepthook = self._old_hook
        # Delete (rather than reset) the attributes to avoid reference cycles.
        del self.exc_type
        del self.exc_value
        del self.exc_traceback
        del self.thread


def wait_process(pid, *, exitcode, timeout=None):
    """
    Wait until process pid completes and check that the process exit code is
    exitcode.

    Raise an AssertionError if the process exit code is not equal to exitcode.

    If the process runs longer than timeout seconds (SHORT_TIMEOUT by default),
    kill the process with signal.SIGKILL, reap it, and raise an
    AssertionError.  The timeout feature is not available on Windows: there,
    timeout is ignored and the call blocks until the process exits.
    """
    if os.name != "nt":
        import signal

        if timeout is None:
            timeout = SHORT_TIMEOUT
        t0 = time.monotonic()
        # Poll with an exponentially growing delay, capped at max_sleep.
        sleep = 0.001
        max_sleep = 0.1
        while True:
            pid2, status = os.waitpid(pid, os.WNOHANG)
            if pid2 != 0:
                break
            # process is still running

            dt = time.monotonic() - t0
            # Honor the caller-supplied timeout; comparing against the
            # global SHORT_TIMEOUT here would silently ignore the argument.
            if dt > timeout:
                try:
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                except OSError:
                    # Ignore errors like ChildProcessError or PermissionError
                    pass

                raise AssertionError(f"process {pid} is still running "
                                     f"after {dt:.1f} seconds")

            sleep = min(sleep * 2, max_sleep)
            time.sleep(sleep)
    else:
        # Windows implementation
        pid2, status = os.waitpid(pid, 0)

    exitcode2 = os.waitstatus_to_exitcode(status)
    if exitcode2 != exitcode:
        raise AssertionError(f"process {pid} exited with code {exitcode2}, "
                             f"but exit code {exitcode} is expected")

    # sanity check: it should not fail in practice
    if pid2 != pid:
        raise AssertionError(f"pid {pid2} != pid {pid}")
def use_old_parser():
    """Return True if the interpreter config has _use_peg_parser equal to 0."""
    import _testinternalcapi
    core_config = _testinternalcapi.get_configs()['config']
    return core_config['_use_peg_parser'] == 0


def skip_if_new_parser(msg):
    """Return a decorator skipping the test with msg unless use_old_parser()
    is true."""
    return unittest.skipUnless(use_old_parser(), msg)


@contextlib.contextmanager
def save_restore_warnings_filters():
    """Context manager restoring the contents of warnings.filters on exit."""
    snapshot = list(warnings.filters)
    try:
        yield
    finally:
        # Restore in place so the warnings.filters list object is kept.
        warnings.filters[:] = snapshot


def skip_if_broken_multiprocessing_synchronize():
    """
    Skip tests if the multiprocessing.synchronize module is missing, if there
    is no available semaphore implementation, or if creating a lock raises an
    OSError (on Linux only).
    """

    # Skip tests if the _multiprocessing extension is missing.
    import_module('_multiprocessing')

    # Skip tests if there is no available semaphore implementation:
    # multiprocessing.synchronize requires _multiprocessing.SemLock.
    synchronize = import_module('multiprocessing.synchronize')

    # The lock-creation probe below only applies to Linux.
    if sys.platform != "linux":
        return

    try:
        # bpo-38377: On Linux, creating a semaphore fails with OSError
        # if the current user does not have the permission to create
        # a file in /dev/shm/ directory.
        synchronize.Lock(ctx=None)
    except OSError as exc:
        raise unittest.SkipTest(f"broken multiprocessing SemLock: {exc!r}")