1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import asyncio.events 7import collections.abc 8import contextlib 9import errno 10import faulthandler 11import fnmatch 12import functools 13import gc 14import glob 15import hashlib 16import importlib 17import importlib.util 18import locale 19import logging.handlers 20import nntplib 21import os 22import platform 23import re 24import shutil 25import socket 26import stat 27import struct 28import subprocess 29import sys 30import sysconfig 31import tempfile 32import _thread 33import threading 34import time 35import types 36import unittest 37import urllib.error 38import warnings 39 40from .testresult import get_test_runner 41 42try: 43 import multiprocessing.process 44except ImportError: 45 multiprocessing = None 46 47try: 48 import zlib 49except ImportError: 50 zlib = None 51 52try: 53 import gzip 54except ImportError: 55 gzip = None 56 57try: 58 import bz2 59except ImportError: 60 bz2 = None 61 62try: 63 import lzma 64except ImportError: 65 lzma = None 66 67try: 68 import resource 69except ImportError: 70 resource = None 71 72try: 73 import _hashlib 74except ImportError: 75 _hashlib = None 76 77__all__ = [ 78 # globals 79 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 80 # exceptions 81 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 82 # imports 83 "import_module", "import_fresh_module", "CleanImport", 84 # modules 85 "unload", "forget", 86 # io 87 "record_original_stdout", "get_original_stdout", "captured_stdout", 88 "captured_stdin", "captured_stderr", 89 # filesystem 90 "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile", 91 "create_empty_file", "can_symlink", "fs_is_case_insensitive", 92 # unittest 93 "is_resource_enabled", "requires", "requires_freebsd_version", 94 "requires_linux_version", "requires_mac_ver", "requires_hashdigest", 95 
"check_syntax_error", "check_syntax_warning", 96 "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", 97 "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest", 98 "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", 99 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 100 "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", 101 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 102 "check__all__", "skip_unless_bind_unix_socket", "skip_if_buggy_ucrt_strfptime", 103 "ignore_warnings", 104 # sys 105 "is_jython", "is_android", "check_impl_detail", "unix_shell", 106 "setswitchinterval", 107 # network 108 "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", 109 "bind_unix_socket", 110 # processes 111 'temp_umask', "reap_children", 112 # logging 113 "TestHandler", 114 # threads 115 "threading_setup", "threading_cleanup", "reap_threads", "start_threads", 116 # miscellaneous 117 "check_warnings", "check_no_resource_warning", "check_no_warnings", 118 "EnvironmentVarGuard", 119 "run_with_locale", "swap_item", 120 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 121 "run_with_tz", "PGO", "missing_compiler_executable", "fd_count", 122 "ALWAYS_EQ", "LARGEST", "SMALLEST" 123 ] 124 125class Error(Exception): 126 """Base class for regression test exceptions.""" 127 128class TestFailed(Error): 129 """Test failed.""" 130 131class TestDidNotRun(Error): 132 """Test did not run any subtests.""" 133 134class ResourceDenied(unittest.SkipTest): 135 """Test skipped because it requested a disallowed resource. 136 137 This is raised when a test calls requires() for a resource that 138 has not be enabled. It is used to distinguish between expected 139 and unexpected skips. 
140 """ 141 142@contextlib.contextmanager 143def _ignore_deprecated_imports(ignore=True): 144 """Context manager to suppress package and module deprecation 145 warnings when importing them. 146 147 If ignore is False, this context manager has no effect. 148 """ 149 if ignore: 150 with warnings.catch_warnings(): 151 warnings.filterwarnings("ignore", ".+ (module|package)", 152 DeprecationWarning) 153 yield 154 else: 155 yield 156 157 158def ignore_warnings(*, category): 159 """Decorator to suppress deprecation warnings. 160 161 Use of context managers to hide warnings make diffs 162 more noisy and tools like 'git blame' less useful. 163 """ 164 def decorator(test): 165 @functools.wraps(test) 166 def wrapper(self, *args, **kwargs): 167 with warnings.catch_warnings(): 168 warnings.simplefilter('ignore', category=category) 169 return test(self, *args, **kwargs) 170 return wrapper 171 return decorator 172 173 174def import_module(name, deprecated=False, *, required_on=()): 175 """Import and return the module to be tested, raising SkipTest if 176 it is not available. 177 178 If deprecated is True, any module or package deprecation messages 179 will be suppressed. If a module is required on a platform but optional for 180 others, set required_on to an iterable of platform prefixes which will be 181 compared against sys.platform. 182 """ 183 with _ignore_deprecated_imports(deprecated): 184 try: 185 return importlib.import_module(name) 186 except ImportError as msg: 187 if sys.platform.startswith(tuple(required_on)): 188 raise 189 raise unittest.SkipTest(str(msg)) 190 191 192def _save_and_remove_module(name, orig_modules): 193 """Helper function to save and remove a module from sys.modules 194 195 Raise ImportError if the module can't be imported. 
196 """ 197 # try to import the module and raise an error if it can't be imported 198 if name not in sys.modules: 199 __import__(name) 200 del sys.modules[name] 201 for modname in list(sys.modules): 202 if modname == name or modname.startswith(name + '.'): 203 orig_modules[modname] = sys.modules[modname] 204 del sys.modules[modname] 205 206def _save_and_block_module(name, orig_modules): 207 """Helper function to save and block a module in sys.modules 208 209 Return True if the module was in sys.modules, False otherwise. 210 """ 211 saved = True 212 try: 213 orig_modules[name] = sys.modules[name] 214 except KeyError: 215 saved = False 216 sys.modules[name] = None 217 return saved 218 219 220def anticipate_failure(condition): 221 """Decorator to mark a test that is known to be broken in some cases 222 223 Any use of this decorator should have a comment identifying the 224 associated tracker issue. 225 """ 226 if condition: 227 return unittest.expectedFailure 228 return lambda f: f 229 230def load_package_tests(pkg_dir, loader, standard_tests, pattern): 231 """Generic load_tests implementation for simple test packages. 232 233 Most packages can implement load_tests using this function as follows: 234 235 def load_tests(*args): 236 return load_package_tests(os.path.dirname(__file__), *args) 237 """ 238 if pattern is None: 239 pattern = "test*" 240 top_dir = os.path.dirname( # Lib 241 os.path.dirname( # test 242 os.path.dirname(__file__))) # support 243 package_tests = loader.discover(start_dir=pkg_dir, 244 top_level_dir=top_dir, 245 pattern=pattern) 246 standard_tests.addTests(package_tests) 247 return standard_tests 248 249 250def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 251 """Import and return a module, deliberately bypassing sys.modules. 252 253 This function imports and returns a fresh copy of the named Python module 254 by removing the named module from sys.modules before doing the import. 
255 Note that unlike reload, the original module is not affected by 256 this operation. 257 258 *fresh* is an iterable of additional module names that are also removed 259 from the sys.modules cache before doing the import. 260 261 *blocked* is an iterable of module names that are replaced with None 262 in the module cache during the import to ensure that attempts to import 263 them raise ImportError. 264 265 The named module and any modules named in the *fresh* and *blocked* 266 parameters are saved before starting the import and then reinserted into 267 sys.modules when the fresh import is complete. 268 269 Module and package deprecation messages are suppressed during this import 270 if *deprecated* is True. 271 272 This function will raise ImportError if the named module cannot be 273 imported. 274 """ 275 # NOTE: test_heapq, test_json and test_warnings include extra sanity checks 276 # to make sure that this utility function is working as expected 277 with _ignore_deprecated_imports(deprecated): 278 # Keep track of modules saved for later restoration as well 279 # as those which just need a blocking entry removed 280 orig_modules = {} 281 names_to_remove = [] 282 _save_and_remove_module(name, orig_modules) 283 try: 284 for fresh_name in fresh: 285 _save_and_remove_module(fresh_name, orig_modules) 286 for blocked_name in blocked: 287 if not _save_and_block_module(blocked_name, orig_modules): 288 names_to_remove.append(blocked_name) 289 fresh_module = importlib.import_module(name) 290 except ImportError: 291 fresh_module = None 292 finally: 293 for orig_name, module in orig_modules.items(): 294 sys.modules[orig_name] = module 295 for name_to_remove in names_to_remove: 296 del sys.modules[name_to_remove] 297 return fresh_module 298 299 300def get_attribute(obj, name): 301 """Get an attribute, raising SkipTest if AttributeError is raised.""" 302 try: 303 attribute = getattr(obj, name) 304 except AttributeError: 305 raise unittest.SkipTest("object %r has no 
attribute %r" % (obj, name)) 306 else: 307 return attribute 308 309verbose = 1 # Flag set to 0 by regrtest.py 310use_resources = None # Flag set to [] by regrtest.py 311max_memuse = 0 # Disable bigmem tests (they will still be run with 312 # small sizes, to make sure they work.) 313real_max_memuse = 0 314junit_xml_list = None # list of testsuite XML elements 315failfast = False 316 317# _original_stdout is meant to hold stdout at the time regrtest began. 318# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 319# The point is to have some flavor of stdout the user can actually see. 320_original_stdout = None 321def record_original_stdout(stdout): 322 global _original_stdout 323 _original_stdout = stdout 324 325def get_original_stdout(): 326 return _original_stdout or sys.stdout 327 328def unload(name): 329 try: 330 del sys.modules[name] 331 except KeyError: 332 pass 333 334def _force_run(path, func, *args): 335 try: 336 return func(*args) 337 except OSError as err: 338 if verbose >= 2: 339 print('%s: %s' % (err.__class__.__name__, err)) 340 print('re-run %s%r' % (func.__name__, args)) 341 os.chmod(path, stat.S_IRWXU) 342 return func(*args) 343 344if sys.platform.startswith("win"): 345 def _waitfor(func, pathname, waitall=False): 346 # Perform the operation 347 func(pathname) 348 # Now setup the wait loop 349 if waitall: 350 dirname = pathname 351 else: 352 dirname, name = os.path.split(pathname) 353 dirname = dirname or '.' 354 # Check for `pathname` to be removed from the filesystem. 355 # The exponential backoff of the timeout amounts to a total 356 # of ~1 second after which the deletion is probably an error 357 # anyway. 358 # Testing on an i7@4.3GHz shows that usually only 1 iteration is 359 # required when contention occurs. 360 timeout = 0.001 361 while timeout < 1.0: 362 # Note we are only testing for the existence of the file(s) in 363 # the contents of the directory regardless of any security or 364 # access rights. 
If we have made it this far, we have sufficient 365 # permissions to do that much using Python's equivalent of the 366 # Windows API FindFirstFile. 367 # Other Windows APIs can fail or give incorrect results when 368 # dealing with files that are pending deletion. 369 L = os.listdir(dirname) 370 if not (L if waitall else name in L): 371 return 372 # Increase the timeout and try again 373 time.sleep(timeout) 374 timeout *= 2 375 warnings.warn('tests may fail, delete still pending for ' + pathname, 376 RuntimeWarning, stacklevel=4) 377 378 def _unlink(filename): 379 _waitfor(os.unlink, filename) 380 381 def _rmdir(dirname): 382 _waitfor(os.rmdir, dirname) 383 384 def _rmtree(path): 385 def _rmtree_inner(path): 386 for name in _force_run(path, os.listdir, path): 387 fullname = os.path.join(path, name) 388 try: 389 mode = os.lstat(fullname).st_mode 390 except OSError as exc: 391 print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), 392 file=sys.__stderr__) 393 mode = 0 394 if stat.S_ISDIR(mode): 395 _waitfor(_rmtree_inner, fullname, waitall=True) 396 _force_run(fullname, os.rmdir, fullname) 397 else: 398 _force_run(fullname, os.unlink, fullname) 399 _waitfor(_rmtree_inner, path, waitall=True) 400 _waitfor(lambda p: _force_run(p, os.rmdir, p), path) 401 402 def _longpath(path): 403 try: 404 import ctypes 405 except ImportError: 406 # No ctypes means we can't expands paths. 
407 pass 408 else: 409 buffer = ctypes.create_unicode_buffer(len(path) * 2) 410 length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, 411 len(buffer)) 412 if length: 413 return buffer[:length] 414 return path 415else: 416 _unlink = os.unlink 417 _rmdir = os.rmdir 418 419 def _rmtree(path): 420 try: 421 shutil.rmtree(path) 422 return 423 except OSError: 424 pass 425 426 def _rmtree_inner(path): 427 for name in _force_run(path, os.listdir, path): 428 fullname = os.path.join(path, name) 429 try: 430 mode = os.lstat(fullname).st_mode 431 except OSError: 432 mode = 0 433 if stat.S_ISDIR(mode): 434 _rmtree_inner(fullname) 435 _force_run(path, os.rmdir, fullname) 436 else: 437 _force_run(path, os.unlink, fullname) 438 _rmtree_inner(path) 439 os.rmdir(path) 440 441 def _longpath(path): 442 return path 443 444def unlink(filename): 445 try: 446 _unlink(filename) 447 except (FileNotFoundError, NotADirectoryError): 448 pass 449 450def rmdir(dirname): 451 try: 452 _rmdir(dirname) 453 except FileNotFoundError: 454 pass 455 456def rmtree(path): 457 try: 458 _rmtree(path) 459 except FileNotFoundError: 460 pass 461 462def make_legacy_pyc(source): 463 """Move a PEP 3147/488 pyc file to its legacy pyc location. 464 465 :param source: The file system path to the source file. The source file 466 does not need to exist, however the PEP 3147/488 pyc file must exist. 467 :return: The file system path to the legacy pyc file. 468 """ 469 pyc_file = importlib.util.cache_from_source(source) 470 up_one = os.path.dirname(os.path.abspath(source)) 471 legacy_pyc = os.path.join(up_one, source + 'c') 472 os.rename(pyc_file, legacy_pyc) 473 return legacy_pyc 474 475def forget(modname): 476 """'Forget' a module was ever imported. 477 478 This removes the module from sys.modules and deletes any PEP 3147/488 or 479 legacy .pyc files. 
480 """ 481 unload(modname) 482 for dirname in sys.path: 483 source = os.path.join(dirname, modname + '.py') 484 # It doesn't matter if they exist or not, unlink all possible 485 # combinations of PEP 3147/488 and legacy pyc files. 486 unlink(source + 'c') 487 for opt in ('', 1, 2): 488 unlink(importlib.util.cache_from_source(source, optimization=opt)) 489 490# Check whether a gui is actually available 491def _is_gui_available(): 492 if hasattr(_is_gui_available, 'result'): 493 return _is_gui_available.result 494 reason = None 495 if sys.platform.startswith('win'): 496 # if Python is running as a service (such as the buildbot service), 497 # gui interaction may be disallowed 498 import ctypes 499 import ctypes.wintypes 500 UOI_FLAGS = 1 501 WSF_VISIBLE = 0x0001 502 class USEROBJECTFLAGS(ctypes.Structure): 503 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 504 ("fReserved", ctypes.wintypes.BOOL), 505 ("dwFlags", ctypes.wintypes.DWORD)] 506 dll = ctypes.windll.user32 507 h = dll.GetProcessWindowStation() 508 if not h: 509 raise ctypes.WinError() 510 uof = USEROBJECTFLAGS() 511 needed = ctypes.wintypes.DWORD() 512 res = dll.GetUserObjectInformationW(h, 513 UOI_FLAGS, 514 ctypes.byref(uof), 515 ctypes.sizeof(uof), 516 ctypes.byref(needed)) 517 if not res: 518 raise ctypes.WinError() 519 if not bool(uof.dwFlags & WSF_VISIBLE): 520 reason = "gui not available (WSF_VISIBLE flag not set)" 521 elif sys.platform == 'darwin': 522 # The Aqua Tk implementations on OS X can abort the process if 523 # being called in an environment where a window server connection 524 # cannot be made, for instance when invoked by a buildbot or ssh 525 # process not running under the same user id as the current console 526 # user. To avoid that, raise an exception if the window manager 527 # connection is not available. 
528 from ctypes import cdll, c_int, pointer, Structure 529 from ctypes.util import find_library 530 531 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 532 533 if app_services.CGMainDisplayID() == 0: 534 reason = "gui tests cannot run without OS X window manager" 535 else: 536 class ProcessSerialNumber(Structure): 537 _fields_ = [("highLongOfPSN", c_int), 538 ("lowLongOfPSN", c_int)] 539 psn = ProcessSerialNumber() 540 psn_p = pointer(psn) 541 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 542 (app_services.SetFrontProcess(psn_p) < 0) ): 543 reason = "cannot run without OS X gui process" 544 545 # check on every platform whether tkinter can actually do anything 546 if not reason: 547 try: 548 from tkinter import Tk 549 root = Tk() 550 root.withdraw() 551 root.update() 552 root.destroy() 553 except Exception as e: 554 err_string = str(e) 555 if len(err_string) > 50: 556 err_string = err_string[:50] + ' [...]' 557 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 558 err_string) 559 560 _is_gui_available.reason = reason 561 _is_gui_available.result = not reason 562 563 return _is_gui_available.result 564 565def is_resource_enabled(resource): 566 """Test whether a resource is enabled. 567 568 Known resources are set by regrtest.py. If not running under regrtest.py, 569 all resources are assumed enabled unless use_resources has been set. 570 """ 571 return use_resources is None or resource in use_resources 572 573def requires(resource, msg=None): 574 """Raise ResourceDenied if the specified resource is not available.""" 575 if not is_resource_enabled(resource): 576 if msg is None: 577 msg = "Use of the %r resource not enabled" % resource 578 raise ResourceDenied(msg) 579 if resource == 'gui' and not _is_gui_available(): 580 raise ResourceDenied(_is_gui_available.reason) 581 582def _requires_unix_version(sysname, min_version): 583 """Decorator raising SkipTest if the OS is `sysname` and the version is less 584 than `min_version`. 
585 586 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 587 the FreeBSD version is less than 7.2. 588 """ 589 def decorator(func): 590 @functools.wraps(func) 591 def wrapper(*args, **kw): 592 if platform.system() == sysname: 593 version_txt = platform.release().split('-', 1)[0] 594 try: 595 version = tuple(map(int, version_txt.split('.'))) 596 except ValueError: 597 pass 598 else: 599 if version < min_version: 600 min_version_txt = '.'.join(map(str, min_version)) 601 raise unittest.SkipTest( 602 "%s version %s or higher required, not %s" 603 % (sysname, min_version_txt, version_txt)) 604 return func(*args, **kw) 605 wrapper.min_version = min_version 606 return wrapper 607 return decorator 608 609def requires_freebsd_version(*min_version): 610 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 611 less than `min_version`. 612 613 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 614 version is less than 7.2. 615 """ 616 return _requires_unix_version('FreeBSD', min_version) 617 618def requires_linux_version(*min_version): 619 """Decorator raising SkipTest if the OS is Linux and the Linux version is 620 less than `min_version`. 621 622 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 623 version is less than 2.6.32. 624 """ 625 return _requires_unix_version('Linux', min_version) 626 627def requires_mac_ver(*min_version): 628 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 629 version if less than min_version. 630 631 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 632 is lesser than 10.5. 
633 """ 634 def decorator(func): 635 @functools.wraps(func) 636 def wrapper(*args, **kw): 637 if sys.platform == 'darwin': 638 version_txt = platform.mac_ver()[0] 639 try: 640 version = tuple(map(int, version_txt.split('.'))) 641 except ValueError: 642 pass 643 else: 644 if version < min_version: 645 min_version_txt = '.'.join(map(str, min_version)) 646 raise unittest.SkipTest( 647 "Mac OS X %s or higher required, not %s" 648 % (min_version_txt, version_txt)) 649 return func(*args, **kw) 650 wrapper.min_version = min_version 651 return wrapper 652 return decorator 653 654 655def requires_hashdigest(digestname, openssl=None): 656 """Decorator raising SkipTest if a hashing algorithm is not available 657 658 The hashing algorithm could be missing or blocked by a strict crypto 659 policy. 660 661 If 'openssl' is True, then the decorator checks that OpenSSL provides 662 the algorithm. Otherwise the check falls back to built-in 663 implementations. 664 665 ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS 666 ValueError: unsupported hash type md4 667 """ 668 def decorator(func): 669 @functools.wraps(func) 670 def wrapper(*args, **kwargs): 671 try: 672 if openssl and _hashlib is not None: 673 _hashlib.new(digestname) 674 else: 675 hashlib.new(digestname) 676 except ValueError: 677 raise unittest.SkipTest( 678 f"hash digest '{digestname}' is not available." 679 ) 680 return func(*args, **kwargs) 681 return wrapper 682 return decorator 683 684 685HOST = "localhost" 686HOSTv4 = "127.0.0.1" 687HOSTv6 = "::1" 688 689 690def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 691 """Returns an unused port that should be suitable for binding. This is 692 achieved by creating a temporary socket with the same family and type as 693 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 694 the specified host address (defaults to 0.0.0.0) with the port set to 0, 695 eliciting an unused ephemeral port from the OS. 
The temporary socket is 696 then closed and deleted, and the ephemeral port is returned. 697 698 Either this method or bind_port() should be used for any tests where a 699 server socket needs to be bound to a particular port for the duration of 700 the test. Which one to use depends on whether the calling code is creating 701 a python socket, or if an unused port needs to be provided in a constructor 702 or passed to an external program (i.e. the -accept argument to openssl's 703 s_server mode). Always prefer bind_port() over find_unused_port() where 704 possible. Hard coded ports should *NEVER* be used. As soon as a server 705 socket is bound to a hard coded port, the ability to run multiple instances 706 of the test simultaneously on the same host is compromised, which makes the 707 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 708 may simply manifest as a failed test, which can be recovered from without 709 intervention in most cases, but on Windows, the entire python process can 710 completely and utterly wedge, requiring someone to log in to the buildbot 711 and manually kill the affected process. 712 713 (This is easy to reproduce on Windows, unfortunately, and can be traced to 714 the SO_REUSEADDR socket option having different semantics on Windows versus 715 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 716 listen and then accept connections on identical host/ports. An EADDRINUSE 717 OSError will be raised at some point (depending on the platform and 718 the order bind and listen were called on each socket). 719 720 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 721 will ever be raised when attempting to bind two identical host/ports. 
When 722 accept() is called on each socket, the second caller's process will steal 723 the port from the first caller, leaving them both in an awkwardly wedged 724 state where they'll no longer respond to any signals or graceful kills, and 725 must be forcibly killed via OpenProcess()/TerminateProcess(). 726 727 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 728 instead of SO_REUSEADDR, which effectively affords the same semantics as 729 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 730 Source world compared to Windows ones, this is a common mistake. A quick 731 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 732 openssl.exe is called with the 's_server' option, for example. See 733 http://bugs.python.org/issue2550 for more info. The following site also 734 has a very thorough description about the implications of both REUSEADDR 735 and EXCLUSIVEADDRUSE on Windows: 736 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 737 738 XXX: although this approach is a vast improvement on previous attempts to 739 elicit unused ports, it rests heavily on the assumption that the ephemeral 740 port returned to us by the OS won't immediately be dished back out to some 741 other process when we close and delete our temporary socket but before our 742 calling code has a chance to bind the returned port. We can deal with this 743 issue if/when we come across it. 744 """ 745 746 with socket.socket(family, socktype) as tempsock: 747 port = bind_port(tempsock) 748 del tempsock 749 return port 750 751def bind_port(sock, host=HOST): 752 """Bind the socket to a free port and return the port number. Relies on 753 ephemeral ports in order to ensure we are using an unbound port. This is 754 important as many tests may be running simultaneously, especially in a 755 buildbot environment. 
This method raises an exception if the sock.family 756 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 757 or SO_REUSEPORT set on it. Tests should *never* set these socket options 758 for TCP/IP sockets. The only case for setting these options is testing 759 multicasting via multiple UDP sockets. 760 761 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 762 on Windows), it will be set on the socket. This will prevent anyone else 763 from bind()'ing to our host/port for the duration of the test. 764 """ 765 766 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 767 if hasattr(socket, 'SO_REUSEADDR'): 768 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 769 raise TestFailed("tests should never set the SO_REUSEADDR " \ 770 "socket option on TCP/IP sockets!") 771 if hasattr(socket, 'SO_REUSEPORT'): 772 try: 773 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 774 raise TestFailed("tests should never set the SO_REUSEPORT " \ 775 "socket option on TCP/IP sockets!") 776 except OSError: 777 # Python's socket module was compiled using modern headers 778 # thus defining SO_REUSEPORT but this process is running 779 # under an older kernel that does not support SO_REUSEPORT. 
780 pass 781 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 782 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 783 784 sock.bind((host, 0)) 785 port = sock.getsockname()[1] 786 return port 787 788def bind_unix_socket(sock, addr): 789 """Bind a unix socket, raising SkipTest if PermissionError is raised.""" 790 assert sock.family == socket.AF_UNIX 791 try: 792 sock.bind(addr) 793 except PermissionError: 794 sock.close() 795 raise unittest.SkipTest('cannot bind AF_UNIX sockets') 796 797def _is_ipv6_enabled(): 798 """Check whether IPv6 is enabled on this host.""" 799 if socket.has_ipv6: 800 sock = None 801 try: 802 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 803 sock.bind((HOSTv6, 0)) 804 return True 805 except OSError: 806 pass 807 finally: 808 if sock: 809 sock.close() 810 return False 811 812IPV6_ENABLED = _is_ipv6_enabled() 813 814def system_must_validate_cert(f): 815 """Skip the test on TLS certificate validation failures.""" 816 @functools.wraps(f) 817 def dec(*args, **kwargs): 818 try: 819 f(*args, **kwargs) 820 except OSError as e: 821 if "CERTIFICATE_VERIFY_FAILED" in str(e): 822 raise unittest.SkipTest("system does not contain " 823 "necessary certificates") 824 raise 825 return dec 826 827# A constant likely larger than the underlying OS pipe buffer size, to 828# make writes blocking. 829# Windows limit seems to be around 512 B, and many Unix kernels have a 830# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 831# (see issue #17835 for a discussion of this number). 832PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 833 834# A constant likely larger than the underlying OS socket buffer size, to make 835# writes blocking. 836# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 837# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 838# for a discussion of this number). 
839SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 840 841# decorator for skipping tests on non-IEEE 754 platforms 842requires_IEEE_754 = unittest.skipUnless( 843 float.__getformat__("double").startswith("IEEE"), 844 "test requires IEEE 754 doubles") 845 846requires_zlib = unittest.skipUnless(zlib, 'requires zlib') 847 848requires_gzip = unittest.skipUnless(gzip, 'requires gzip') 849 850requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') 851 852requires_lzma = unittest.skipUnless(lzma, 'requires lzma') 853 854is_jython = sys.platform.startswith('java') 855 856is_android = hasattr(sys, 'getandroidapilevel') 857 858if sys.platform != 'win32': 859 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 860else: 861 unix_shell = None 862 863# Filename used for testing 864if os.name == 'java': 865 # Jython disallows @ in module names 866 TESTFN = '$test' 867else: 868 TESTFN = '@test' 869 870# Disambiguate TESTFN for parallel testing, while letting it remain a valid 871# module name. 872TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 873 874# Define the URL of a dedicated HTTP server for the network tests. 875# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. 876TEST_HTTP_URL = "http://www.pythontest.net" 877 878# FS_NONASCII: non-ASCII character encodable by os.fsencode(), 879# or None if there is no such character. 880FS_NONASCII = None 881for character in ( 882 # First try printable and common characters to have a readable filename. 883 # For each character, the encoding list are just example of encodings able 884 # to encode the character (the list is not exhaustive). 
885 886 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 887 '\u00E6', 888 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 889 '\u0130', 890 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 891 '\u0141', 892 # U+03C6 (Greek Small Letter Phi): cp1253 893 '\u03C6', 894 # U+041A (Cyrillic Capital Letter Ka): cp1251 895 '\u041A', 896 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 897 '\u05D0', 898 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 899 '\u060C', 900 # U+062A (Arabic Letter Teh): cp720 901 '\u062A', 902 # U+0E01 (Thai Character Ko Kai): cp874 903 '\u0E01', 904 905 # Then try more "special" characters. "special" because they may be 906 # interpreted or displayed differently depending on the exact locale 907 # encoding and the font. 908 909 # U+00A0 (No-Break Space) 910 '\u00A0', 911 # U+20AC (Euro Sign) 912 '\u20AC', 913): 914 try: 915 # If Python is set up to use the legacy 'mbcs' in Windows, 916 # 'replace' error mode is used, and encode() returns b'?' 917 # for characters missing in the ANSI codepage 918 if os.fsdecode(os.fsencode(character)) != character: 919 raise UnicodeError 920 except UnicodeError: 921 pass 922 else: 923 FS_NONASCII = character 924 break 925 926# TESTFN_UNICODE is a non-ascii filename 927TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" 928if sys.platform == 'darwin': 929 # In Mac OS X's VFS API file names are, by definition, canonically 930 # decomposed Unicode, encoded using UTF-8. See QA1173: 931 # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html 932 import unicodedata 933 TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) 934TESTFN_ENCODING = sys.getfilesystemencoding() 935 936# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be 937# encoded by the filesystem encoding (in strict mode). It can be None if we 938# cannot generate such filename. 
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Different kinds of characters from various languages to minimize the
        # probability that the whole name is encodable to MBCS (issue #9819)
        TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(TESTFN_ENCODING)
        except UnicodeEncodeError:
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). '
                  'Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, TESTFN_ENCODING))
            TESTFN_UNENCODABLE = None
