1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import collections.abc 7import contextlib 8import errno 9import faulthandler 10import fnmatch 11import functools 12import gc 13import importlib 14import importlib.util 15import logging.handlers 16import nntplib 17import os 18import platform 19import re 20import shutil 21import socket 22import stat 23import struct 24import subprocess 25import sys 26import sysconfig 27import tempfile 28import time 29import types 30import unittest 31import urllib.error 32import warnings 33 34try: 35 import _thread, threading 36except ImportError: 37 _thread = None 38 threading = None 39try: 40 import multiprocessing.process 41except ImportError: 42 multiprocessing = None 43 44try: 45 import zlib 46except ImportError: 47 zlib = None 48 49try: 50 import gzip 51except ImportError: 52 gzip = None 53 54try: 55 import bz2 56except ImportError: 57 bz2 = None 58 59try: 60 import lzma 61except ImportError: 62 lzma = None 63 64try: 65 import resource 66except ImportError: 67 resource = None 68 69__all__ = [ 70 # globals 71 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 72 # exceptions 73 "Error", "TestFailed", "ResourceDenied", 74 # imports 75 "import_module", "import_fresh_module", "CleanImport", 76 # modules 77 "unload", "forget", 78 # io 79 "record_original_stdout", "get_original_stdout", "captured_stdout", 80 "captured_stdin", "captured_stderr", 81 # filesystem 82 "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile", 83 "create_empty_file", "can_symlink", "fs_is_case_insensitive", 84 # unittest 85 "is_resource_enabled", "requires", "requires_freebsd_version", 86 "requires_linux_version", "requires_mac_ver", "check_syntax_error", 87 "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", 88 "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest", 89 "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", 90 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 91 "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", 92 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 93 "check__all__", "requires_android_level", "requires_multiprocessing_queue", 94 # sys 95 "is_jython", "is_android", "check_impl_detail", "unix_shell", 96 "setswitchinterval", "android_not_root", 97 # network 98 "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", 99 "bind_unix_socket", 100 # processes 101 'temp_umask', "reap_children", 102 # logging 103 "TestHandler", 104 # threads 105 "threading_setup", "threading_cleanup", "reap_threads", "start_threads", 106 # miscellaneous 107 "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard", 108 "run_with_locale", "swap_item", 109 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 110 "run_with_tz", "PGO", "missing_compiler_executable", 111 ] 112 113class Error(Exception): 114 """Base class for regression test exceptions.""" 115 116class TestFailed(Error): 117 """Test failed.""" 118 119class ResourceDenied(unittest.SkipTest): 120 """Test skipped because it requested a disallowed resource. 121 122 This is raised when a test calls requires() for a resource that 123 has not be enabled. It is used to distinguish between expected 124 and unexpected skips. 125 """ 126 127@contextlib.contextmanager 128def _ignore_deprecated_imports(ignore=True): 129 """Context manager to suppress package and module deprecation 130 warnings when importing them. 131 132 If ignore is False, this context manager has no effect. 133 """ 134 if ignore: 135 with warnings.catch_warnings(): 136 warnings.filterwarnings("ignore", ".+ (module|package)", 137 DeprecationWarning) 138 yield 139 else: 140 yield 141 142 143def import_module(name, deprecated=False, *, required_on=()): 144 """Import and return the module to be tested, raising SkipTest if 145 it is not available. 146 147 If deprecated is True, any module or package deprecation messages 148 will be suppressed. If a module is required on a platform but optional for 149 others, set required_on to an iterable of platform prefixes which will be 150 compared against sys.platform. 151 """ 152 with _ignore_deprecated_imports(deprecated): 153 try: 154 return importlib.import_module(name) 155 except ImportError as msg: 156 if sys.platform.startswith(tuple(required_on)): 157 raise 158 raise unittest.SkipTest(str(msg)) 159 160 161def _save_and_remove_module(name, orig_modules): 162 """Helper function to save and remove a module from sys.modules 163 164 Raise ImportError if the module can't be imported. 165 """ 166 # try to import the module and raise an error if it can't be imported 167 if name not in sys.modules: 168 __import__(name) 169 del sys.modules[name] 170 for modname in list(sys.modules): 171 if modname == name or modname.startswith(name + '.'): 172 orig_modules[modname] = sys.modules[modname] 173 del sys.modules[modname] 174 175def _save_and_block_module(name, orig_modules): 176 """Helper function to save and block a module in sys.modules 177 178 Return True if the module was in sys.modules, False otherwise. 179 """ 180 saved = True 181 try: 182 orig_modules[name] = sys.modules[name] 183 except KeyError: 184 saved = False 185 sys.modules[name] = None 186 return saved 187 188 189def anticipate_failure(condition): 190 """Decorator to mark a test that is known to be broken in some cases 191 192 Any use of this decorator should have a comment identifying the 193 associated tracker issue. 194 """ 195 if condition: 196 return unittest.expectedFailure 197 return lambda f: f 198 199def load_package_tests(pkg_dir, loader, standard_tests, pattern): 200 """Generic load_tests implementation for simple test packages. 201 202 Most packages can implement load_tests using this function as follows: 203 204 def load_tests(*args): 205 return load_package_tests(os.path.dirname(__file__), *args) 206 """ 207 if pattern is None: 208 pattern = "test*" 209 top_dir = os.path.dirname( # Lib 210 os.path.dirname( # test 211 os.path.dirname(__file__))) # support 212 package_tests = loader.discover(start_dir=pkg_dir, 213 top_level_dir=top_dir, 214 pattern=pattern) 215 standard_tests.addTests(package_tests) 216 return standard_tests 217 218 219def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 220 """Import and return a module, deliberately bypassing sys.modules. 221 222 This function imports and returns a fresh copy of the named Python module 223 by removing the named module from sys.modules before doing the import. 224 Note that unlike reload, the original module is not affected by 225 this operation. 226 227 *fresh* is an iterable of additional module names that are also removed 228 from the sys.modules cache before doing the import. 229 230 *blocked* is an iterable of module names that are replaced with None 231 in the module cache during the import to ensure that attempts to import 232 them raise ImportError. 233 234 The named module and any modules named in the *fresh* and *blocked* 235 parameters are saved before starting the import and then reinserted into 236 sys.modules when the fresh import is complete. 237 238 Module and package deprecation messages are suppressed during this import 239 if *deprecated* is True. 240 241 This function will raise ImportError if the named module cannot be 242 imported. 243 """ 244 # NOTE: test_heapq, test_json and test_warnings include extra sanity checks 245 # to make sure that this utility function is working as expected 246 with _ignore_deprecated_imports(deprecated): 247 # Keep track of modules saved for later restoration as well 248 # as those which just need a blocking entry removed 249 orig_modules = {} 250 names_to_remove = [] 251 _save_and_remove_module(name, orig_modules) 252 try: 253 for fresh_name in fresh: 254 _save_and_remove_module(fresh_name, orig_modules) 255 for blocked_name in blocked: 256 if not _save_and_block_module(blocked_name, orig_modules): 257 names_to_remove.append(blocked_name) 258 fresh_module = importlib.import_module(name) 259 except ImportError: 260 fresh_module = None 261 finally: 262 for orig_name, module in orig_modules.items(): 263 sys.modules[orig_name] = module 264 for name_to_remove in names_to_remove: 265 del sys.modules[name_to_remove] 266 return fresh_module 267 268 269def get_attribute(obj, name): 270 """Get an attribute, raising SkipTest if AttributeError is raised.""" 271 try: 272 attribute = getattr(obj, name) 273 except AttributeError: 274 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 275 else: 276 return attribute 277 278verbose = 1 # Flag set to 0 by regrtest.py 279use_resources = None # Flag set to [] by regrtest.py 280max_memuse = 0 # Disable bigmem tests (they will still be run with 281 # small sizes, to make sure they work.) 282real_max_memuse = 0 283failfast = False 284match_tests = None 285 286# _original_stdout is meant to hold stdout at the time regrtest began. 287# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 288# The point is to have some flavor of stdout the user can actually see. 289_original_stdout = None 290def record_original_stdout(stdout): 291 global _original_stdout 292 _original_stdout = stdout 293 294def get_original_stdout(): 295 return _original_stdout or sys.stdout 296 297def unload(name): 298 try: 299 del sys.modules[name] 300 except KeyError: 301 pass 302 303def _force_run(path, func, *args): 304 try: 305 return func(*args) 306 except OSError as err: 307 if verbose >= 2: 308 print('%s: %s' % (err.__class__.__name__, err)) 309 print('re-run %s%r' % (func.__name__, args)) 310 os.chmod(path, stat.S_IRWXU) 311 return func(*args) 312 313if sys.platform.startswith("win"): 314 def _waitfor(func, pathname, waitall=False): 315 # Perform the operation 316 func(pathname) 317 # Now setup the wait loop 318 if waitall: 319 dirname = pathname 320 else: 321 dirname, name = os.path.split(pathname) 322 dirname = dirname or '.' 323 # Check for `pathname` to be removed from the filesystem. 324 # The exponential backoff of the timeout amounts to a total 325 # of ~1 second after which the deletion is probably an error 326 # anyway. 327 # Testing on an i7@4.3GHz shows that usually only 1 iteration is 328 # required when contention occurs. 329 timeout = 0.001 330 while timeout < 1.0: 331 # Note we are only testing for the existence of the file(s) in 332 # the contents of the directory regardless of any security or 333 # access rights. If we have made it this far, we have sufficient 334 # permissions to do that much using Python's equivalent of the 335 # Windows API FindFirstFile. 336 # Other Windows APIs can fail or give incorrect results when 337 # dealing with files that are pending deletion. 338 L = os.listdir(dirname) 339 if not (L if waitall else name in L): 340 return 341 # Increase the timeout and try again 342 time.sleep(timeout) 343 timeout *= 2 344 warnings.warn('tests may fail, delete still pending for ' + pathname, 345 RuntimeWarning, stacklevel=4) 346 347 def _unlink(filename): 348 _waitfor(os.unlink, filename) 349 350 def _rmdir(dirname): 351 _waitfor(os.rmdir, dirname) 352 353 def _rmtree(path): 354 def _rmtree_inner(path): 355 for name in _force_run(path, os.listdir, path): 356 fullname = os.path.join(path, name) 357 try: 358 mode = os.lstat(fullname).st_mode 359 except OSError as exc: 360 print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), 361 file=sys.__stderr__) 362 mode = 0 363 if stat.S_ISDIR(mode): 364 _waitfor(_rmtree_inner, fullname, waitall=True) 365 _force_run(fullname, os.rmdir, fullname) 366 else: 367 _force_run(fullname, os.unlink, fullname) 368 _waitfor(_rmtree_inner, path, waitall=True) 369 _waitfor(lambda p: _force_run(p, os.rmdir, p), path) 370else: 371 _unlink = os.unlink 372 _rmdir = os.rmdir 373 374 def _rmtree(path): 375 try: 376 shutil.rmtree(path) 377 return 378 except OSError: 379 pass 380 381 def _rmtree_inner(path): 382 for name in _force_run(path, os.listdir, path): 383 fullname = os.path.join(path, name) 384 try: 385 mode = os.lstat(fullname).st_mode 386 except OSError: 387 mode = 0 388 if stat.S_ISDIR(mode): 389 _rmtree_inner(fullname) 390 _force_run(path, os.rmdir, fullname) 391 else: 392 _force_run(path, os.unlink, fullname) 393 _rmtree_inner(path) 394 os.rmdir(path) 395 396def unlink(filename): 397 try: 398 _unlink(filename) 399 except (FileNotFoundError, NotADirectoryError): 400 pass 401 402def rmdir(dirname): 403 try: 404 _rmdir(dirname) 405 except FileNotFoundError: 406 pass 407 408def rmtree(path): 409 try: 410 _rmtree(path) 411 except FileNotFoundError: 412 pass 413 414def make_legacy_pyc(source): 415 """Move a PEP 3147/488 pyc file to its legacy pyc location. 416 417 :param source: The file system path to the source file. The source file 418 does not need to exist, however the PEP 3147/488 pyc file must exist. 419 :return: The file system path to the legacy pyc file. 420 """ 421 pyc_file = importlib.util.cache_from_source(source) 422 up_one = os.path.dirname(os.path.abspath(source)) 423 legacy_pyc = os.path.join(up_one, source + 'c') 424 os.rename(pyc_file, legacy_pyc) 425 return legacy_pyc 426 427def forget(modname): 428 """'Forget' a module was ever imported. 429 430 This removes the module from sys.modules and deletes any PEP 3147/488 or 431 legacy .pyc files. 432 """ 433 unload(modname) 434 for dirname in sys.path: 435 source = os.path.join(dirname, modname + '.py') 436 # It doesn't matter if they exist or not, unlink all possible 437 # combinations of PEP 3147/488 and legacy pyc files. 438 unlink(source + 'c') 439 for opt in ('', 1, 2): 440 unlink(importlib.util.cache_from_source(source, optimization=opt)) 441 442# Check whether a gui is actually available 443def _is_gui_available(): 444 if hasattr(_is_gui_available, 'result'): 445 return _is_gui_available.result 446 reason = None 447 if sys.platform.startswith('win'): 448 # if Python is running as a service (such as the buildbot service), 449 # gui interaction may be disallowed 450 import ctypes 451 import ctypes.wintypes 452 UOI_FLAGS = 1 453 WSF_VISIBLE = 0x0001 454 class USEROBJECTFLAGS(ctypes.Structure): 455 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 456 ("fReserved", ctypes.wintypes.BOOL), 457 ("dwFlags", ctypes.wintypes.DWORD)] 458 dll = ctypes.windll.user32 459 h = dll.GetProcessWindowStation() 460 if not h: 461 raise ctypes.WinError() 462 uof = USEROBJECTFLAGS() 463 needed = ctypes.wintypes.DWORD() 464 res = dll.GetUserObjectInformationW(h, 465 UOI_FLAGS, 466 ctypes.byref(uof), 467 ctypes.sizeof(uof), 468 ctypes.byref(needed)) 469 if not res: 470 raise ctypes.WinError() 471 if not bool(uof.dwFlags & WSF_VISIBLE): 472 reason = "gui not available (WSF_VISIBLE flag not set)" 473 elif sys.platform == 'darwin': 474 # The Aqua Tk implementations on OS X can abort the process if 475 # being called in an environment where a window server connection 476 # cannot be made, for instance when invoked by a buildbot or ssh 477 # process not running under the same user id as the current console 478 # user. To avoid that, raise an exception if the window manager 479 # connection is not available. 480 from ctypes import cdll, c_int, pointer, Structure 481 from ctypes.util import find_library 482 483 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 484 485 if app_services.CGMainDisplayID() == 0: 486 reason = "gui tests cannot run without OS X window manager" 487 else: 488 class ProcessSerialNumber(Structure): 489 _fields_ = [("highLongOfPSN", c_int), 490 ("lowLongOfPSN", c_int)] 491 psn = ProcessSerialNumber() 492 psn_p = pointer(psn) 493 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 494 (app_services.SetFrontProcess(psn_p) < 0) ): 495 reason = "cannot run without OS X gui process" 496 497 # check on every platform whether tkinter can actually do anything 498 if not reason: 499 try: 500 from tkinter import Tk 501 root = Tk() 502 root.withdraw() 503 root.update() 504 root.destroy() 505 except Exception as e: 506 err_string = str(e) 507 if len(err_string) > 50: 508 err_string = err_string[:50] + ' [...]' 509 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 510 err_string) 511 512 _is_gui_available.reason = reason 513 _is_gui_available.result = not reason 514 515 return _is_gui_available.result 516 517def is_resource_enabled(resource): 518 """Test whether a resource is enabled. 519 520 Known resources are set by regrtest.py. If not running under regrtest.py, 521 all resources are assumed enabled unless use_resources has been set. 522 """ 523 return use_resources is None or resource in use_resources 524 525def requires(resource, msg=None): 526 """Raise ResourceDenied if the specified resource is not available.""" 527 if not is_resource_enabled(resource): 528 if msg is None: 529 msg = "Use of the %r resource not enabled" % resource 530 raise ResourceDenied(msg) 531 if resource == 'gui' and not _is_gui_available(): 532 raise ResourceDenied(_is_gui_available.reason) 533 534def _requires_unix_version(sysname, min_version): 535 """Decorator raising SkipTest if the OS is `sysname` and the version is less 536 than `min_version`. 537 538 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 539 the FreeBSD version is less than 7.2. 540 """ 541 def decorator(func): 542 @functools.wraps(func) 543 def wrapper(*args, **kw): 544 if platform.system() == sysname: 545 version_txt = platform.release().split('-', 1)[0] 546 try: 547 version = tuple(map(int, version_txt.split('.'))) 548 except ValueError: 549 pass 550 else: 551 if version < min_version: 552 min_version_txt = '.'.join(map(str, min_version)) 553 raise unittest.SkipTest( 554 "%s version %s or higher required, not %s" 555 % (sysname, min_version_txt, version_txt)) 556 return func(*args, **kw) 557 wrapper.min_version = min_version 558 return wrapper 559 return decorator 560 561def requires_freebsd_version(*min_version): 562 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 563 less than `min_version`. 564 565 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 566 version is less than 7.2. 567 """ 568 return _requires_unix_version('FreeBSD', min_version) 569 570def requires_linux_version(*min_version): 571 """Decorator raising SkipTest if the OS is Linux and the Linux version is 572 less than `min_version`. 573 574 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 575 version is less than 2.6.32. 576 """ 577 return _requires_unix_version('Linux', min_version) 578 579def requires_mac_ver(*min_version): 580 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 581 version if less than min_version. 582 583 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 584 is lesser than 10.5. 585 """ 586 def decorator(func): 587 @functools.wraps(func) 588 def wrapper(*args, **kw): 589 if sys.platform == 'darwin': 590 version_txt = platform.mac_ver()[0] 591 try: 592 version = tuple(map(int, version_txt.split('.'))) 593 except ValueError: 594 pass 595 else: 596 if version < min_version: 597 min_version_txt = '.'.join(map(str, min_version)) 598 raise unittest.SkipTest( 599 "Mac OS X %s or higher required, not %s" 600 % (min_version_txt, version_txt)) 601 return func(*args, **kw) 602 wrapper.min_version = min_version 603 return wrapper 604 return decorator 605 606 607# Don't use "localhost", since resolving it uses the DNS under recent 608# Windows versions (see issue #18792). 609HOST = "127.0.0.1" 610HOSTv6 = "::1" 611 612 613def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 614 """Returns an unused port that should be suitable for binding. This is 615 achieved by creating a temporary socket with the same family and type as 616 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 617 the specified host address (defaults to 0.0.0.0) with the port set to 0, 618 eliciting an unused ephemeral port from the OS. The temporary socket is 619 then closed and deleted, and the ephemeral port is returned. 620 621 Either this method or bind_port() should be used for any tests where a 622 server socket needs to be bound to a particular port for the duration of 623 the test. Which one to use depends on whether the calling code is creating 624 a python socket, or if an unused port needs to be provided in a constructor 625 or passed to an external program (i.e. the -accept argument to openssl's 626 s_server mode). Always prefer bind_port() over find_unused_port() where 627 possible. Hard coded ports should *NEVER* be used. As soon as a server 628 socket is bound to a hard coded port, the ability to run multiple instances 629 of the test simultaneously on the same host is compromised, which makes the 630 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 631 may simply manifest as a failed test, which can be recovered from without 632 intervention in most cases, but on Windows, the entire python process can 633 completely and utterly wedge, requiring someone to log in to the buildbot 634 and manually kill the affected process. 635 636 (This is easy to reproduce on Windows, unfortunately, and can be traced to 637 the SO_REUSEADDR socket option having different semantics on Windows versus 638 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 639 listen and then accept connections on identical host/ports. An EADDRINUSE 640 OSError will be raised at some point (depending on the platform and 641 the order bind and listen were called on each socket). 642 643 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 644 will ever be raised when attempting to bind two identical host/ports. When 645 accept() is called on each socket, the second caller's process will steal 646 the port from the first caller, leaving them both in an awkwardly wedged 647 state where they'll no longer respond to any signals or graceful kills, and 648 must be forcibly killed via OpenProcess()/TerminateProcess(). 649 650 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 651 instead of SO_REUSEADDR, which effectively affords the same semantics as 652 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 653 Source world compared to Windows ones, this is a common mistake. A quick 654 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 655 openssl.exe is called with the 's_server' option, for example. See 656 http://bugs.python.org/issue2550 for more info. The following site also 657 has a very thorough description about the implications of both REUSEADDR 658 and EXCLUSIVEADDRUSE on Windows: 659 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 660 661 XXX: although this approach is a vast improvement on previous attempts to 662 elicit unused ports, it rests heavily on the assumption that the ephemeral 663 port returned to us by the OS won't immediately be dished back out to some 664 other process when we close and delete our temporary socket but before our 665 calling code has a chance to bind the returned port. We can deal with this 666 issue if/when we come across it. 667 """ 668 669 tempsock = socket.socket(family, socktype) 670 port = bind_port(tempsock) 671 tempsock.close() 672 del tempsock 673 return port 674 675def bind_port(sock, host=HOST): 676 """Bind the socket to a free port and return the port number. Relies on 677 ephemeral ports in order to ensure we are using an unbound port. This is 678 important as many tests may be running simultaneously, especially in a 679 buildbot environment. This method raises an exception if the sock.family 680 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 681 or SO_REUSEPORT set on it. Tests should *never* set these socket options 682 for TCP/IP sockets. The only case for setting these options is testing 683 multicasting via multiple UDP sockets. 684 685 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 686 on Windows), it will be set on the socket. This will prevent anyone else 687 from bind()'ing to our host/port for the duration of the test. 688 """ 689 690 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 691 if hasattr(socket, 'SO_REUSEADDR'): 692 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 693 raise TestFailed("tests should never set the SO_REUSEADDR " \ 694 "socket option on TCP/IP sockets!") 695 if hasattr(socket, 'SO_REUSEPORT'): 696 try: 697 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 698 raise TestFailed("tests should never set the SO_REUSEPORT " \ 699 "socket option on TCP/IP sockets!") 700 except OSError: 701 # Python's socket module was compiled using modern headers 702 # thus defining SO_REUSEPORT but this process is running 703 # under an older kernel that does not support SO_REUSEPORT. 704 pass 705 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 706 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 707 708 sock.bind((host, 0)) 709 port = sock.getsockname()[1] 710 return port 711 712def bind_unix_socket(sock, addr): 713 """Bind a unix socket, raising SkipTest if PermissionError is raised.""" 714 assert sock.family == socket.AF_UNIX 715 try: 716 sock.bind(addr) 717 except PermissionError: 718 sock.close() 719 raise unittest.SkipTest('cannot bind AF_UNIX sockets') 720 721def _is_ipv6_enabled(): 722 """Check whether IPv6 is enabled on this host.""" 723 if socket.has_ipv6: 724 sock = None 725 try: 726 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 727 sock.bind((HOSTv6, 0)) 728 return True 729 except OSError: 730 pass 731 finally: 732 if sock: 733 sock.close() 734 return False 735 736IPV6_ENABLED = _is_ipv6_enabled() 737 738def system_must_validate_cert(f): 739 """Skip the test on TLS certificate validation failures.""" 740 @functools.wraps(f) 741 def dec(*args, **kwargs): 742 try: 743 f(*args, **kwargs) 744 except IOError as e: 745 if "CERTIFICATE_VERIFY_FAILED" in str(e): 746 raise unittest.SkipTest("system does not contain " 747 "necessary certificates") 748 raise 749 return dec 750 751# A constant likely larger than the underlying OS pipe buffer size, to 752# make writes blocking. 753# Windows limit seems to be around 512 B, and many Unix kernels have a 754# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 755# (see issue #17835 for a discussion of this number). 756PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 757 758# A constant likely larger than the underlying OS socket buffer size, to make 759# writes blocking. 760# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 761# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 762# for a discussion of this number). 763SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 764 765# decorator for skipping tests on non-IEEE 754 platforms 766requires_IEEE_754 = unittest.skipUnless( 767 float.__getformat__("double").startswith("IEEE"), 768 "test requires IEEE 754 doubles") 769 770requires_zlib = unittest.skipUnless(zlib, 'requires zlib') 771 772requires_gzip = unittest.skipUnless(gzip, 'requires gzip') 773 774requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') 775 776requires_lzma = unittest.skipUnless(lzma, 'requires lzma') 777 778is_jython = sys.platform.startswith('java') 779 780_ANDROID_API_LEVEL = sysconfig.get_config_var('ANDROID_API_LEVEL') 781is_android = (_ANDROID_API_LEVEL is not None and _ANDROID_API_LEVEL > 0) 782android_not_root = (is_android and os.geteuid() != 0) 783 784if sys.platform != 'win32': 785 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 786else: 787 unix_shell = None 788 789# Filename used for testing 790if os.name == 'java': 791 # Jython disallows @ in module names 792 TESTFN = '$test' 793else: 794 TESTFN = '@test' 795 796# Disambiguate TESTFN for parallel testing, while letting it remain a valid 797# module name. 798TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 799 800# FS_NONASCII: non-ASCII character encodable by os.fsencode(), 801# or None if there is no such character. 802FS_NONASCII = None 803for character in ( 804 # First try printable and common characters to have a readable filename. 805 # For each character, the encoding list are just example of encodings able 806 # to encode the character (the list is not exhaustive). 807 808 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 809 '\u00E6', 810 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 811 '\u0130', 812 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 813 '\u0141', 814 # U+03C6 (Greek Small Letter Phi): cp1253 815 '\u03C6', 816 # U+041A (Cyrillic Capital Letter Ka): cp1251 817 '\u041A', 818 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 819 '\u05D0', 820 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 821 '\u060C', 822 # U+062A (Arabic Letter Teh): cp720 823 '\u062A', 824 # U+0E01 (Thai Character Ko Kai): cp874 825 '\u0E01', 826 827 # Then try more "special" characters. "special" because they may be 828 # interpreted or displayed differently depending on the exact locale 829 # encoding and the font. 830 831 # U+00A0 (No-Break Space) 832 '\u00A0', 833 # U+20AC (Euro Sign) 834 '\u20AC', 835): 836 try: 837 os.fsdecode(os.fsencode(character)) 838 except UnicodeError: 839 pass 840 else: 841 FS_NONASCII = character 842 break 843 844# TESTFN_UNICODE is a non-ascii filename 845TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" 846if sys.platform == 'darwin': 847 # In Mac OS X's VFS API file names are, by definition, canonically 848 # decomposed Unicode, encoded using UTF-8. See QA1173: 849 # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html 850 import unicodedata 851 TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) 852TESTFN_ENCODING = sys.getfilesystemencoding() 853 854# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be 855# encoded by the filesystem encoding (in strict mode). It can be None if we 856# cannot generate such filename. 857TESTFN_UNENCODABLE = None 858if os.name == 'nt': 859 # skip win32s (0) or Windows 9x/ME (1) 860 if sys.getwindowsversion().platform >= 2: 861 # Different kinds of characters from various languages to minimize the 862 # probability that the whole name is encodable to MBCS (issue #9819) 863 TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" 864 try: 865 TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) 866 except UnicodeEncodeError: 867 pass 868 else: 869 print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' 870 'Unicode filename tests may not be effective' 871 % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) 872 TESTFN_UNENCODABLE = None 873# Mac OS X denies unencodable filenames (invalid utf-8) 874elif sys.platform != 'darwin': 875 try: 876 # ascii and utf-8 cannot encode the byte 0xff 877 b'\xff'.decode(TESTFN_ENCODING) 878 except UnicodeDecodeError: 879 # 0xff will be encoded using the surrogate character u+DCFF 880 TESTFN_UNENCODABLE = TESTFN \ 881 + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') 882 else: 883 # File system encoding (eg. ISO-8859-* encodings) can encode 884 # the byte 0xff. Skip some unicode filename tests. 885 pass 886 887# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be 888# decoded from the filesystem encoding (in strict mode). It can be None if we 889# cannot generate such filename (ex: the latin1 encoding can decode any byte 890# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks 891# to the surrogateescape error handler (PEP 383), but not from the filesystem 892# encoding in strict mode. 893TESTFN_UNDECODABLE = None 894for name in ( 895 # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows 896 # accepts it to create a file or a directory, or don't accept to enter to 897 # such directory (when the bytes name is used). So test b'\xe7' first: it is 898 # not decodable from cp932. 899 b'\xe7w\xf0', 900 # undecodable from ASCII, UTF-8 901 b'\xff', 902 # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 903 # and cp857 904 b'\xae\xd5' 905 # undecodable from UTF-8 (UNIX and Mac OS X) 906 b'\xed\xb2\x80', b'\xed\xb4\x80', 907 # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, 908 # cp1253, cp1254, cp1255, cp1257, cp1258 909 b'\x81\x98', 910): 911 try: 912 name.decode(TESTFN_ENCODING) 913 except UnicodeDecodeError: 914 TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name 915 break 916 917if FS_NONASCII: 918 TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII 919else: 920 TESTFN_NONASCII = None 921 922# Save the initial cwd 923SAVEDCWD = os.getcwd() 924 925# Set by libregrtest/main.py so we can skip tests that are not 926# useful for PGO 927PGO = False 928 929@contextlib.contextmanager 930def temp_dir(path=None, quiet=False): 931 """Return a context manager that creates a temporary directory. 932 933 Arguments: 934 935 path: the directory to create temporarily. If omitted or None, 936 defaults to creating a temporary directory using tempfile.mkdtemp. 937 938 quiet: if False (the default), the context manager raises an exception 939 on error. Otherwise, if the path is specified and cannot be 940 created, only a warning is issued. 941 942 """ 943 dir_created = False 944 if path is None: 945 path = tempfile.mkdtemp() 946 dir_created = True 947 path = os.path.realpath(path) 948 else: 949 try: 950 os.mkdir(path) 951 dir_created = True 952 except OSError: 953 if not quiet: 954 raise 955 warnings.warn('tests may fail, unable to create temp dir: ' + path, 956 RuntimeWarning, stacklevel=3) 957 try: 958 yield path 959 finally: 960 if dir_created: 961 rmtree(path) 962 963@contextlib.contextmanager 964def change_cwd(path, quiet=False): 965 """Return a context manager that changes the current working directory. 966 967 Arguments: 968 969 path: the directory to use as the temporary current working directory. 970 971 quiet: if False (the default), the context manager raises an exception 972 on error. Otherwise, it issues only a warning and keeps the current 973 working directory the same. 974 975 """ 976 saved_dir = os.getcwd() 977 try: 978 os.chdir(path) 979 except OSError: 980 if not quiet: 981 raise 982 warnings.warn('tests may fail, unable to change CWD to: ' + path, 983 RuntimeWarning, stacklevel=3) 984 try: 985 yield os.getcwd() 986 finally: 987 os.chdir(saved_dir) 988 989 990@contextlib.contextmanager 991def temp_cwd(name='tempcwd', quiet=False): 992 """ 993 Context manager that temporarily creates and changes the CWD. 994 995 The function temporarily changes the current working directory 996 after creating a temporary directory in the current directory with 997 name *name*. If *name* is None, the temporary directory is 998 created using tempfile.mkdtemp. 999 1000 If *quiet* is False (default) and it is not possible to 1001 create or change the CWD, an error is raised. If *quiet* is True, 1002 only a warning is raised and the original CWD is used. 1003 1004 """ 1005 with temp_dir(path=name, quiet=quiet) as temp_path: 1006 with change_cwd(temp_path, quiet=quiet) as cwd_dir: 1007 yield cwd_dir 1008 1009if hasattr(os, "umask"): 1010 @contextlib.contextmanager 1011 def temp_umask(umask): 1012 """Context manager that temporarily sets the process umask.""" 1013 oldmask = os.umask(umask) 1014 try: 1015 yield 1016 finally: 1017 os.umask(oldmask) 1018 1019# TEST_HOME_DIR refers to the top level directory of the "test" package 1020# that contains Python's regression test suite 1021TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 1022TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 1023 1024# TEST_DATA_DIR is used as a target download location for remote resources 1025TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 1026 1027def findfile(filename, subdir=None): 1028 """Try to find a file on sys.path or in the test directory. If it is not 1029 found the argument passed to the function is returned (this does not 1030 necessarily signal failure; could still be the legitimate path). 1031 1032 Setting *subdir* indicates a relative path to use to find the file 1033 rather than looking directly in the path directories. 1034 """ 1035 if os.path.isabs(filename): 1036 return filename 1037 if subdir is not None: 1038 filename = os.path.join(subdir, filename) 1039 path = [TEST_HOME_DIR] + sys.path 1040 for dn in path: 1041 fn = os.path.join(dn, filename) 1042 if os.path.exists(fn): return fn 1043 return filename 1044 1045def create_empty_file(filename): 1046 """Create an empty file. If the file already exists, truncate it.""" 1047 fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) 1048 os.close(fd) 1049 1050def sortdict(dict): 1051 "Like repr(dict), but in sorted order." 1052 items = sorted(dict.items()) 1053 reprpairs = ["%r: %r" % pair for pair in items] 1054 withcommas = ", ".join(reprpairs) 1055 return "{%s}" % withcommas 1056 1057def make_bad_fd(): 1058 """ 1059 Create an invalid file descriptor by opening and closing a file and return 1060 its fd. 1061 """ 1062 file = open(TESTFN, "wb") 1063 try: 1064 return file.fileno() 1065 finally: 1066 file.close() 1067 unlink(TESTFN) 1068 1069def check_syntax_error(testcase, statement, *, lineno=None, offset=None): 1070 with testcase.assertRaises(SyntaxError) as cm: 1071 compile(statement, '<test string>', 'exec') 1072 err = cm.exception 1073 testcase.assertIsNotNone(err.lineno) 1074 if lineno is not None: 1075 testcase.assertEqual(err.lineno, lineno) 1076 testcase.assertIsNotNone(err.offset) 1077 if offset is not None: 1078 testcase.assertEqual(err.offset, offset) 1079 1080def open_urlresource(url, *args, **kw): 1081 import urllib.request, urllib.parse 1082 1083 check = kw.pop('check', None) 1084 1085 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 1086 1087 fn = os.path.join(TEST_DATA_DIR, filename) 1088 1089 def check_valid_file(fn): 1090 f = open(fn, *args, **kw) 1091 if check is None: 1092 return f 1093 elif check(f): 1094 f.seek(0) 1095 return f 1096 f.close() 1097 1098 if os.path.exists(fn): 1099 f = check_valid_file(fn) 1100 if f is not None: 1101 return f 1102 unlink(fn) 1103 1104 # Verify the requirement before downloading the file 1105 requires('urlfetch') 1106 1107 if verbose: 1108 print('\tfetching %s ...' % url, file=get_original_stdout()) 1109 opener = urllib.request.build_opener() 1110 if gzip: 1111 opener.addheaders.append(('Accept-Encoding', 'gzip')) 1112 f = opener.open(url, timeout=15) 1113 if gzip and f.headers.get('Content-Encoding') == 'gzip': 1114 f = gzip.GzipFile(fileobj=f) 1115 try: 1116 with open(fn, "wb") as out: 1117 s = f.read() 1118 while s: 1119 out.write(s) 1120 s = f.read() 1121 finally: 1122 f.close() 1123 1124 f = check_valid_file(fn) 1125 if f is not None: 1126 return f 1127 raise TestFailed('invalid resource %r' % fn) 1128 1129 1130class WarningsRecorder(object): 1131 """Convenience wrapper for the warnings list returned on 1132 entry to the warnings.catch_warnings() context manager. 1133 """ 1134 def __init__(self, warnings_list): 1135 self._warnings = warnings_list 1136 self._last = 0 1137 1138 def __getattr__(self, attr): 1139 if len(self._warnings) > self._last: 1140 return getattr(self._warnings[-1], attr) 1141 elif attr in warnings.WarningMessage._WARNING_DETAILS: 1142 return None 1143 raise AttributeError("%r has no attribute %r" % (self, attr)) 1144 1145 @property 1146 def warnings(self): 1147 return self._warnings[self._last:] 1148 1149 def reset(self): 1150 self._last = len(self._warnings) 1151 1152 1153def _filterwarnings(filters, quiet=False): 1154 """Catch the warnings, then check if all the expected 1155 warnings have been raised and re-raise unexpected warnings. 1156 If 'quiet' is True, only re-raise the unexpected warnings. 1157 """ 1158 # Clear the warning registry of the calling module 1159 # in order to re-raise the warnings. 1160 frame = sys._getframe(2) 1161 registry = frame.f_globals.get('__warningregistry__') 1162 if registry: 1163 registry.clear() 1164 with warnings.catch_warnings(record=True) as w: 1165 # Set filter "always" to record all warnings. Because 1166 # test_warnings swap the module, we need to look up in 1167 # the sys.modules dictionary. 1168 sys.modules['warnings'].simplefilter("always") 1169 yield WarningsRecorder(w) 1170 # Filter the recorded warnings 1171 reraise = list(w) 1172 missing = [] 1173 for msg, cat in filters: 1174 seen = False 1175 for w in reraise[:]: 1176 warning = w.message 1177 # Filter out the matching messages 1178 if (re.match(msg, str(warning), re.I) and 1179 issubclass(warning.__class__, cat)): 1180 seen = True 1181 reraise.remove(w) 1182 if not seen and not quiet: 1183 # This filter caught nothing 1184 missing.append((msg, cat.__name__)) 1185 if reraise: 1186 raise AssertionError("unhandled warning %s" % reraise[0]) 1187 if missing: 1188 raise AssertionError("filter (%r, %s) did not catch any warning" % 1189 missing[0]) 1190 1191 1192@contextlib.contextmanager 1193def check_warnings(*filters, **kwargs): 1194 """Context manager to silence warnings. 1195 1196 Accept 2-tuples as positional arguments: 1197 ("message regexp", WarningCategory) 1198 1199 Optional argument: 1200 - if 'quiet' is True, it does not fail if a filter catches nothing 1201 (default True without argument, 1202 default False if some filters are defined) 1203 1204 Without argument, it defaults to: 1205 check_warnings(("", Warning), quiet=True) 1206 """ 1207 quiet = kwargs.get('quiet') 1208 if not filters: 1209 filters = (("", Warning),) 1210 # Preserve backward compatibility 1211 if quiet is None: 1212 quiet = True 1213 return _filterwarnings(filters, quiet) 1214 1215 1216@contextlib.contextmanager 1217def check_no_resource_warning(testcase): 1218 """Context manager to check that no ResourceWarning is emitted. 1219 1220 Usage: 1221 1222 with check_no_resource_warning(self): 1223 f = open(...) 1224 ... 1225 del f 1226 1227 You must remove the object which may emit ResourceWarning before 1228 the end of the context manager. 1229 """ 1230 with warnings.catch_warnings(record=True) as warns: 1231 warnings.filterwarnings('always', category=ResourceWarning) 1232 yield 1233 gc_collect() 1234 testcase.assertEqual(warns, []) 1235 1236 1237class CleanImport(object): 1238 """Context manager to force import to return a new module reference. 1239 1240 This is useful for testing module-level behaviours, such as 1241 the emission of a DeprecationWarning on import. 1242 1243 Use like this: 1244 1245 with CleanImport("foo"): 1246 importlib.import_module("foo") # new reference 1247 """ 1248 1249 def __init__(self, *module_names): 1250 self.original_modules = sys.modules.copy() 1251 for module_name in module_names: 1252 if module_name in sys.modules: 1253 module = sys.modules[module_name] 1254 # It is possible that module_name is just an alias for 1255 # another module (e.g. stub for modules renamed in 3.x). 1256 # In that case, we also need delete the real module to clear 1257 # the import cache. 1258 if module.__name__ != module_name: 1259 del sys.modules[module.__name__] 1260 del sys.modules[module_name] 1261 1262 def __enter__(self): 1263 return self 1264 1265 def __exit__(self, *ignore_exc): 1266 sys.modules.update(self.original_modules) 1267 1268 1269class EnvironmentVarGuard(collections.abc.MutableMapping): 1270 1271 """Class to help protect the environment variable properly. Can be used as 1272 a context manager.""" 1273 1274 def __init__(self): 1275 self._environ = os.environ 1276 self._changed = {} 1277 1278 def __getitem__(self, envvar): 1279 return self._environ[envvar] 1280 1281 def __setitem__(self, envvar, value): 1282 # Remember the initial value on the first access 1283 if envvar not in self._changed: 1284 self._changed[envvar] = self._environ.get(envvar) 1285 self._environ[envvar] = value 1286 1287 def __delitem__(self, envvar): 1288 # Remember the initial value on the first access 1289 if envvar not in self._changed: 1290 self._changed[envvar] = self._environ.get(envvar) 1291 if envvar in self._environ: 1292 del self._environ[envvar] 1293 1294 def keys(self): 1295 return self._environ.keys() 1296 1297 def __iter__(self): 1298 return iter(self._environ) 1299 1300 def __len__(self): 1301 return len(self._environ) 1302 1303 def set(self, envvar, value): 1304 self[envvar] = value 1305 1306 def unset(self, envvar): 1307 del self[envvar] 1308 1309 def __enter__(self): 1310 return self 1311 1312 def __exit__(self, *ignore_exc): 1313 for (k, v) in self._changed.items(): 1314 if v is None: 1315 if k in self._environ: 1316 del self._environ[k] 1317 else: 1318 self._environ[k] = v 1319 os.environ = self._environ 1320 1321 1322class DirsOnSysPath(object): 1323 """Context manager to temporarily add directories to sys.path. 1324 1325 This makes a copy of sys.path, appends any directories given 1326 as positional arguments, then reverts sys.path to the copied 1327 settings when the context ends. 1328 1329 Note that *all* sys.path modifications in the body of the 1330 context manager, including replacement of the object, 1331 will be reverted at the end of the block. 1332 """ 1333 1334 def __init__(self, *paths): 1335 self.original_value = sys.path[:] 1336 self.original_object = sys.path 1337 sys.path.extend(paths) 1338 1339 def __enter__(self): 1340 return self 1341 1342 def __exit__(self, *ignore_exc): 1343 sys.path = self.original_object 1344 sys.path[:] = self.original_value 1345 1346 1347class TransientResource(object): 1348 1349 """Raise ResourceDenied if an exception is raised while the context manager 1350 is in effect that matches the specified exception and attributes.""" 1351 1352 def __init__(self, exc, **kwargs): 1353 self.exc = exc 1354 self.attrs = kwargs 1355 1356 def __enter__(self): 1357 return self 1358 1359 def __exit__(self, type_=None, value=None, traceback=None): 1360 """If type_ is a subclass of self.exc and value has attributes matching 1361 self.attrs, raise ResourceDenied. Otherwise let the exception 1362 propagate (if any).""" 1363 if type_ is not None and issubclass(self.exc, type_): 1364 for attr, attr_value in self.attrs.items(): 1365 if not hasattr(value, attr): 1366 break 1367 if getattr(value, attr) != attr_value: 1368 break 1369 else: 1370 raise ResourceDenied("an optional resource is not available") 1371 1372# Context managers that raise ResourceDenied when various issues 1373# with the Internet connection manifest themselves as exceptions. 1374# XXX deprecate these and use transient_internet() instead 1375time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) 1376socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1377ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1378 1379 1380@contextlib.contextmanager 1381def transient_internet(resource_name, *, timeout=30.0, errnos=()): 1382 """Return a context manager that raises ResourceDenied when various issues 1383 with the Internet connection manifest themselves as exceptions.""" 1384 default_errnos = [ 1385 ('ECONNREFUSED', 111), 1386 ('ECONNRESET', 104), 1387 ('EHOSTUNREACH', 113), 1388 ('ENETUNREACH', 101), 1389 ('ETIMEDOUT', 110), 1390 ] 1391 default_gai_errnos = [ 1392 ('EAI_AGAIN', -3), 1393 ('EAI_FAIL', -4), 1394 ('EAI_NONAME', -2), 1395 ('EAI_NODATA', -5), 1396 # Encountered when trying to resolve IPv6-only hostnames 1397 ('WSANO_DATA', 11004), 1398 ] 1399 1400 denied = ResourceDenied("Resource %r is not available" % resource_name) 1401 captured_errnos = errnos 1402 gai_errnos = [] 1403 if not captured_errnos: 1404 captured_errnos = [getattr(errno, name, num) 1405 for (name, num) in default_errnos] 1406 gai_errnos = [getattr(socket, name, num) 1407 for (name, num) in default_gai_errnos] 1408 1409 def filter_error(err): 1410 n = getattr(err, 'errno', None) 1411 if (isinstance(err, socket.timeout) or 1412 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1413 (isinstance(err, urllib.error.HTTPError) and 1414 500 <= err.code <= 599) or 1415 (isinstance(err, urllib.error.URLError) and 1416 (("ConnectionRefusedError" in err.reason) or 1417 ("TimeoutError" in err.reason) or 1418 ("EOFError" in err.reason))) or 1419 n in captured_errnos): 1420 if not verbose: 1421 sys.stderr.write(denied.args[0] + "\n") 1422 raise denied from err 1423 1424 old_timeout = socket.getdefaulttimeout() 1425 try: 1426 if timeout is not None: 1427 socket.setdefaulttimeout(timeout) 1428 yield 1429 except nntplib.NNTPTemporaryError as err: 1430 if verbose: 1431 sys.stderr.write(denied.args[0] + "\n") 1432 raise denied from err 1433 except OSError as err: 1434 # urllib can wrap original socket errors multiple times (!), we must 1435 # unwrap to get at the original error. 1436 while True: 1437 a = err.args 1438 if len(a) >= 1 and isinstance(a[0], OSError): 1439 err = a[0] 1440 # The error can also be wrapped as args[1]: 1441 # except socket.error as msg: 1442 # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2]) 1443 elif len(a) >= 2 and isinstance(a[1], OSError): 1444 err = a[1] 1445 else: 1446 break 1447 filter_error(err) 1448 raise 1449 # XXX should we catch generic exceptions and look for their 1450 # __cause__ or __context__? 1451 finally: 1452 socket.setdefaulttimeout(old_timeout) 1453 1454 1455@contextlib.contextmanager 1456def captured_output(stream_name): 1457 """Return a context manager used by captured_stdout/stdin/stderr 1458 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1459 import io 1460 orig_stdout = getattr(sys, stream_name) 1461 setattr(sys, stream_name, io.StringIO()) 1462 try: 1463 yield getattr(sys, stream_name) 1464 finally: 1465 setattr(sys, stream_name, orig_stdout) 1466 1467def captured_stdout(): 1468 """Capture the output of sys.stdout: 1469 1470 with captured_stdout() as stdout: 1471 print("hello") 1472 self.assertEqual(stdout.getvalue(), "hello\\n") 1473 """ 1474 return captured_output("stdout") 1475 1476def captured_stderr(): 1477 """Capture the output of sys.stderr: 1478 1479 with captured_stderr() as stderr: 1480 print("hello", file=sys.stderr) 1481 self.assertEqual(stderr.getvalue(), "hello\\n") 1482 """ 1483 return captured_output("stderr") 1484 1485def captured_stdin(): 1486 """Capture the input to sys.stdin: 1487 1488 with captured_stdin() as stdin: 1489 stdin.write('hello\\n') 1490 stdin.seek(0) 1491 # call test code that consumes from sys.stdin 1492 captured = input() 1493 self.assertEqual(captured, "hello") 1494 """ 1495 return captured_output("stdin") 1496 1497 1498def gc_collect(): 1499 """Force as many objects as possible to be collected. 1500 1501 In non-CPython implementations of Python, this is needed because timely 1502 deallocation is not guaranteed by the garbage collector. (Even in CPython 1503 this can be the case in case of reference cycles.) This means that __del__ 1504 methods may be called later than expected and weakrefs may remain alive for 1505 longer than expected. This function tries its best to force all garbage 1506 objects to disappear. 1507 """ 1508 gc.collect() 1509 if is_jython: 1510 time.sleep(0.1) 1511 gc.collect() 1512 gc.collect() 1513 1514@contextlib.contextmanager 1515def disable_gc(): 1516 have_gc = gc.isenabled() 1517 gc.disable() 1518 try: 1519 yield 1520 finally: 1521 if have_gc: 1522 gc.enable() 1523 1524 1525def python_is_optimized(): 1526 """Find if Python was built with optimizations.""" 1527 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 1528 final_opt = "" 1529 for opt in cflags.split(): 1530 if opt.startswith('-O'): 1531 final_opt = opt 1532 return final_opt not in ('', '-O0', '-Og') 1533 1534 1535_header = 'nP' 1536_align = '0n' 1537if hasattr(sys, "gettotalrefcount"): 1538 _header = '2P' + _header 1539 _align = '0P' 1540_vheader = _header + 'n' 1541 1542def calcobjsize(fmt): 1543 return struct.calcsize(_header + fmt + _align) 1544 1545def calcvobjsize(fmt): 1546 return struct.calcsize(_vheader + fmt + _align) 1547 1548 1549_TPFLAGS_HAVE_GC = 1<<14 1550_TPFLAGS_HEAPTYPE = 1<<9 1551 1552def check_sizeof(test, o, size): 1553 import _testcapi 1554 result = sys.getsizeof(o) 1555 # add GC header size 1556 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1557 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1558 size += _testcapi.SIZEOF_PYGC_HEAD 1559 msg = 'wrong size for %s: got %d, expected %d' \ 1560 % (type(o), result, size) 1561 test.assertEqual(result, size, msg) 1562 1563#======================================================================= 1564# Decorator for running a function in a different locale, correctly resetting 1565# it afterwards. 1566 1567def run_with_locale(catstr, *locales): 1568 def decorator(func): 1569 def inner(*args, **kwds): 1570 try: 1571 import locale 1572 category = getattr(locale, catstr) 1573 orig_locale = locale.setlocale(category) 1574 except AttributeError: 1575 # if the test author gives us an invalid category string 1576 raise 1577 except: 1578 # cannot retrieve original locale, so do nothing 1579 locale = orig_locale = None 1580 else: 1581 for loc in locales: 1582 try: 1583 locale.setlocale(category, loc) 1584 break 1585 except: 1586 pass 1587 1588 # now run the function, resetting the locale on exceptions 1589 try: 1590 return func(*args, **kwds) 1591 finally: 1592 if locale and orig_locale: 1593 locale.setlocale(category, orig_locale) 1594 inner.__name__ = func.__name__ 1595 inner.__doc__ = func.__doc__ 1596 return inner 1597 return decorator 1598 1599#======================================================================= 1600# Decorator for running a function in a specific timezone, correctly 1601# resetting it afterwards. 1602 1603def run_with_tz(tz): 1604 def decorator(func): 1605 def inner(*args, **kwds): 1606 try: 1607 tzset = time.tzset 1608 except AttributeError: 1609 raise unittest.SkipTest("tzset required") 1610 if 'TZ' in os.environ: 1611 orig_tz = os.environ['TZ'] 1612 else: 1613 orig_tz = None 1614 os.environ['TZ'] = tz 1615 tzset() 1616 1617 # now run the function, resetting the tz on exceptions 1618 try: 1619 return func(*args, **kwds) 1620 finally: 1621 if orig_tz is None: 1622 del os.environ['TZ'] 1623 else: 1624 os.environ['TZ'] = orig_tz 1625 time.tzset() 1626 1627 inner.__name__ = func.__name__ 1628 inner.__doc__ = func.__doc__ 1629 return inner 1630 return decorator 1631 1632#======================================================================= 1633# Big-memory-test support. Separate from 'resources' because memory use 1634# should be configurable. 1635 1636# Some handy shorthands. Note that these are used for byte-limits as well 1637# as size-limits, in the various bigmem tests 1638_1M = 1024*1024 1639_1G = 1024 * _1M 1640_2G = 2 * _1G 1641_4G = 4 * _1G 1642 1643MAX_Py_ssize_t = sys.maxsize 1644 1645def set_memlimit(limit): 1646 global max_memuse 1647 global real_max_memuse 1648 sizes = { 1649 'k': 1024, 1650 'm': _1M, 1651 'g': _1G, 1652 't': 1024*_1G, 1653 } 1654 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 1655 re.IGNORECASE | re.VERBOSE) 1656 if m is None: 1657 raise ValueError('Invalid memory limit %r' % (limit,)) 1658 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 1659 real_max_memuse = memlimit 1660 if memlimit > MAX_Py_ssize_t: 1661 memlimit = MAX_Py_ssize_t 1662 if memlimit < _2G - 1: 1663 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 1664 max_memuse = memlimit 1665 1666class _MemoryWatchdog: 1667 """An object which periodically watches the process' memory consumption 1668 and prints it out. 1669 """ 1670 1671 def __init__(self): 1672 self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) 1673 self.started = False 1674 1675 def start(self): 1676 try: 1677 f = open(self.procfile, 'r') 1678 except OSError as e: 1679 warnings.warn('/proc not available for stats: {}'.format(e), 1680 RuntimeWarning) 1681 sys.stderr.flush() 1682 return 1683 1684 watchdog_script = findfile("memory_watchdog.py") 1685 self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], 1686 stdin=f, stderr=subprocess.DEVNULL) 1687 f.close() 1688 self.started = True 1689 1690 def stop(self): 1691 if self.started: 1692 self.mem_watchdog.terminate() 1693 self.mem_watchdog.wait() 1694 1695 1696def bigmemtest(size, memuse, dry_run=True): 1697 """Decorator for bigmem tests. 1698 1699 'size' is a requested size for the test (in arbitrary, test-interpreted 1700 units.) 'memuse' is the number of bytes per unit for the test, or a good 1701 estimate of it. For example, a test that needs two byte buffers, of 4 GiB 1702 each, could be decorated with @bigmemtest(size=_4G, memuse=2). 1703 1704 The 'size' argument is normally passed to the decorated test method as an 1705 extra argument. If 'dry_run' is true, the value passed to the test method 1706 may be less than the requested value. If 'dry_run' is false, it means the 1707 test doesn't support dummy runs when -M is not specified. 1708 """ 1709 def decorator(f): 1710 def wrapper(self): 1711 size = wrapper.size 1712 memuse = wrapper.memuse 1713 if not real_max_memuse: 1714 maxsize = 5147 1715 else: 1716 maxsize = size 1717 1718 if ((real_max_memuse or not dry_run) 1719 and real_max_memuse < maxsize * memuse): 1720 raise unittest.SkipTest( 1721 "not enough memory: %.1fG minimum needed" 1722 % (size * memuse / (1024 ** 3))) 1723 1724 if real_max_memuse and verbose: 1725 print() 1726 print(" ... expected peak memory use: {peak:.1f}G" 1727 .format(peak=size * memuse / (1024 ** 3))) 1728 watchdog = _MemoryWatchdog() 1729 watchdog.start() 1730 else: 1731 watchdog = None 1732 1733 try: 1734 return f(self, maxsize) 1735 finally: 1736 if watchdog: 1737 watchdog.stop() 1738 1739 wrapper.size = size 1740 wrapper.memuse = memuse 1741 return wrapper 1742 return decorator 1743 1744def bigaddrspacetest(f): 1745 """Decorator for tests that fill the address space.""" 1746 def wrapper(self): 1747 if max_memuse < MAX_Py_ssize_t: 1748 if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: 1749 raise unittest.SkipTest( 1750 "not enough memory: try a 32-bit build instead") 1751 else: 1752 raise unittest.SkipTest( 1753 "not enough memory: %.1fG minimum needed" 1754 % (MAX_Py_ssize_t / (1024 ** 3))) 1755 else: 1756 return f(self) 1757 return wrapper 1758 1759#======================================================================= 1760# unittest integration. 1761 1762class BasicTestRunner: 1763 def run(self, test): 1764 result = unittest.TestResult() 1765 test(result) 1766 return result 1767 1768def _id(obj): 1769 return obj 1770 1771def requires_resource(resource): 1772 if resource == 'gui' and not _is_gui_available(): 1773 return unittest.skip(_is_gui_available.reason) 1774 if is_resource_enabled(resource): 1775 return _id 1776 else: 1777 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1778 1779def requires_android_level(level, reason): 1780 if is_android and _ANDROID_API_LEVEL < level: 1781 return unittest.skip('%s at Android API level %d' % 1782 (reason, _ANDROID_API_LEVEL)) 1783 else: 1784 return _id 1785 1786def cpython_only(test): 1787 """ 1788 Decorator for tests only applicable on CPython. 1789 """ 1790 return impl_detail(cpython=True)(test) 1791 1792def impl_detail(msg=None, **guards): 1793 if check_impl_detail(**guards): 1794 return _id 1795 if msg is None: 1796 guardnames, default = _parse_guards(guards) 1797 if default: 1798 msg = "implementation detail not available on {0}" 1799 else: 1800 msg = "implementation detail specific to {0}" 1801 guardnames = sorted(guardnames.keys()) 1802 msg = msg.format(' or '.join(guardnames)) 1803 return unittest.skip(msg) 1804 1805_have_mp_queue = None 1806def requires_multiprocessing_queue(test): 1807 """Skip decorator for tests that use multiprocessing.Queue.""" 1808 global _have_mp_queue 1809 if _have_mp_queue is None: 1810 import multiprocessing 1811 # Without a functioning shared semaphore implementation attempts to 1812 # instantiate a Queue will result in an ImportError (issue #3770). 1813 try: 1814 multiprocessing.Queue() 1815 _have_mp_queue = True 1816 except ImportError: 1817 _have_mp_queue = False 1818 msg = "requires a functioning shared semaphore implementation" 1819 return test if _have_mp_queue else unittest.skip(msg)(test) 1820 1821def _parse_guards(guards): 1822 # Returns a tuple ({platform_name: run_me}, default_value) 1823 if not guards: 1824 return ({'cpython': True}, False) 1825 is_true = list(guards.values())[0] 1826 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 1827 return (guards, not is_true) 1828 1829# Use the following check to guard CPython's implementation-specific tests -- 1830# or to run them only on the implementation(s) guarded by the arguments. 1831def check_impl_detail(**guards): 1832 """This function returns True or False depending on the host platform. 1833 Examples: 1834 if check_impl_detail(): # only on CPython (default) 1835 if check_impl_detail(jython=True): # only on Jython 1836 if check_impl_detail(cpython=False): # everywhere except on CPython 1837 """ 1838 guards, default = _parse_guards(guards) 1839 return guards.get(platform.python_implementation().lower(), default) 1840 1841 1842def no_tracing(func): 1843 """Decorator to temporarily turn off tracing for the duration of a test.""" 1844 if not hasattr(sys, 'gettrace'): 1845 return func 1846 else: 1847 @functools.wraps(func) 1848 def wrapper(*args, **kwargs): 1849 original_trace = sys.gettrace() 1850 try: 1851 sys.settrace(None) 1852 return func(*args, **kwargs) 1853 finally: 1854 sys.settrace(original_trace) 1855 return wrapper 1856 1857 1858def refcount_test(test): 1859 """Decorator for tests which involve reference counting. 1860 1861 To start, the decorator does not run the test if is not run by CPython. 1862 After that, any trace function is unset during the test to prevent 1863 unexpected refcounts caused by the trace function. 1864 1865 """ 1866 return no_tracing(cpython_only(test)) 1867 1868 1869def _filter_suite(suite, pred): 1870 """Recursively filter test cases in a suite based on a predicate.""" 1871 newtests = [] 1872 for test in suite._tests: 1873 if isinstance(test, unittest.TestSuite): 1874 _filter_suite(test, pred) 1875 newtests.append(test) 1876 else: 1877 if pred(test): 1878 newtests.append(test) 1879 suite._tests = newtests 1880 1881def _run_suite(suite): 1882 """Run tests from a unittest.TestSuite-derived class.""" 1883 if verbose: 1884 runner = unittest.TextTestRunner(sys.stdout, verbosity=2, 1885 failfast=failfast) 1886 else: 1887 runner = BasicTestRunner() 1888 1889 result = runner.run(suite) 1890 if not result.wasSuccessful(): 1891 if len(result.errors) == 1 and not result.failures: 1892 err = result.errors[0][1] 1893 elif len(result.failures) == 1 and not result.errors: 1894 err = result.failures[0][1] 1895 else: 1896 err = "multiple errors occurred" 1897 if not verbose: err += "; run in verbose mode for details" 1898 raise TestFailed(err) 1899 1900 1901def run_unittest(*classes): 1902 """Run tests from unittest.TestCase-derived classes.""" 1903 valid_types = (unittest.TestSuite, unittest.TestCase) 1904 suite = unittest.TestSuite() 1905 for cls in classes: 1906 if isinstance(cls, str): 1907 if cls in sys.modules: 1908 suite.addTest(unittest.findTestCases(sys.modules[cls])) 1909 else: 1910 raise ValueError("str arguments must be keys in sys.modules") 1911 elif isinstance(cls, valid_types): 1912 suite.addTest(cls) 1913 else: 1914 suite.addTest(unittest.makeSuite(cls)) 1915 def case_pred(test): 1916 if match_tests is None: 1917 return True 1918 for name in test.id().split("."): 1919 if fnmatch.fnmatchcase(name, match_tests): 1920 return True 1921 return False 1922 _filter_suite(suite, case_pred) 1923 _run_suite(suite) 1924 1925#======================================================================= 1926# Check for the presence of docstrings. 1927 1928# Rather than trying to enumerate all the cases where docstrings may be 1929# disabled, we just check for that directly 1930 1931def _check_docstrings(): 1932 """Just used to check if docstrings are enabled""" 1933 1934MISSING_C_DOCSTRINGS = (check_impl_detail() and 1935 sys.platform != 'win32' and 1936 not sysconfig.get_config_var('WITH_DOC_STRINGS')) 1937 1938HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and 1939 not MISSING_C_DOCSTRINGS) 1940 1941requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 1942 "test requires docstrings") 1943 1944 1945#======================================================================= 1946# doctest driver. 1947 1948def run_doctest(module, verbosity=None, optionflags=0): 1949 """Run doctest on the given module. Return (#failures, #tests). 1950 1951 If optional argument verbosity is not specified (or is None), pass 1952 support's belief about verbosity on to doctest. Else doctest's 1953 usual behavior is used (it searches sys.argv for -v). 1954 """ 1955 1956 import doctest 1957 1958 if verbosity is None: 1959 verbosity = verbose 1960 else: 1961 verbosity = None 1962 1963 f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) 1964 if f: 1965 raise TestFailed("%d of %d doctests failed" % (f, t)) 1966 if verbose: 1967 print('doctest (%s) ... %d tests with zero failures' % 1968 (module.__name__, t)) 1969 return f, t 1970 1971 1972#======================================================================= 1973# Support for saving and restoring the imported modules. 1974 1975def modules_setup(): 1976 return sys.modules.copy(), 1977 1978def modules_cleanup(oldmodules): 1979 # Encoders/decoders are registered permanently within the internal 1980 # codec cache. If we destroy the corresponding modules their 1981 # globals will be set to None which will trip up the cached functions. 1982 encodings = [(k, v) for k, v in sys.modules.items() 1983 if k.startswith('encodings.')] 1984 sys.modules.clear() 1985 sys.modules.update(encodings) 1986 # XXX: This kind of problem can affect more than just encodings. In particular 1987 # extension modules (such as _ssl) don't cope with reloading properly. 1988 # Really, test modules should be cleaning out the test specific modules they 1989 # know they added (ala test_runpy) rather than relying on this function (as 1990 # test_importhooks and test_pkg do currently). 1991 # Implicitly imported *real* modules should be left alone (see issue 10556). 1992 sys.modules.update(oldmodules) 1993 1994#======================================================================= 1995# Threading support to prevent reporting refleaks when running regrtest.py -R 1996 1997# NOTE: we use thread._count() rather than threading.enumerate() (or the 1998# moral equivalent thereof) because a threading.Thread object is still alive 1999# until its __bootstrap() method has returned, even after it has been 2000# unregistered from the threading module. 2001# thread._count(), on the other hand, only gets decremented *after* the 2002# __bootstrap() method has returned, which gives us reliable reference counts 2003# at the end of a test run. 2004 2005def threading_setup(): 2006 if _thread: 2007 return _thread._count(), threading._dangling.copy() 2008 else: 2009 return 1, () 2010 2011def threading_cleanup(*original_values): 2012 if not _thread: 2013 return 2014 _MAX_COUNT = 100 2015 for count in range(_MAX_COUNT): 2016 values = _thread._count(), threading._dangling 2017 if values == original_values: 2018 break 2019 time.sleep(0.01) 2020 gc_collect() 2021 # XXX print a warning in case of failure? 2022 2023def reap_threads(func): 2024 """Use this function when threads are being used. This will 2025 ensure that the threads are cleaned up even when the test fails. 2026 If threading is unavailable this function does nothing. 2027 """ 2028 if not _thread: 2029 return func 2030 2031 @functools.wraps(func) 2032 def decorator(*args): 2033 key = threading_setup() 2034 try: 2035 return func(*args) 2036 finally: 2037 threading_cleanup(*key) 2038 return decorator 2039 2040def reap_children(): 2041 """Use this function at the end of test_main() whenever sub-processes 2042 are started. This will help ensure that no extra children (zombies) 2043 stick around to hog resources and create problems when looking 2044 for refleaks. 2045 """ 2046 2047 # Reap all our dead child processes so we don't leave zombies around. 2048 # These hog resources and might be causing some of the buildbots to die. 2049 if hasattr(os, 'waitpid'): 2050 any_process = -1 2051 while True: 2052 try: 2053 # This will raise an exception on Windows. That's ok. 2054 pid, status = os.waitpid(any_process, os.WNOHANG) 2055 if pid == 0: 2056 break 2057 except: 2058 break 2059 2060@contextlib.contextmanager 2061def start_threads(threads, unlock=None): 2062 threads = list(threads) 2063 started = [] 2064 try: 2065 try: 2066 for t in threads: 2067 t.start() 2068 started.append(t) 2069 except: 2070 if verbose: 2071 print("Can't start %d threads, only %d threads started" % 2072 (len(threads), len(started))) 2073 raise 2074 yield 2075 finally: 2076 try: 2077 if unlock: 2078 unlock() 2079 endtime = starttime = time.time() 2080 for timeout in range(1, 16): 2081 endtime += 60 2082 for t in started: 2083 t.join(max(endtime - time.time(), 0.01)) 2084 started = [t for t in started if t.isAlive()] 2085 if not started: 2086 break 2087 if verbose: 2088 print('Unable to join %d threads during a period of ' 2089 '%d minutes' % (len(started), timeout)) 2090 finally: 2091 started = [t for t in started if t.isAlive()] 2092 if started: 2093 faulthandler.dump_traceback(sys.stdout) 2094 raise AssertionError('Unable to join %d threads' % len(started)) 2095 2096@contextlib.contextmanager 2097def swap_attr(obj, attr, new_val): 2098 """Temporary swap out an attribute with a new object. 2099 2100 Usage: 2101 with swap_attr(obj, "attr", 5): 2102 ... 2103 2104 This will set obj.attr to 5 for the duration of the with: block, 2105 restoring the old value at the end of the block. If `attr` doesn't 2106 exist on `obj`, it will be created and then deleted at the end of the 2107 block. 2108 """ 2109 if hasattr(obj, attr): 2110 real_val = getattr(obj, attr) 2111 setattr(obj, attr, new_val) 2112 try: 2113 yield 2114 finally: 2115 setattr(obj, attr, real_val) 2116 else: 2117 setattr(obj, attr, new_val) 2118 try: 2119 yield 2120 finally: 2121 delattr(obj, attr) 2122 2123@contextlib.contextmanager 2124def swap_item(obj, item, new_val): 2125 """Temporary swap out an item with a new object. 2126 2127 Usage: 2128 with swap_item(obj, "item", 5): 2129 ... 2130 2131 This will set obj["item"] to 5 for the duration of the with: block, 2132 restoring the old value at the end of the block. If `item` doesn't 2133 exist on `obj`, it will be created and then deleted at the end of the 2134 block. 2135 """ 2136 if item in obj: 2137 real_val = obj[item] 2138 obj[item] = new_val 2139 try: 2140 yield 2141 finally: 2142 obj[item] = real_val 2143 else: 2144 obj[item] = new_val 2145 try: 2146 yield 2147 finally: 2148 del obj[item] 2149 2150def strip_python_stderr(stderr): 2151 """Strip the stderr of a Python process from potential debug output 2152 emitted by the interpreter. 2153 2154 This will typically be run on the result of the communicate() method 2155 of a subprocess.Popen object. 2156 """ 2157 stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip() 2158 return stderr 2159 2160requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), 2161 'types are immortal if COUNT_ALLOCS is defined') 2162 2163def args_from_interpreter_flags(): 2164 """Return a list of command-line arguments reproducing the current 2165 settings in sys.flags and sys.warnoptions.""" 2166 return subprocess._args_from_interpreter_flags() 2167 2168def optim_args_from_interpreter_flags(): 2169 """Return a list of command-line arguments reproducing the current 2170 optimization settings in sys.flags.""" 2171 return subprocess._optim_args_from_interpreter_flags() 2172 2173#============================================================ 2174# Support for assertions about logging. 2175#============================================================ 2176 2177class TestHandler(logging.handlers.BufferingHandler): 2178 def __init__(self, matcher): 2179 # BufferingHandler takes a "capacity" argument 2180 # so as to know when to flush. As we're overriding 2181 # shouldFlush anyway, we can set a capacity of zero. 2182 # You can call flush() manually to clear out the 2183 # buffer. 2184 logging.handlers.BufferingHandler.__init__(self, 0) 2185 self.matcher = matcher 2186 2187 def shouldFlush(self): 2188 return False 2189 2190 def emit(self, record): 2191 self.format(record) 2192 self.buffer.append(record.__dict__) 2193 2194 def matches(self, **kwargs): 2195 """ 2196 Look for a saved dict whose keys/values match the supplied arguments. 2197 """ 2198 result = False 2199 for d in self.buffer: 2200 if self.matcher.matches(d, **kwargs): 2201 result = True 2202 break 2203 return result 2204 2205class Matcher(object): 2206 2207 _partial_matches = ('msg', 'message') 2208 2209 def matches(self, d, **kwargs): 2210 """ 2211 Try to match a single dict with the supplied arguments. 2212 2213 Keys whose values are strings and which are in self._partial_matches 2214 will be checked for partial (i.e. substring) matches. You can extend 2215 this scheme to (for example) do regular expression matching, etc. 2216 """ 2217 result = True 2218 for k in kwargs: 2219 v = kwargs[k] 2220 dv = d.get(k) 2221 if not self.match_value(k, dv, v): 2222 result = False 2223 break 2224 return result 2225 2226 def match_value(self, k, dv, v): 2227 """ 2228 Try to match a single stored value (dv) with a supplied value (v). 2229 """ 2230 if type(v) != type(dv): 2231 result = False 2232 elif type(dv) is not str or k not in self._partial_matches: 2233 result = (v == dv) 2234 else: 2235 result = dv.find(v) >= 0 2236 return result 2237 2238 2239_can_symlink = None 2240def can_symlink(): 2241 global _can_symlink 2242 if _can_symlink is not None: 2243 return _can_symlink 2244 symlink_path = TESTFN + "can_symlink" 2245 try: 2246 os.symlink(TESTFN, symlink_path) 2247 can = True 2248 except (OSError, NotImplementedError, AttributeError): 2249 can = False 2250 else: 2251 os.remove(symlink_path) 2252 _can_symlink = can 2253 return can 2254 2255def skip_unless_symlink(test): 2256 """Skip decorator for tests that require functional symlink""" 2257 ok = can_symlink() 2258 msg = "Requires functional symlink implementation" 2259 return test if ok else unittest.skip(msg)(test) 2260 2261_can_xattr = None 2262def can_xattr(): 2263 global _can_xattr 2264 if _can_xattr is not None: 2265 return _can_xattr 2266 if not hasattr(os, "setxattr"): 2267 can = False 2268 else: 2269 tmp_fp, tmp_name = tempfile.mkstemp() 2270 try: 2271 with open(TESTFN, "wb") as fp: 2272 try: 2273 # TESTFN & tempfile may use different file systems with 2274 # different capabilities 2275 os.setxattr(tmp_fp, b"user.test", b"") 2276 os.setxattr(fp.fileno(), b"user.test", b"") 2277 # Kernels < 2.6.39 don't respect setxattr flags. 2278 kernel_version = platform.release() 2279 m = re.match(r"2.6.(\d{1,2})", kernel_version) 2280 can = m is None or int(m.group(1)) >= 39 2281 except OSError: 2282 can = False 2283 finally: 2284 unlink(TESTFN) 2285 unlink(tmp_name) 2286 _can_xattr = can 2287 return can 2288 2289def skip_unless_xattr(test): 2290 """Skip decorator for tests that require functional extended attributes""" 2291 ok = can_xattr() 2292 msg = "no non-broken extended attribute support" 2293 return test if ok else unittest.skip(msg)(test) 2294 2295 2296def fs_is_case_insensitive(directory): 2297 """Detects if the file system for the specified directory is case-insensitive.""" 2298 with tempfile.NamedTemporaryFile(dir=directory) as base: 2299 base_path = base.name 2300 case_path = base_path.upper() 2301 if case_path == base_path: 2302 case_path = base_path.lower() 2303 try: 2304 return os.path.samefile(base_path, case_path) 2305 except FileNotFoundError: 2306 return False 2307 2308 2309def detect_api_mismatch(ref_api, other_api, *, ignore=()): 2310 """Returns the set of items in ref_api not in other_api, except for a 2311 defined list of items to be ignored in this check. 2312 2313 By default this skips private attributes beginning with '_' but 2314 includes all magic methods, i.e. those starting and ending in '__'. 2315 """ 2316 missing_items = set(dir(ref_api)) - set(dir(other_api)) 2317 if ignore: 2318 missing_items -= set(ignore) 2319 missing_items = set(m for m in missing_items 2320 if not m.startswith('_') or m.endswith('__')) 2321 return missing_items 2322 2323 2324def check__all__(test_case, module, name_of_module=None, extra=(), 2325 blacklist=()): 2326 """Assert that the __all__ variable of 'module' contains all public names. 2327 2328 The module's public names (its API) are detected automatically based on 2329 whether they match the public name convention and were defined in 2330 'module'. 2331 2332 The 'name_of_module' argument can specify (as a string or tuple thereof) 2333 what module(s) an API could be defined in in order to be detected as a 2334 public API. One case for this is when 'module' imports part of its public 2335 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 2336 2337 The 'extra' argument can be a set of names that wouldn't otherwise be 2338 automatically detected as "public", like objects without a proper 2339 '__module__' attriubute. If provided, it will be added to the 2340 automatically detected ones. 2341 2342 The 'blacklist' argument can be a set of names that must not be treated 2343 as part of the public API even though their names indicate otherwise. 2344 2345 Usage: 2346 import bar 2347 import foo 2348 import unittest 2349 from test import support 2350 2351 class MiscTestCase(unittest.TestCase): 2352 def test__all__(self): 2353 support.check__all__(self, foo) 2354 2355 class OtherTestCase(unittest.TestCase): 2356 def test__all__(self): 2357 extra = {'BAR_CONST', 'FOO_CONST'} 2358 blacklist = {'baz'} # Undocumented name. 2359 # bar imports part of its API from _bar. 2360 support.check__all__(self, bar, ('bar', '_bar'), 2361 extra=extra, blacklist=blacklist) 2362 2363 """ 2364 2365 if name_of_module is None: 2366 name_of_module = (module.__name__, ) 2367 elif isinstance(name_of_module, str): 2368 name_of_module = (name_of_module, ) 2369 2370 expected = set(extra) 2371 2372 for name in dir(module): 2373 if name.startswith('_') or name in blacklist: 2374 continue 2375 obj = getattr(module, name) 2376 if (getattr(obj, '__module__', None) in name_of_module or 2377 (not hasattr(obj, '__module__') and 2378 not isinstance(obj, types.ModuleType))): 2379 expected.add(name) 2380 test_case.assertCountEqual(module.__all__, expected) 2381 2382 2383class SuppressCrashReport: 2384 """Try to prevent a crash report from popping up. 2385 2386 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 2387 disable the creation of coredump file. 2388 """ 2389 old_value = None 2390 old_modes = None 2391 2392 def __enter__(self): 2393 """On Windows, disable Windows Error Reporting dialogs using 2394 SetErrorMode. 2395 2396 On UNIX, try to save the previous core file size limit, then set 2397 soft limit to 0. 2398 """ 2399 if sys.platform.startswith('win'): 2400 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 2401 # GetErrorMode is not available on Windows XP and Windows Server 2003, 2402 # but SetErrorMode returns the previous value, so we can use that 2403 import ctypes 2404 self._k32 = ctypes.windll.kernel32 2405 SEM_NOGPFAULTERRORBOX = 0x02 2406 self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) 2407 self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX) 2408 2409 # Suppress assert dialogs in debug builds 2410 # (see http://bugs.python.org/issue23314) 2411 try: 2412 import msvcrt 2413 msvcrt.CrtSetReportMode 2414 except (AttributeError, ImportError): 2415 # no msvcrt or a release build 2416 pass 2417 else: 2418 self.old_modes = {} 2419 for report_type in [msvcrt.CRT_WARN, 2420 msvcrt.CRT_ERROR, 2421 msvcrt.CRT_ASSERT]: 2422 old_mode = msvcrt.CrtSetReportMode(report_type, 2423 msvcrt.CRTDBG_MODE_FILE) 2424 old_file = msvcrt.CrtSetReportFile(report_type, 2425 msvcrt.CRTDBG_FILE_STDERR) 2426 self.old_modes[report_type] = old_mode, old_file 2427 2428 else: 2429 if resource is not None: 2430 try: 2431 self.old_value = resource.getrlimit(resource.RLIMIT_CORE) 2432 resource.setrlimit(resource.RLIMIT_CORE, 2433 (0, self.old_value[1])) 2434 except (ValueError, OSError): 2435 pass 2436 if sys.platform == 'darwin': 2437 # Check if the 'Crash Reporter' on OSX was configured 2438 # in 'Developer' mode and warn that it will get triggered 2439 # when it is. 2440 # 2441 # This assumes that this context manager is used in tests 2442 # that might trigger the next manager. 2443 value = subprocess.Popen(['/usr/bin/defaults', 'read', 2444 'com.apple.CrashReporter', 'DialogType'], 2445 stdout=subprocess.PIPE).communicate()[0] 2446 if value.strip() == b'developer': 2447 print("this test triggers the Crash Reporter, " 2448 "that is intentional", end='', flush=True) 2449 2450 return self 2451 2452 def __exit__(self, *ignore_exc): 2453 """Restore Windows ErrorMode or core file behavior to initial value.""" 2454 if self.old_value is None: 2455 return 2456 2457 if sys.platform.startswith('win'): 2458 self._k32.SetErrorMode(self.old_value) 2459 2460 if self.old_modes: 2461 import msvcrt 2462 for report_type, (old_mode, old_file) in self.old_modes.items(): 2463 msvcrt.CrtSetReportMode(report_type, old_mode) 2464 msvcrt.CrtSetReportFile(report_type, old_file) 2465 else: 2466 if resource is not None: 2467 try: 2468 resource.setrlimit(resource.RLIMIT_CORE, self.old_value) 2469 except (ValueError, OSError): 2470 pass 2471 2472 2473def patch(test_instance, object_to_patch, attr_name, new_value): 2474 """Override 'object_to_patch'.'attr_name' with 'new_value'. 2475 2476 Also, add a cleanup procedure to 'test_instance' to restore 2477 'object_to_patch' value for 'attr_name'. 2478 The 'attr_name' should be a valid attribute for 'object_to_patch'. 2479 2480 """ 2481 # check that 'attr_name' is a real attribute for 'object_to_patch' 2482 # will raise AttributeError if it does not exist 2483 getattr(object_to_patch, attr_name) 2484 2485 # keep a copy of the old value 2486 attr_is_local = False 2487 try: 2488 old_value = object_to_patch.__dict__[attr_name] 2489 except (AttributeError, KeyError): 2490 old_value = getattr(object_to_patch, attr_name, None) 2491 else: 2492 attr_is_local = True 2493 2494 # restore the value when the test is done 2495 def cleanup(): 2496 if attr_is_local: 2497 setattr(object_to_patch, attr_name, old_value) 2498 else: 2499 delattr(object_to_patch, attr_name) 2500 2501 test_instance.addCleanup(cleanup) 2502 2503 # actually override the attribute 2504 setattr(object_to_patch, attr_name, new_value) 2505 2506 2507def run_in_subinterp(code): 2508 """ 2509 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 2510 module is enabled. 2511 """ 2512 # Issue #10915, #15751: PyGILState_*() functions don't work with 2513 # sub-interpreters, the tracemalloc module uses these functions internally 2514 try: 2515 import tracemalloc 2516 except ImportError: 2517 pass 2518 else: 2519 if tracemalloc.is_tracing(): 2520 raise unittest.SkipTest("run_in_subinterp() cannot be used " 2521 "if tracemalloc module is tracing " 2522 "memory allocations") 2523 import _testcapi 2524 return _testcapi.run_in_subinterp(code) 2525 2526 2527def check_free_after_iterating(test, iter, cls, args=()): 2528 class A(cls): 2529 def __del__(self): 2530 nonlocal done 2531 done = True 2532 try: 2533 next(it) 2534 except StopIteration: 2535 pass 2536 2537 done = False 2538 it = iter(A(*args)) 2539 # Issue 26494: Shouldn't crash 2540 test.assertRaises(StopIteration, next, it) 2541 # The sequence should be deallocated just after the end of iterating 2542 gc_collect() 2543 test.assertTrue(done) 2544 2545 2546def missing_compiler_executable(cmd_names=[]): 2547 """Check if the compiler components used to build the interpreter exist. 2548 2549 Check for the existence of the compiler executables whose names are listed 2550 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 2551 and return the first missing executable or None when none is found 2552 missing. 2553 2554 """ 2555 from distutils import ccompiler, sysconfig, spawn 2556 compiler = ccompiler.new_compiler() 2557 sysconfig.customize_compiler(compiler) 2558 for name in compiler.executables: 2559 if cmd_names and name not in cmd_names: 2560 continue 2561 cmd = getattr(compiler, name) 2562 if cmd_names: 2563 assert cmd is not None, \ 2564 "the '%s' executable is not configured" % name 2565 elif cmd is None: 2566 continue 2567 if spawn.find_executable(cmd[0]) is None: 2568 return cmd[0] 2569 2570 2571_is_android_emulator = None 2572def setswitchinterval(interval): 2573 # Setting a very low gil interval on the Android emulator causes python 2574 # to hang (issue #26939). 2575 minimum_interval = 1e-5 2576 if is_android and interval < minimum_interval: 2577 global _is_android_emulator 2578 if _is_android_emulator is None: 2579 _is_android_emulator = (subprocess.check_output( 2580 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 2581 if _is_android_emulator: 2582 interval = minimum_interval 2583 return sys.setswitchinterval(interval) 2584