# Mac OS X denies unencodable filenames (invalid utf-8)
elif sys.platform != 'darwin':
    try:
        # ascii and utf-8 cannot encode the byte 0xff
        b'\xff'.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN \
            + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape')
    else:
        # File system encoding (eg. ISO-8859-* encodings) can encode
        # the byte 0xff. Skip some unicode filename tests.
        pass

# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first: it is
    # not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    # NOTE(review): no comma after b'\xae\xd5', so it concatenates with the
    # next literal into b'\xae\xd5\xed\xb2\x80' — confirm this is intended.
    b'\xae\xd5'
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(TESTFN_ENCODING)
    except UnicodeDecodeError:
        # Keep the first candidate that the filesystem encoding rejects.
        TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name
        break

if FS_NONASCII:
    TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII
else:
    TESTFN_NONASCII = None

# Save the initial cwd
SAVEDCWD = os.getcwd()

# Set by libregrtest/main.py so we can skip tests that are not
# useful for PGO
PGO = False

# Set by libregrtest/main.py if we are running the extended (time consuming)
# PGO task. If this is True, PGO is also True.
PGO_EXTENDED = False

@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        defaults to creating a temporary directory using tempfile.mkdtemp.

      quiet: if False (the default), the context manager raises an exception
        on error.  Otherwise, if the path is specified and cannot be
        created, only a warning is issued.
1027 1028 """ 1029 dir_created = False 1030 if path is None: 1031 path = tempfile.mkdtemp() 1032 dir_created = True 1033 path = os.path.realpath(path) 1034 else: 1035 try: 1036 os.mkdir(path) 1037 dir_created = True 1038 except OSError as exc: 1039 if not quiet: 1040 raise 1041 warnings.warn(f'tests may fail, unable to create ' 1042 f'temporary directory {path!r}: {exc}', 1043 RuntimeWarning, stacklevel=3) 1044 if dir_created: 1045 pid = os.getpid() 1046 try: 1047 yield path 1048 finally: 1049 # In case the process forks, let only the parent remove the 1050 # directory. The child has a different process id. (bpo-30028) 1051 if dir_created and pid == os.getpid(): 1052 rmtree(path) 1053 1054@contextlib.contextmanager 1055def change_cwd(path, quiet=False): 1056 """Return a context manager that changes the current working directory. 1057 1058 Arguments: 1059 1060 path: the directory to use as the temporary current working directory. 1061 1062 quiet: if False (the default), the context manager raises an exception 1063 on error. Otherwise, it issues only a warning and keeps the current 1064 working directory the same. 1065 1066 """ 1067 saved_dir = os.getcwd() 1068 try: 1069 os.chdir(os.path.realpath(path)) 1070 except OSError as exc: 1071 if not quiet: 1072 raise 1073 warnings.warn(f'tests may fail, unable to change the current working ' 1074 f'directory to {path!r}: {exc}', 1075 RuntimeWarning, stacklevel=3) 1076 try: 1077 yield os.getcwd() 1078 finally: 1079 os.chdir(saved_dir) 1080 1081 1082@contextlib.contextmanager 1083def temp_cwd(name='tempcwd', quiet=False): 1084 """ 1085 Context manager that temporarily creates and changes the CWD. 1086 1087 The function temporarily changes the current working directory 1088 after creating a temporary directory in the current directory with 1089 name *name*. If *name* is None, the temporary directory is 1090 created using tempfile.mkdtemp. 
1091 1092 If *quiet* is False (default) and it is not possible to 1093 create or change the CWD, an error is raised. If *quiet* is True, 1094 only a warning is raised and the original CWD is used. 1095 1096 """ 1097 with temp_dir(path=name, quiet=quiet) as temp_path: 1098 with change_cwd(temp_path, quiet=quiet) as cwd_dir: 1099 yield cwd_dir 1100 1101if hasattr(os, "umask"): 1102 @contextlib.contextmanager 1103 def temp_umask(umask): 1104 """Context manager that temporarily sets the process umask.""" 1105 oldmask = os.umask(umask) 1106 try: 1107 yield 1108 finally: 1109 os.umask(oldmask) 1110 1111# TEST_HOME_DIR refers to the top level directory of the "test" package 1112# that contains Python's regression test suite 1113TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 1114TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 1115 1116# TEST_DATA_DIR is used as a target download location for remote resources 1117TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 1118 1119def findfile(filename, subdir=None): 1120 """Try to find a file on sys.path or in the test directory. If it is not 1121 found the argument passed to the function is returned (this does not 1122 necessarily signal failure; could still be the legitimate path). 1123 1124 Setting *subdir* indicates a relative path to use to find the file 1125 rather than looking directly in the path directories. 1126 """ 1127 if os.path.isabs(filename): 1128 return filename 1129 if subdir is not None: 1130 filename = os.path.join(subdir, filename) 1131 path = [TEST_HOME_DIR] + sys.path 1132 for dn in path: 1133 fn = os.path.join(dn, filename) 1134 if os.path.exists(fn): return fn 1135 return filename 1136 1137def create_empty_file(filename): 1138 """Create an empty file. If the file already exists, truncate it.""" 1139 fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) 1140 os.close(fd) 1141 1142def sortdict(dict): 1143 "Like repr(dict), but in sorted order." 
1144 items = sorted(dict.items()) 1145 reprpairs = ["%r: %r" % pair for pair in items] 1146 withcommas = ", ".join(reprpairs) 1147 return "{%s}" % withcommas 1148 1149def make_bad_fd(): 1150 """ 1151 Create an invalid file descriptor by opening and closing a file and return 1152 its fd. 1153 """ 1154 file = open(TESTFN, "wb") 1155 try: 1156 return file.fileno() 1157 finally: 1158 file.close() 1159 unlink(TESTFN) 1160 1161 1162def check_syntax_error(testcase, statement, errtext='', *, lineno=None, offset=None): 1163 with testcase.assertRaisesRegex(SyntaxError, errtext) as cm: 1164 compile(statement, '<test string>', 'exec') 1165 err = cm.exception 1166 testcase.assertIsNotNone(err.lineno) 1167 if lineno is not None: 1168 testcase.assertEqual(err.lineno, lineno) 1169 testcase.assertIsNotNone(err.offset) 1170 if offset is not None: 1171 testcase.assertEqual(err.offset, offset) 1172 1173def check_syntax_warning(testcase, statement, errtext='', *, lineno=1, offset=None): 1174 # Test also that a warning is emitted only once. 1175 with warnings.catch_warnings(record=True) as warns: 1176 warnings.simplefilter('always', SyntaxWarning) 1177 compile(statement, '<testcase>', 'exec') 1178 testcase.assertEqual(len(warns), 1, warns) 1179 1180 warn, = warns 1181 testcase.assertTrue(issubclass(warn.category, SyntaxWarning), warn.category) 1182 if errtext: 1183 testcase.assertRegex(str(warn.message), errtext) 1184 testcase.assertEqual(warn.filename, '<testcase>') 1185 testcase.assertIsNotNone(warn.lineno) 1186 if lineno is not None: 1187 testcase.assertEqual(warn.lineno, lineno) 1188 1189 # SyntaxWarning should be converted to SyntaxError when raised, 1190 # since the latter contains more information and provides better 1191 # error report. 
1192 with warnings.catch_warnings(record=True) as warns: 1193 warnings.simplefilter('error', SyntaxWarning) 1194 check_syntax_error(testcase, statement, errtext, 1195 lineno=lineno, offset=offset) 1196 # No warnings are leaked when a SyntaxError is raised. 1197 testcase.assertEqual(warns, []) 1198 1199 1200def open_urlresource(url, *args, **kw): 1201 import urllib.request, urllib.parse 1202 1203 check = kw.pop('check', None) 1204 1205 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 1206 1207 fn = os.path.join(TEST_DATA_DIR, filename) 1208 1209 def check_valid_file(fn): 1210 f = open(fn, *args, **kw) 1211 if check is None: 1212 return f 1213 elif check(f): 1214 f.seek(0) 1215 return f 1216 f.close() 1217 1218 if os.path.exists(fn): 1219 f = check_valid_file(fn) 1220 if f is not None: 1221 return f 1222 unlink(fn) 1223 1224 # Verify the requirement before downloading the file 1225 requires('urlfetch') 1226 1227 if verbose: 1228 print('\tfetching %s ...' % url, file=get_original_stdout()) 1229 opener = urllib.request.build_opener() 1230 if gzip: 1231 opener.addheaders.append(('Accept-Encoding', 'gzip')) 1232 f = opener.open(url, timeout=15) 1233 if gzip and f.headers.get('Content-Encoding') == 'gzip': 1234 f = gzip.GzipFile(fileobj=f) 1235 try: 1236 with open(fn, "wb") as out: 1237 s = f.read() 1238 while s: 1239 out.write(s) 1240 s = f.read() 1241 finally: 1242 f.close() 1243 1244 f = check_valid_file(fn) 1245 if f is not None: 1246 return f 1247 raise TestFailed('invalid resource %r' % fn) 1248 1249 1250class WarningsRecorder(object): 1251 """Convenience wrapper for the warnings list returned on 1252 entry to the warnings.catch_warnings() context manager. 
1253 """ 1254 def __init__(self, warnings_list): 1255 self._warnings = warnings_list 1256 self._last = 0 1257 1258 def __getattr__(self, attr): 1259 if len(self._warnings) > self._last: 1260 return getattr(self._warnings[-1], attr) 1261 elif attr in warnings.WarningMessage._WARNING_DETAILS: 1262 return None 1263 raise AttributeError("%r has no attribute %r" % (self, attr)) 1264 1265 @property 1266 def warnings(self): 1267 return self._warnings[self._last:] 1268 1269 def reset(self): 1270 self._last = len(self._warnings) 1271 1272 1273def _filterwarnings(filters, quiet=False): 1274 """Catch the warnings, then check if all the expected 1275 warnings have been raised and re-raise unexpected warnings. 1276 If 'quiet' is True, only re-raise the unexpected warnings. 1277 """ 1278 # Clear the warning registry of the calling module 1279 # in order to re-raise the warnings. 1280 frame = sys._getframe(2) 1281 registry = frame.f_globals.get('__warningregistry__') 1282 if registry: 1283 registry.clear() 1284 with warnings.catch_warnings(record=True) as w: 1285 # Set filter "always" to record all warnings. Because 1286 # test_warnings swap the module, we need to look up in 1287 # the sys.modules dictionary. 
1288 sys.modules['warnings'].simplefilter("always") 1289 yield WarningsRecorder(w) 1290 # Filter the recorded warnings 1291 reraise = list(w) 1292 missing = [] 1293 for msg, cat in filters: 1294 seen = False 1295 for w in reraise[:]: 1296 warning = w.message 1297 # Filter out the matching messages 1298 if (re.match(msg, str(warning), re.I) and 1299 issubclass(warning.__class__, cat)): 1300 seen = True 1301 reraise.remove(w) 1302 if not seen and not quiet: 1303 # This filter caught nothing 1304 missing.append((msg, cat.__name__)) 1305 if reraise: 1306 raise AssertionError("unhandled warning %s" % reraise[0]) 1307 if missing: 1308 raise AssertionError("filter (%r, %s) did not catch any warning" % 1309 missing[0]) 1310 1311 1312@contextlib.contextmanager 1313def check_warnings(*filters, **kwargs): 1314 """Context manager to silence warnings. 1315 1316 Accept 2-tuples as positional arguments: 1317 ("message regexp", WarningCategory) 1318 1319 Optional argument: 1320 - if 'quiet' is True, it does not fail if a filter catches nothing 1321 (default True without argument, 1322 default False if some filters are defined) 1323 1324 Without argument, it defaults to: 1325 check_warnings(("", Warning), quiet=True) 1326 """ 1327 quiet = kwargs.get('quiet') 1328 if not filters: 1329 filters = (("", Warning),) 1330 # Preserve backward compatibility 1331 if quiet is None: 1332 quiet = True 1333 return _filterwarnings(filters, quiet) 1334 1335 1336@contextlib.contextmanager 1337def check_no_warnings(testcase, message='', category=Warning, force_gc=False): 1338 """Context manager to check that no warnings are emitted. 1339 1340 This context manager enables a given warning within its scope 1341 and checks that no warnings are emitted even with that warning 1342 enabled. 1343 1344 If force_gc is True, a garbage collection is attempted before checking 1345 for warnings. This may help to catch warnings emitted when objects 1346 are deleted, such as ResourceWarning. 

    Other keyword arguments are passed to warnings.filterwarnings().
    """
    with warnings.catch_warnings(record=True) as warns:
        warnings.filterwarnings('always',
                                message=message,
                                category=category)
        yield
        if force_gc:
            gc_collect()
    testcase.assertEqual(warns, [])


@contextlib.contextmanager
def check_no_resource_warning(testcase):
    """Context manager to check that no ResourceWarning is emitted.

    Usage:

        with check_no_resource_warning(self):
            f = open(...)
            ...
            del f

    You must remove the object which may emit ResourceWarning before
    the end of the context manager.
    """
    with check_no_warnings(testcase, category=ResourceWarning, force_gc=True):
        yield


class CleanImport(object):
    """Context manager to force import to return a new module reference.

    This is useful for testing module-level behaviours, such as
    the emission of a DeprecationWarning on import.

    Use like this:

        with CleanImport("foo"):
            importlib.import_module("foo") # new reference
    """

    def __init__(self, *module_names):
        # Snapshot the full module cache so __exit__ can restore it.
        self.original_modules = sys.modules.copy()
        for module_name in module_names:
            if module_name in sys.modules:
                module = sys.modules[module_name]
                # It is possible that module_name is just an alias for
                # another module (e.g. stub for modules renamed in 3.x).
                # In that case, we also need delete the real module to clear
                # the import cache.
                if module.__name__ != module_name:
                    del sys.modules[module.__name__]
                del sys.modules[module_name]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        sys.modules.update(self.original_modules)


class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its original value (None = unset).
        self._changed = {}

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        # Restore every variable we touched to its pre-guard state.
        for (k, v) in self._changed.items():
            if v is None:
                if k in self._environ:
                    del self._environ[k]
            else:
                self._environ[k] = v
        # NOTE(review): rebinding os.environ here is a no-op unless it was
        # replaced while the guard was active — presumably defensive; confirm.
        os.environ = self._environ


class DirsOnSysPath(object):
    """Context manager to temporarily add directories to sys.path.

    This makes a copy of sys.path, appends any directories given
    as positional arguments, then reverts sys.path to the copied
    settings when the context ends.

    Note that *all* sys.path modifications in the body of the
    context manager, including replacement of the object,
    will be reverted at the end of the block.
1473 """ 1474 1475 def __init__(self, *paths): 1476 self.original_value = sys.path[:] 1477 self.original_object = sys.path 1478 sys.path.extend(paths) 1479 1480 def __enter__(self): 1481 return self 1482 1483 def __exit__(self, *ignore_exc): 1484 sys.path = self.original_object 1485 sys.path[:] = self.original_value 1486 1487 1488class TransientResource(object): 1489 1490 """Raise ResourceDenied if an exception is raised while the context manager 1491 is in effect that matches the specified exception and attributes.""" 1492 1493 def __init__(self, exc, **kwargs): 1494 self.exc = exc 1495 self.attrs = kwargs 1496 1497 def __enter__(self): 1498 return self 1499 1500 def __exit__(self, type_=None, value=None, traceback=None): 1501 """If type_ is a subclass of self.exc and value has attributes matching 1502 self.attrs, raise ResourceDenied. Otherwise let the exception 1503 propagate (if any).""" 1504 if type_ is not None and issubclass(self.exc, type_): 1505 for attr, attr_value in self.attrs.items(): 1506 if not hasattr(value, attr): 1507 break 1508 if getattr(value, attr) != attr_value: 1509 break 1510 else: 1511 raise ResourceDenied("an optional resource is not available") 1512 1513# Context managers that raise ResourceDenied when various issues 1514# with the Internet connection manifest themselves as exceptions. 1515# XXX deprecate these and use transient_internet() instead 1516time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) 1517socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1518ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1519 1520 1521def get_socket_conn_refused_errs(): 1522 """ 1523 Get the different socket error numbers ('errno') which can be received 1524 when a connection is refused. 
    """
    errors = [errno.ECONNREFUSED]
    if hasattr(errno, 'ENETUNREACH'):
        # On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
        errors.append(errno.ENETUNREACH)
    if hasattr(errno, 'EADDRNOTAVAIL'):
        # bpo-31910: socket.create_connection() fails randomly
        # with EADDRNOTAVAIL on Travis CI
        errors.append(errno.EADDRNOTAVAIL)
    if hasattr(errno, 'EHOSTUNREACH'):
        # bpo-37583: The destination host cannot be reached
        errors.append(errno.EHOSTUNREACH)
    if not IPV6_ENABLED:
        errors.append(errno.EAFNOSUPPORT)
    return errors


@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=30.0, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    # (name, fallback number) pairs: getattr() below prefers the platform's
    # real errno/socket constants and falls back to these numeric defaults.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
        # socket.create_connection() fails randomly with
        # EADDRNOTAVAIL on Travis CI.
        ('EADDRNOTAVAIL', 99),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = ResourceDenied("Resource %r is not available" % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        # Convert a recognized transient network error into ResourceDenied.
        n = getattr(err, 'errno', None)
        if (isinstance(err, socket.timeout) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            (isinstance(err, urllib.error.HTTPError) and
             500 <= err.code <= 599) or
            (isinstance(err, urllib.error.URLError) and
                 (("ConnectionRefusedError" in err.reason) or
                  ("TimeoutError" in err.reason) or
                  ("EOFError" in err.reason))) or
            n in captured_errnos):
            if not verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except nntplib.NNTPTemporaryError as err:
        if verbose:
            sys.stderr.write(denied.args[0] + "\n")
        raise denied from err
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg).with_traceback(sys.exc_info()[2])
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)


@contextlib.contextmanager
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO."""
    import io
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, io.StringIO())
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)

def captured_stdout():
    """Capture the output of sys.stdout:

       with captured_stdout() as stdout:
           print("hello")
       self.assertEqual(stdout.getvalue(), "hello\\n")
    """
    return captured_output("stdout")

def captured_stderr():
    """Capture the output of sys.stderr:

       with captured_stderr() as stderr:
           print("hello", file=sys.stderr)
       self.assertEqual(stderr.getvalue(), "hello\\n")
    """
    return captured_output("stderr")

def captured_stdin():
    """Capture the input to sys.stdin:

       with captured_stdin() as stdin:
           stdin.write('hello\\n')
           stdin.seek(0)
           # call test code that consumes from sys.stdin
           captured = input()
       self.assertEqual(captured, "hello")
    """
    return captured_output("stdin")


def gc_collect():
    """Force as many objects as possible to be collected.
1665 1666 In non-CPython implementations of Python, this is needed because timely 1667 deallocation is not guaranteed by the garbage collector. (Even in CPython 1668 this can be the case in case of reference cycles.) This means that __del__ 1669 methods may be called later than expected and weakrefs may remain alive for 1670 longer than expected. This function tries its best to force all garbage 1671 objects to disappear. 1672 """ 1673 gc.collect() 1674 if is_jython: 1675 time.sleep(0.1) 1676 gc.collect() 1677 gc.collect() 1678 1679@contextlib.contextmanager 1680def disable_gc(): 1681 have_gc = gc.isenabled() 1682 gc.disable() 1683 try: 1684 yield 1685 finally: 1686 if have_gc: 1687 gc.enable() 1688 1689 1690def python_is_optimized(): 1691 """Find if Python was built with optimizations.""" 1692 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 1693 final_opt = "" 1694 for opt in cflags.split(): 1695 if opt.startswith('-O'): 1696 final_opt = opt 1697 return final_opt not in ('', '-O0', '-Og') 1698 1699 1700_header = 'nP' 1701_align = '0n' 1702if hasattr(sys, "getobjects"): 1703 _header = '2P' + _header 1704 _align = '0P' 1705_vheader = _header + 'n' 1706 1707def calcobjsize(fmt): 1708 return struct.calcsize(_header + fmt + _align) 1709 1710def calcvobjsize(fmt): 1711 return struct.calcsize(_vheader + fmt + _align) 1712 1713 1714_TPFLAGS_HAVE_GC = 1<<14 1715_TPFLAGS_HEAPTYPE = 1<<9 1716 1717def check_sizeof(test, o, size): 1718 import _testcapi 1719 result = sys.getsizeof(o) 1720 # add GC header size 1721 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1722 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1723 size += _testcapi.SIZEOF_PYGC_HEAD 1724 msg = 'wrong size for %s: got %d, expected %d' \ 1725 % (type(o), result, size) 1726 test.assertEqual(result, size, msg) 1727 1728#======================================================================= 1729# Decorator for running a function in a different locale, correctly resetting 
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator: run the test with locale category *catstr* (e.g. 'LC_ALL')
    set to the first of *locales* that can be installed, restoring the
    original locale afterwards.

    If the original locale cannot be queried, the function runs in the
    current locale and nothing is restored.  An invalid category name
    raises AttributeError.
    """
    def decorator(func):
        # functools.wraps preserves __name__, __qualname__, __doc__,
        # __module__ and sets __wrapped__ (the manual copying it replaces
        # only kept __name__ and __doc__).
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except:
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        return inner
    return decorator

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Decorator: run the test with the TZ environment variable set to *tz*,
    restoring (or removing) the previous value afterwards.

    Skips the test on platforms without time.tzset().
    """
    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            if 'TZ' in os.environ:
                orig_tz = os.environ['TZ']
            else:
                orig_tz = None
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    del os.environ['TZ']
                else:
                    os.environ['TZ'] = orig_tz
                time.tzset()

        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands.  Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the global bigmem limits from a string such as '2g' or '2.5 Gb'.

    Sets real_max_memuse to the requested number of bytes and max_memuse
    to the same value clamped to sys.maxsize.  Raises ValueError for an
    unparsable string or a limit too small (< ~2 GiB) to be useful.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
    real_max_memuse = memlimit
    if memlimit > MAX_Py_ssize_t:
        memlimit = MAX_Py_ssize_t
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

class _MemoryWatchdog:
    """An object which periodically watches the process' memory consumption
    and prints it out.
    """

    def __init__(self):
        # Linux-specific statm file; start() degrades gracefully elsewhere.
        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
        self.started = False

    def start(self):
        try:
            f = open(self.procfile, 'r')
        except OSError as e:
            warnings.warn('/proc not available for stats: {}'.format(e),
                          RuntimeWarning)
            sys.stderr.flush()
            return

        with f:
            # The watchdog subprocess reads statm through the inherited fd.
            watchdog_script = findfile("memory_watchdog.py")
            self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script],
                                                 stdin=f,
                                                 stderr=subprocess.DEVNULL)
        self.started = True

    def stop(self):
        if self.started:
            self.mem_watchdog.terminate()
            self.mem_watchdog.wait()


def bigmemtest(size, memuse, dry_run=True):
    """Decorator for bigmem tests.

    'size' is a requested size for the test (in arbitrary, test-interpreted
    units.) 'memuse' is the number of bytes per unit for the test, or a good
    estimate of it.  For example, a test that needs two byte buffers, of 4 GiB
    each, could be decorated with @bigmemtest(size=_4G, memuse=2).

    The 'size' argument is normally passed to the decorated test method as an
    extra argument.  If 'dry_run' is true, the value passed to the test method
    may be less than the requested value.  If 'dry_run' is false, it means the
    test doesn't support dummy runs when -M is not specified.
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self):
            size = wrapper.size
            memuse = wrapper.memuse
            if not real_max_memuse:
                # Dry run: an arbitrary small-but-nontrivial size.
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (size * memuse / (1024 ** 3)))

            if real_max_memuse and verbose:
                print()
                print(" ... expected peak memory use: {peak:.1f}G"
                      .format(peak=size * memuse / (1024 ** 3)))
                watchdog = _MemoryWatchdog()
                watchdog.start()
            else:
                watchdog = None

            try:
                return f(self, maxsize)
            finally:
                if watchdog:
                    watchdog.stop()

        # Exposed so test runners can inspect/override the requested sizes;
        # set after functools.wraps so they are not clobbered.
        wrapper.size = size
        wrapper.memuse = memuse
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    @functools.wraps(f)
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31:
                raise unittest.SkipTest(
                    "not enough memory: try a 32-bit build instead")
            else:
                raise unittest.SkipTest(
                    "not enough memory: %.1fG minimum needed"
                    % (MAX_Py_ssize_t / (1024 ** 3)))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal test runner: run the test and return the raw TestResult."""
    def run(self, test):
        result = unittest.TestResult()
        test(result)
        return result

def _id(obj):
    # Identity "decorator": returned when no skip is needed.
    return obj

def requires_resource(resource):
    """Return a decorator skipping the test if 'resource' is not enabled."""
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if is_resource_enabled(resource):
        return _id
    else:
        return unittest.skip("resource {0!r} is not enabled".format(resource))

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    return impl_detail(cpython=True)(test)

def impl_detail(msg=None, **guards):
    # Return a decorator which skips the test unless the current
    # implementation matches the guards (see check_impl_detail()).
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        if default:
            msg = "implementation detail not available on {0}"
        else:
            msg = "implementation detail specific to {0}"
        guardnames = sorted(guardnames.keys())
        msg = msg.format(' or '.join(guardnames))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        return ({'cpython': True}, False)
    is_true = list(guards.values())[0]
    assert list(guards.values()) == [is_true] * len(guards) # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    return guards.get(platform.python_implementation().lower(), default)


def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test."""
    if not hasattr(sys, 'gettrace'):
        return func
    else:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            original_trace = sys.gettrace()
            try:
                sys.settrace(None)
                return func(*args, **kwargs)
            finally:
                sys.settrace(original_trace)
        return wrapper


def refcount_test(test):
    """Decorator for tests which involve reference counting.

    To start, the decorator does not run the test if it is not run by
    CPython.  After that, any trace function is unset during the test to
    prevent unexpected refcounts caused by the trace function.

    """
    return no_tracing(cpython_only(test))


def _filter_suite(suite, pred):
    """Recursively filter test cases in a suite based on a predicate."""
    newtests = []
    for test in suite._tests:
        if isinstance(test, unittest.TestSuite):
            # Keep nested suites, but filter their contents in place.
            _filter_suite(test, pred)
            newtests.append(test)
        else:
            if pred(test):
                newtests.append(test)
    suite._tests = newtests

def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    runner = get_test_runner(sys.stdout,
                             verbosity=verbose,
                             capture_output=(junit_xml_list is not None))

    result = runner.run(suite)

    if junit_xml_list is not None:
        junit_xml_list.append(result.get_xml_element())

    if not result.testsRun and not result.skipped:
        raise TestDidNotRun
    if not result.wasSuccessful():
        if len(result.errors) == 1 and not result.failures:
            err = result.errors[0][1]
        elif len(result.failures) == 1 and not result.errors:
            err = result.failures[0][1]
        else:
            err = "multiple errors occurred"
            if not verbose: err += "; run in verbose mode for details"
        raise TestFailed(err)


# By default, don't filter tests
_match_test_func = None
_match_test_patterns = None


def match_test(test):
    # Function used by support.run_unittest() and regrtest --list-cases
    if _match_test_func is None:
        return True
    else:
        return _match_test_func(test.id())


def _is_full_match_test(pattern):
    # If a pattern contains at least one dot, it's considered
    # as a full test identifier.
    # Example: 'test.test_os.FileTests.test_access'.
    #
    # Reject patterns which contain fnmatch patterns: '*', '?', '[...]'
    # or '[!...]'. For example, reject 'test_access*'.
    return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern))


def set_match_tests(patterns):
    """Select which test identifiers match_test() accepts.

    Passing None or an empty sequence accepts all tests.
    """
    global _match_test_func, _match_test_patterns

    if patterns == _match_test_patterns:
        # No change: no need to recompile patterns.
        return

    if not patterns:
        func = None
        # set_match_tests(None) behaves as set_match_tests(())
        patterns = ()
    elif all(map(_is_full_match_test, patterns)):
        # Simple case: all patterns are full test identifier.
        # The test.bisect_cmd utility only uses such full test identifiers.
        func = set(patterns).__contains__
    else:
        regex = '|'.join(map(fnmatch.translate, patterns))
        # The search *is* case sensitive on purpose:
        # don't use flags=re.IGNORECASE
        regex_match = re.compile(regex).match

        def match_test_regex(test_id):
            if regex_match(test_id):
                # The regex matches the whole identifier, for example
                # 'test.test_os.FileTests.test_access'.
                return True
            else:
                # Try to match parts of the test identifier.
                # For example, split 'test.test_os.FileTests.test_access'
                # into: 'test', 'test_os', 'FileTests' and 'test_access'.
                return any(map(regex_match, test_id.split(".")))

        func = match_test_regex

    # Create a copy since patterns can be mutable and so modified later
    _match_test_patterns = tuple(patterns)
    _match_test_func = func



def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls in sys.modules:
                suite.addTest(unittest.findTestCases(sys.modules[cls]))
            else:
                raise ValueError("str arguments must be keys in sys.modules")
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            # Assume it's a TestCase subclass: collect its test methods.
            suite.addTest(unittest.makeSuite(cls))
    _filter_suite(suite, match_test)
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.

# Rather than trying to enumerate all the cases where docstrings may be
# disabled, we just check for that directly

def _check_docstrings():
    """Just used to check if docstrings are enabled"""

MISSING_C_DOCSTRINGS = (check_impl_detail() and
                        sys.platform != 'win32' and
                        not sysconfig.get_config_var('WITH_DOC_STRINGS'))

HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and
                   not MISSING_C_DOCSTRINGS)

requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None, optionflags=0):
    """Run doctest on the given module. Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    support's belief about verbosity on to doctest. Else doctest's
    usual behavior is used (it searches sys.argv for -v).
    """

    import doctest

    if verbosity is None:
        verbosity = verbose
    else:
        # Caller gave an explicit value: let doctest fall back to its own
        # verbosity handling (it searches sys.argv for -v).
        verbosity = None

    f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags)
    if f:
        raise TestFailed("%d of %d doctests failed" % (f, t))
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' %
              (module.__name__, t))
    return f, t


#=======================================================================
# Support for saving and restoring the imported modules.

def modules_setup():
    # Snapshot sys.modules; the trailing comma makes this a 1-tuple so it
    # can be unpacked into modules_cleanup()'s arguments.
    return sys.modules.copy(),

def modules_cleanup(oldmodules):
    # Encoders/decoders are registered permanently within the internal
    # codec cache. If we destroy the corresponding modules their
    # globals will be set to None which will trip up the cached functions.
    encodings = [(k, v) for k, v in sys.modules.items()
                 if k.startswith('encodings.')]
    sys.modules.clear()
    sys.modules.update(encodings)
    # XXX: This kind of problem can affect more than just encodings. In particular
    # extension modules (such as _ssl) don't cope with reloading properly.
    # Really, test modules should be cleaning out the test specific modules they
    # know they added (ala test_runpy) rather than relying on this function (as
    # test_importhooks and test_pkg do currently).
    # Implicitly imported *real* modules should be left alone (see issue 10556).
    sys.modules.update(oldmodules)

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# Flag used by saved_test_environment of test.libregrtest.save_env,
# to check if a test modified the environment. The flag should be set to False
# before running a new test.
#
# For example, threading_cleanup() sets the flag if the function fails
# to cleanup threads.
environment_altered = False

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    """Snapshot the thread count and dangling threads for threading_cleanup()."""
    return _thread._count(), threading._dangling.copy()

def threading_cleanup(*original_values):
    """Poll until the thread state returns to 'original_values'.

    Retries up to _MAX_COUNT times with short sleeps; on the first failed
    check, prints a warning to stderr and sets 'environment_altered'.
    """
    global environment_altered

    _MAX_COUNT = 100

    for count in range(_MAX_COUNT):
        values = _thread._count(), threading._dangling
        if values == original_values:
            break

        if not count:
            # Display a warning at the first iteration
            environment_altered = True
            dangling_threads = values[1]
            print("Warning -- threading_cleanup() failed to cleanup "
                  "%s threads (count: %s, dangling: %s)"
                  % (values[0] - original_values[0],
                     values[0], len(dangling_threads)),
                  file=sys.stderr)
            for thread in dangling_threads:
                print(f"Dangling thread: {thread!r}", file=sys.stderr)
            sys.stderr.flush()

            # Don't hold references to threads
            dangling_threads = None
        values = None

        time.sleep(0.01)
        gc_collect()


def reap_threads(func):
    """Use this function when threads are being used. This will
    ensure that the threads are cleaned up even when the test fails.
    """
    @functools.wraps(func)
    def decorator(*args):
        key = threading_setup()
        try:
            return func(*args)
        finally:
            threading_cleanup(*key)
    return decorator


@contextlib.contextmanager
def wait_threads_exit(timeout=60.0):
    """
    bpo-31234: Context manager to wait until all threads created in the with
    statement exit.

    Use _thread.count() to check if threads exited. Indirectly, wait until
    threads exit the internal t_bootstrap() C function of the _thread module.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to cleanup threads started by the _thread.start_new_thread()
    which doesn't allow to wait for thread exit, whereas thread.Thread has a
    join() method.
    """
    old_count = _thread._count()
    try:
        yield
    finally:
        start_time = time.monotonic()
        deadline = start_time + timeout
        while True:
            count = _thread._count()
            if count <= old_count:
                break
            if time.monotonic() > deadline:
                dt = time.monotonic() - start_time
                msg = (f"wait_threads() failed to cleanup {count - old_count} "
                       f"threads after {dt:.1f} seconds "
                       f"(count: {count}, old count: {old_count})")
                raise AssertionError(msg)
            time.sleep(0.010)
            gc_collect()


def join_thread(thread, timeout=30.0):
    """Join a thread. Raise an AssertionError if the thread is still alive
    after timeout seconds.
    """
    thread.join(timeout)
    if thread.is_alive():
        msg = f"failed to join the thread in {timeout:.1f} seconds"
        raise AssertionError(msg)


def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started. This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """
    global environment_altered

    # Need os.waitpid(-1, os.WNOHANG): Windows is not supported
    if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')):
        return

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    while True:
        try:
            # Read the exit status of any child process which already completed
            pid, status = os.waitpid(-1, os.WNOHANG)
        except OSError:
            break

        if pid == 0:
            break

        print("Warning -- reap_children() reaped child process %s"
              % pid, file=sys.stderr)
        environment_altered = True


@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Start the given threads; on exit, call 'unlock' (if any) and join them.

    Raises AssertionError if any thread is still alive after ~15 minutes
    of join attempts.
    """
    threads = list(threads)
    started = []
    try:
        try:
            for t in threads:
                t.start()
                started.append(t)
        except:
            if verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(threads), len(started)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            endtime = starttime = time.monotonic()
            for timeout in range(1, 16):
                endtime += 60
                for t in started:
                    t.join(max(endtime - time.monotonic(), 0.01))
                started = [t for t in started if t.is_alive()]
                if not started:
                    break
                if verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(started), timeout))
        finally:
            started = [t for t in started if t.is_alive()]
            if started:
                faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(started))

@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporarily swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if hasattr(obj, attr):
        real_val = getattr(obj, attr)
        setattr(obj, attr, new_val)
        try:
            yield real_val
        finally:
            setattr(obj, attr, real_val)
    else:
        # Attribute didn't exist: create it, and remove it afterwards
        # (unless the with: block already deleted it).
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            if hasattr(obj, attr):
                delattr(obj, attr)

@contextlib.contextmanager
def swap_item(obj, item, new_val):
    """Temporarily swap out an item with a new object.

    Usage:
        with swap_item(obj, "item", 5):
            ...

        This will set obj["item"] to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `item` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.

        The old value (or None if it doesn't exist) will be assigned to the
        target of the "as" clause, if there is one.
    """
    if item in obj:
        real_val = obj[item]
        obj[item] = new_val
        try:
            yield real_val
        finally:
            obj[item] = real_val
    else:
        # Item didn't exist: create it, and remove it afterwards
        # (unless the with: block already deleted it).
        obj[item] = new_val
        try:
            yield
        finally:
            if item in obj:
                del obj[item]

def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.
    """
    stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip()
    return stderr

requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'),
                        'types are immortal if COUNT_ALLOCS is defined')

def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags and sys.warnoptions."""
    return subprocess._args_from_interpreter_flags()

def optim_args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    optimization settings in sys.flags."""
    return subprocess._optim_args_from_interpreter_flags()

#============================================================
# Support for assertions about logging.
#============================================================

class TestHandler(logging.handlers.BufferingHandler):
    """Logging handler which buffers records for later Matcher queries."""
    def __init__(self, matcher):
        # BufferingHandler takes a "capacity" argument
        # so as to know when to flush. As we're overriding
        # shouldFlush anyway, we can set a capacity of zero.
        # You can call flush() manually to clear out the
        # buffer.
        logging.handlers.BufferingHandler.__init__(self, 0)
        self.matcher = matcher

    def shouldFlush(self):
        # Never flush automatically; records accumulate until flush().
        return False

    def emit(self, record):
        self.format(record)
        self.buffer.append(record.__dict__)

    def matches(self, **kwargs):
        """
        Look for a saved dict whose keys/values match the supplied arguments.
        """
        result = False
        for d in self.buffer:
            if self.matcher.matches(d, **kwargs):
                result = True
                break
        return result

class Matcher(object):

    _partial_matches = ('msg', 'message')

    def matches(self, d, **kwargs):
        """
        Try to match a single dict with the supplied arguments.

        Keys whose values are strings and which are in self._partial_matches
        will be checked for partial (i.e. substring) matches. You can extend
        this scheme to (for example) do regular expression matching, etc.
        """
        result = True
        for k in kwargs:
            v = kwargs[k]
            dv = d.get(k)
            if not self.match_value(k, dv, v):
                result = False
                break
        return result

    def match_value(self, k, dv, v):
        """
        Try to match a single stored value (dv) with a supplied value (v).
        """
        if type(v) != type(dv):
            result = False
        elif type(dv) is not str or k not in self._partial_matches:
            result = (v == dv)
        else:
            # Partial match: the supplied value only needs to be a
            # substring of the stored one.
            result = dv.find(v) >= 0
        return result


_can_symlink = None
def can_symlink():
    """Return True if the platform can create symlinks (cached check)."""
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    symlink_path = TESTFN + "can_symlink"
    try:
        os.symlink(TESTFN, symlink_path)
        can = True
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        os.remove(symlink_path)
    _can_symlink = can
    return can

def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    ok = can_symlink()
    msg = "Requires functional symlink implementation"
    return test if ok else unittest.skip(msg)(test)

_buggy_ucrt = None
def skip_if_buggy_ucrt_strfptime(test):
    """
    Skip decorator for tests that use buggy strptime/strftime

    If the UCRT bugs are present time.localtime().tm_zone will be
    an empty string, otherwise we assume the UCRT bugs are fixed

    See bpo-37552 [Windows] strptime/strftime return invalid
    results with UCRT version 17763.615
    """
    global _buggy_ucrt
    if _buggy_ucrt is None:
        if(sys.platform == 'win32' and
                locale.getdefaultlocale()[1] == 'cp65001' and
                time.localtime().tm_zone == ''):
            _buggy_ucrt = True
        else:
            _buggy_ucrt = False
    return unittest.skip("buggy MSVC UCRT strptime/strftime")(test) if _buggy_ucrt else test

class PythonSymlink:
    """Creates a symlink for the current Python executable"""
    def __init__(self, link=None):
        self.link = link or os.path.abspath(TESTFN)
        self._linked = []
        self.real = os.path.realpath(sys.executable)
        self._also_link = []

        self._env = None

        self._platform_specific()

    def _platform_specific(self):
        # Default: nothing extra to do; overridden below on Windows.
        pass

    if sys.platform == "win32":
        def _platform_specific(self):
            import _winapi

            if os.path.lexists(self.real) and not os.path.exists(self.real):
                # App symlink appears to not exist, but we want the
                # real executable here anyway
                self.real = _winapi.GetModuleFileName(0)

            dll = _winapi.GetModuleFileName(sys.dllhandle)
            src_dir = os.path.dirname(dll)
            dest_dir = os.path.dirname(self.link)
            self._also_link.append((
                dll,
                os.path.join(dest_dir, os.path.basename(dll))
            ))
            for runtime in glob.glob(os.path.join(src_dir, "vcruntime*.dll")):
                self._also_link.append((
                    runtime,
                    os.path.join(dest_dir, os.path.basename(runtime))
                ))

            self._env = {k.upper(): os.getenv(k) for k in os.environ}
            self._env["PYTHONHOME"] = os.path.dirname(self.real)
            if sysconfig.is_python_build(True):
                self._env["PYTHONPATH"] = os.path.dirname(os.__file__)

    def __enter__(self):
        os.symlink(self.real, self.link)
        self._linked.append(self.link)
        for real, link in self._also_link:
            os.symlink(real, link)
            self._linked.append(link)
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        for link in self._linked:
            try:
                os.remove(link)
            except IOError as ex:
                if verbose:
                    print("failed to clean up {}: {}".format(link, ex))

    def _call(self, python, args, env, returncode):
        # Run 'python' with 'args'; raise RuntimeError if the exit code
        # differs from the expected 'returncode'.
        cmd = [python, *args]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, env=env)
        r = p.communicate()
        if p.returncode != returncode:
            if verbose:
                print(repr(r[0]))
                print(repr(r[1]), file=sys.stderr)
            raise RuntimeError(
                'unexpected return code: {0} (0x{0:08X})'.format(p.returncode))
        return r

    def call_real(self, *args, returncode=0):
        return self._call(self.real, args, None, returncode)

    def call_link(self, *args, returncode=0):
        return self._call(self.link, args, self._env, returncode)


_can_xattr = None
def can_xattr():
    """Return True if os.setxattr() works here (cached check)."""
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    m = re.match(r"2.6.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can

def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    ok = can_xattr()
    msg = "no non-broken extended attribute support"
    return test if ok else unittest.skip(msg)(test)

def skip_if_pgo_task(test):
    """Skip decorator for tests not run in (non-extended) PGO task"""
    ok = not PGO or PGO_EXTENDED
    msg = "Not run for (non-extended) PGO task"
    return test if ok else unittest.skip(msg)(test)

_bind_nix_socket_error = None
def skip_unless_bind_unix_socket(test):
    """Decorator for tests requiring a functional bind() for unix sockets."""
    if not hasattr(socket, 'AF_UNIX'):
        return unittest.skip('No UNIX Sockets')(test)
    global _bind_nix_socket_error
    if _bind_nix_socket_error is None:
        # First call: probe whether bind() works and cache the result.
        path = TESTFN + "can_bind_unix_socket"
        with socket.socket(socket.AF_UNIX) as sock:
            try:
                sock.bind(path)
                _bind_nix_socket_error = False
            except OSError as e:
                _bind_nix_socket_error = e
            finally:
                unlink(path)
    if _bind_nix_socket_error:
        msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
        return unittest.skip(msg)(test)
    else:
        return test


def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory is case-insensitive."""
    with tempfile.NamedTemporaryFile(dir=directory) as base:
        base_path = base.name
        case_path = base_path.upper()
        if case_path == base_path:
            case_path = base_path.lower()
        try:
            return os.path.samefile(base_path, case_path)
        except FileNotFoundError:
            return False


def detect_api_mismatch(ref_api, other_api, *, ignore=()):
    """Returns the set of items in ref_api not in other_api, except for a
    defined list of items to be ignored in this check.

    By default this skips private attributes beginning with '_' but
    includes all magic methods, i.e. those starting and ending in '__'.
    """
    missing_items = set(dir(ref_api)) - set(dir(other_api))
    if ignore:
        missing_items -= set(ignore)
    missing_items = set(m for m in missing_items
                        if not m.startswith('_') or m.endswith('__'))
    return missing_items


def check__all__(test_case, module, name_of_module=None, extra=(),
                 blacklist=()):
    """Assert that the __all__ variable of 'module' contains all public names.

    The module's public names (its API) are detected automatically based on
    whether they match the public name convention and were defined in
    'module'.

    The 'name_of_module' argument can specify (as a string or tuple thereof)
    what module(s) an API could be defined in in order to be detected as a
    public API. One case for this is when 'module' imports part of its public
    API from other modules, possibly a C backend (like 'csv' and its '_csv').

    The 'extra' argument can be a set of names that wouldn't otherwise be
    automatically detected as "public", like objects without a proper
    '__module__' attribute. If provided, it will be added to the
    automatically detected ones.

    The 'blacklist' argument can be a set of names that must not be treated
    as part of the public API even though their names indicate otherwise.

    Usage:
        import bar
        import foo
        import unittest
        from test import support

        class MiscTestCase(unittest.TestCase):
            def test__all__(self):
                support.check__all__(self, foo)

        class OtherTestCase(unittest.TestCase):
            def test__all__(self):
                extra = {'BAR_CONST', 'FOO_CONST'}
                blacklist = {'baz'}  # Undocumented name.
                # bar imports part of its API from _bar.
                support.check__all__(self, bar, ('bar', '_bar'),
                                     extra=extra, blacklist=blacklist)

    """

    if name_of_module is None:
        name_of_module = (module.__name__, )
    elif isinstance(name_of_module, str):
        name_of_module = (name_of_module, )

    expected = set(extra)

    for name in dir(module):
        if name.startswith('_') or name in blacklist:
            continue
        obj = getattr(module, name)
        if (getattr(obj, '__module__', None) in name_of_module or
                (not hasattr(obj, '__module__') and
                 not isinstance(obj, types.ModuleType))):
            expected.add(name)
    test_case.assertCountEqual(module.__all__, expected)


class SuppressCrashReport:
    """Try to prevent a crash report from popping up.

    On Windows, don't display the Windows Error Reporting dialog. On UNIX,
    disable the creation of coredump file.
    """
    # Saved SetErrorMode value (Windows) or RLIMIT_CORE limits (UNIX).
    old_value = None
    # Saved msvcrt report modes (Windows debug builds only).
    old_modes = None

    def __enter__(self):
        """On Windows, disable Windows Error Reporting dialogs using
        SetErrorMode.

        On UNIX, try to save the previous core file size limit, then set
        soft limit to 0.
        """
        if sys.platform.startswith('win'):
            # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx
            # GetErrorMode is not available on Windows XP and Windows Server 2003,
            # but SetErrorMode returns the previous value, so we can use that
            import ctypes
            self._k32 = ctypes.windll.kernel32
            SEM_NOGPFAULTERRORBOX = 0x02
            self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
            self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX)

            # Suppress assert dialogs in debug builds
            # (see http://bugs.python.org/issue23314)
            try:
                import msvcrt
                msvcrt.CrtSetReportMode
            except (AttributeError, ImportError):
                # no msvcrt or a release build
                pass
            else:
                self.old_modes = {}
                for report_type in [msvcrt.CRT_WARN,
                                    msvcrt.CRT_ERROR,
                                    msvcrt.CRT_ASSERT]:
                    old_mode = msvcrt.CrtSetReportMode(report_type,
                                                       msvcrt.CRTDBG_MODE_FILE)
                    old_file = msvcrt.CrtSetReportFile(report_type,
                                                       msvcrt.CRTDBG_FILE_STDERR)
                    self.old_modes[report_type] = old_mode, old_file

        else:
            if resource is not None:
                try:
                    self.old_value = resource.getrlimit(resource.RLIMIT_CORE)
                    resource.setrlimit(resource.RLIMIT_CORE,
                                       (0, self.old_value[1]))
                except (ValueError, OSError):
                    pass

            if sys.platform == 'darwin':
                # Check if the 'Crash Reporter' on OSX was configured
                # in 'Developer' mode and warn that it will get triggered
                # when it is.
                #
                # This assumes that this context manager is used in tests
                # that might trigger the next manager.
                cmd = ['/usr/bin/defaults', 'read',
                       'com.apple.CrashReporter', 'DialogType']
                proc = subprocess.Popen(cmd,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE)
                with proc:
                    stdout = proc.communicate()[0]
                if stdout.strip() == b'developer':
                    print("this test triggers the Crash Reporter, "
                          "that is intentional", end='', flush=True)

        return self

    def __exit__(self, *ignore_exc):
        """Restore Windows ErrorMode or core file behavior to initial value."""
        if self.old_value is None:
            # __enter__() never ran (or saved nothing): nothing to restore.
            return

        if sys.platform.startswith('win'):
            self._k32.SetErrorMode(self.old_value)

            if self.old_modes:
                import msvcrt
                for report_type, (old_mode, old_file) in self.old_modes.items():
                    msvcrt.CrtSetReportMode(report_type, old_mode)
                    msvcrt.CrtSetReportFile(report_type, old_file)
        else:
            if resource is not None:
                try:
                    resource.setrlimit(resource.RLIMIT_CORE, self.old_value)
                except (ValueError, OSError):
                    pass


def patch(test_instance, object_to_patch, attr_name, new_value):
    """Override 'object_to_patch'.'attr_name' with 'new_value'.

    Also, add a cleanup procedure to 'test_instance' to restore
    'object_to_patch' value for 'attr_name'.
    The 'attr_name' should be a valid attribute for 'object_to_patch'.
2900 2901 """ 2902 # check that 'attr_name' is a real attribute for 'object_to_patch' 2903 # will raise AttributeError if it does not exist 2904 getattr(object_to_patch, attr_name) 2905 2906 # keep a copy of the old value 2907 attr_is_local = False 2908 try: 2909 old_value = object_to_patch.__dict__[attr_name] 2910 except (AttributeError, KeyError): 2911 old_value = getattr(object_to_patch, attr_name, None) 2912 else: 2913 attr_is_local = True 2914 2915 # restore the value when the test is done 2916 def cleanup(): 2917 if attr_is_local: 2918 setattr(object_to_patch, attr_name, old_value) 2919 else: 2920 delattr(object_to_patch, attr_name) 2921 2922 test_instance.addCleanup(cleanup) 2923 2924 # actually override the attribute 2925 setattr(object_to_patch, attr_name, new_value) 2926 2927 2928def run_in_subinterp(code): 2929 """ 2930 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 2931 module is enabled. 2932 """ 2933 # Issue #10915, #15751: PyGILState_*() functions don't work with 2934 # sub-interpreters, the tracemalloc module uses these functions internally 2935 try: 2936 import tracemalloc 2937 except ImportError: 2938 pass 2939 else: 2940 if tracemalloc.is_tracing(): 2941 raise unittest.SkipTest("run_in_subinterp() cannot be used " 2942 "if tracemalloc module is tracing " 2943 "memory allocations") 2944 import _testcapi 2945 return _testcapi.run_in_subinterp(code) 2946 2947 2948def check_free_after_iterating(test, iter, cls, args=()): 2949 class A(cls): 2950 def __del__(self): 2951 nonlocal done 2952 done = True 2953 try: 2954 next(it) 2955 except StopIteration: 2956 pass 2957 2958 done = False 2959 it = iter(A(*args)) 2960 # Issue 26494: Shouldn't crash 2961 test.assertRaises(StopIteration, next, it) 2962 # The sequence should be deallocated just after the end of iterating 2963 gc_collect() 2964 test.assertTrue(done) 2965 2966 2967def missing_compiler_executable(cmd_names=[]): 2968 """Check if the compiler components used to build the 
    interpreter exist.

    Check for the existence of the compiler executables whose names are listed
    in 'cmd_names' or all the compiler executables when 'cmd_names' is empty
    and return the first missing executable or None when none is found
    missing.

    """
    from distutils import ccompiler, sysconfig, spawn
    compiler = ccompiler.new_compiler()
    sysconfig.customize_compiler(compiler)
    for name in compiler.executables:
        if cmd_names and name not in cmd_names:
            continue
        cmd = getattr(compiler, name)
        if cmd_names:
            # An explicitly requested executable must be configured.
            assert cmd is not None, \
                    "the '%s' executable is not configured" % name
        elif not cmd:
            continue
        if spawn.find_executable(cmd[0]) is None:
            return cmd[0]


_is_android_emulator = None
def setswitchinterval(interval):
    # Setting a very low gil interval on the Android emulator causes python
    # to hang (issue #26939).
    minimum_interval = 1e-5
    if is_android and interval < minimum_interval:
        global _is_android_emulator
        if _is_android_emulator is None:
            # 'getprop ro.kernel.qemu' returns b'1' on the emulator;
            # cache the result for later calls.
            _is_android_emulator = (subprocess.check_output(
                               ['getprop', 'ro.kernel.qemu']).strip() == b'1')
        if _is_android_emulator:
            interval = minimum_interval
    return sys.setswitchinterval(interval)


@contextlib.contextmanager
def disable_faulthandler():
    """Context manager disabling faulthandler, re-enabling it on exit
    if it was enabled on entry."""
    # use sys.__stderr__ instead of sys.stderr, since regrtest replaces
    # sys.stderr with a StringIO which has no file descriptor when a test
    # is run with -W/--verbose3.
    fd = sys.__stderr__.fileno()

    is_enabled = faulthandler.is_enabled()
    try:
        faulthandler.disable()
        yield
    finally:
        if is_enabled:
            faulthandler.enable(file=fd, all_threads=True)


def fd_count():
    """Count the number of open file descriptors.
    """
    if sys.platform.startswith(('linux', 'freebsd')):
        # Fast path: /proc/self/fd lists the open file descriptors.
        try:
            names = os.listdir("/proc/self/fd")
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the /proc/self/fd/ directory.
            return len(names) - 1
        except FileNotFoundError:
            pass

    # Slow path: probe every descriptor in range(MAXFD) with dup().
    MAXFD = 256
    if hasattr(os, 'sysconf'):
        try:
            MAXFD = os.sysconf("SC_OPEN_MAX")
        except OSError:
            pass

    old_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            old_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0)

    try:
        count = 0
        for fd in range(MAXFD):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                fd2 = os.dup(fd)
            except OSError as e:
                # EBADF means the descriptor is not open: skip it.
                if e.errno != errno.EBADF:
                    raise
            else:
                os.close(fd2)
                count += 1
    finally:
        # old_modes is not None only when msvcrt was imported above.
        if old_modes is not None:
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                msvcrt.CrtSetReportMode(report_type, old_modes[report_type])

    return count


class SaveSignals:
    """
    Save and restore signal handlers.

    This class is only able to save/restore signal handlers registered
    by the Python signal module: see bpo-13285 for "external" signal
    handlers.
    """

    def __init__(self):
        import signal
        self.signal = signal
        self.signals = signal.valid_signals()
        # SIGKILL and SIGSTOP signals cannot be ignored nor caught
        for signame in ('SIGKILL', 'SIGSTOP'):
            try:
                signum = getattr(signal, signame)
            except AttributeError:
                # Signal not defined on this platform.
                continue
            self.signals.remove(signum)
        self.handlers = {}

    def save(self):
        """Record the current handler of every signal in self.signals."""
        for signum in self.signals:
            handler = self.signal.getsignal(signum)
            if handler is None:
                # getsignal() returns None if a signal handler was not
                # registered by the Python signal module,
                # and the handler is not SIG_DFL nor SIG_IGN.
                #
                # Ignore the signal: we cannot restore the handler.
                continue
            self.handlers[signum] = handler

    def restore(self):
        """Reinstall every handler recorded by save()."""
        for signum, handler in self.handlers.items():
            self.signal.signal(signum, handler)


def with_pymalloc():
    """Return True if Python was built with the pymalloc allocator."""
    import _testcapi
    return _testcapi.WITH_PYMALLOC


class FakePath:
    """Simple implementation of the path protocol.
    """
    def __init__(self, path):
        # 'path' may also be an exception instance or class; __fspath__()
        # then raises it, to test error handling in os.fspath() consumers.
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        if (isinstance(self.path, BaseException) or
            isinstance(self.path, type) and
                issubclass(self.path, BaseException)):
            raise self.path
        else:
            return self.path


class _ALWAYS_EQ:
    """
    Object that is equal to anything.
    """
    def __eq__(self, other):
        return True
    def __ne__(self, other):
        return False

# Singleton instance exported via __all__.
ALWAYS_EQ = _ALWAYS_EQ()

@functools.total_ordering
class _LARGEST:
    """
    Object that is greater than anything (except itself).
    """
    def __eq__(self, other):
        return isinstance(other, _LARGEST)
    def __lt__(self, other):
        return False

# Singleton instance exported via __all__.
LARGEST = _LARGEST()

@functools.total_ordering
class _SMALLEST:
    """
    Object that is less than anything (except itself).
    """
    def __eq__(self, other):
        return isinstance(other, _SMALLEST)
    def __gt__(self, other):
        return False

# Singleton instance exported via __all__.
SMALLEST = _SMALLEST()

def maybe_get_event_loop_policy():
    """Return the global event loop policy if one is set, else return None."""
    return asyncio.events._event_loop_policy

# Helpers for testing hashing.
NHASHBITS = sys.hash_info.width # number of bits in hash() result
assert NHASHBITS in (32, 64)

# Return mean and sdev of number of collisions when tossing nballs balls
# uniformly at random into nbins bins.  By definition, the number of
# collisions is the number of balls minus the number of occupied bins at
# the end.
def collision_stats(nbins, nballs):
    n, k = nbins, nballs
    # prob a bin empty after k trials = (1 - 1/n)**k
    # mean # empty is then n * (1 - 1/n)**k
    # so mean # occupied is n - n * (1 - 1/n)**k
    # so collisions = k - (n - n*(1 - 1/n)**k)
    #
    # For the variance:
    # n*(n-1)*(1-2/n)**k + meanempty - meanempty**2 =
    # n*(n-1)*(1-2/n)**k + meanempty * (1 - meanempty)
    #
    # Massive cancellation occurs, and, e.g., for a 64-bit hash code
    # 1-1/2**64 rounds uselessly to 1.0.  Rather than make heroic (and
    # error-prone) efforts to rework the naive formulas to avoid those,
    # we use the `decimal` module to get plenty of extra precision.
    #
    # Note:  the exact values are straightforward to compute with
    # rationals, but in context that's unbearably slow, requiring
    # multi-million bit arithmetic.
    import decimal
    with decimal.localcontext() as ctx:
        bits = n.bit_length() * 2  # bits in n**2
        # At least that many bits will likely cancel out.
        # Use that many decimal digits instead.
        ctx.prec = max(bits, 30)

        dn = decimal.Decimal(n)
        p1empty = ((dn - 1) / dn) ** k
        meanempty = n * p1empty
        occupied = n - meanempty
        collisions = k - occupied
        var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
        return float(collisions), float(var.sqrt())


class catch_unraisable_exception:
    """
    Context manager catching unraisable exception using sys.unraisablehook.

    Storing the exception value (cm.unraisable.exc_value) creates a reference
    cycle. The reference cycle is broken explicitly when the context manager
    exits.

    Storing the object (cm.unraisable.object) can resurrect it if it is set to
    an object which is being finalized. Exiting the context manager clears the
    stored object.

    Usage:

        with support.catch_unraisable_exception() as cm:
            # code creating an "unraisable exception"
            ...

            # check the unraisable exception: use cm.unraisable
            ...

        # cm.unraisable attribute no longer exists at this point
        # (to break a reference cycle)
    """

    def __init__(self):
        self.unraisable = None
        self._old_hook = None

    def _hook(self, unraisable):
        # Storing unraisable.object can resurrect an object which is being
        # finalized. Storing unraisable.exc_value creates a reference cycle.
        self.unraisable = unraisable

    def __enter__(self):
        self._old_hook = sys.unraisablehook
        sys.unraisablehook = self._hook
        return self

    def __exit__(self, *exc_info):
        sys.unraisablehook = self._old_hook
        # Drop the stored unraisable to break the reference cycle and
        # release any resurrected object.
        del self.unraisable


class catch_threading_exception:
    """
    Context manager catching threading.Thread exception using
    threading.excepthook.

    Attributes set when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    These attributes are deleted at the context manager exit.

    Usage:

        with support.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # exc_type, exc_value, exc_traceback, thread attributes of cm no
        # longer exist at this point
        # (to avoid reference cycles)
    """

    def __init__(self):
        self.exc_type = None
        self.exc_value = None
        self.exc_traceback = None
        self.thread = None
        self._old_hook = None

    def _hook(self, args):
        # 'args' is the threading.ExceptHookArgs-style object passed to
        # threading.excepthook -- TODO confirm against the hook contract.
        self.exc_type = args.exc_type
        self.exc_value = args.exc_value
        self.exc_traceback = args.exc_traceback
        self.thread = args.thread

    def __enter__(self):
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        threading.excepthook = self._old_hook
        # Delete the captured attributes to avoid reference cycles with
        # the stored exception and traceback.
        del self.exc_type
        del self.exc_value
        del self.exc_traceback
        del self.thread