1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('support must be imported from the test package') 5 6import collections.abc 7import contextlib 8import datetime 9import errno 10import faulthandler 11import fnmatch 12import functools 13import gc 14import importlib 15import importlib.util 16import io 17import logging.handlers 18import nntplib 19import os 20import platform 21import re 22import shutil 23import socket 24import stat 25import struct 26import subprocess 27import sys 28import sysconfig 29import tempfile 30import _thread 31import threading 32import time 33import types 34import unittest 35import urllib.error 36import warnings 37 38from .testresult import get_test_runner 39 40try: 41 import multiprocessing.process 42except ImportError: 43 multiprocessing = None 44 45try: 46 import zlib 47except ImportError: 48 zlib = None 49 50try: 51 import gzip 52except ImportError: 53 gzip = None 54 55try: 56 import bz2 57except ImportError: 58 bz2 = None 59 60try: 61 import lzma 62except ImportError: 63 lzma = None 64 65try: 66 import resource 67except ImportError: 68 resource = None 69 70__all__ = [ 71 # globals 72 "PIPE_MAX_SIZE", "verbose", "max_memuse", "use_resources", "failfast", 73 # exceptions 74 "Error", "TestFailed", "TestDidNotRun", "ResourceDenied", 75 # imports 76 "import_module", "import_fresh_module", "CleanImport", 77 # modules 78 "unload", "forget", 79 # io 80 "record_original_stdout", "get_original_stdout", "captured_stdout", 81 "captured_stdin", "captured_stderr", 82 # filesystem 83 "TESTFN", "SAVEDCWD", "unlink", "rmtree", "temp_cwd", "findfile", 84 "create_empty_file", "can_symlink", "fs_is_case_insensitive", 85 # unittest 86 "is_resource_enabled", "requires", "requires_freebsd_version", 87 "requires_linux_version", "requires_mac_ver", "check_syntax_error", 88 "TransientResource", "time_out", "socket_peer_reset", "ioerror_peer_reset", 89 "transient_internet", "BasicTestRunner", "run_unittest", "run_doctest", 90 "skip_unless_symlink", "requires_gzip", "requires_bz2", "requires_lzma", 91 "bigmemtest", "bigaddrspacetest", "cpython_only", "get_attribute", 92 "requires_IEEE_754", "skip_unless_xattr", "requires_zlib", 93 "anticipate_failure", "load_package_tests", "detect_api_mismatch", 94 "check__all__", "skip_unless_bind_unix_socket", 95 # sys 96 "is_jython", "is_android", "check_impl_detail", "unix_shell", 97 "setswitchinterval", 98 # network 99 "HOST", "IPV6_ENABLED", "find_unused_port", "bind_port", "open_urlresource", 100 "bind_unix_socket", 101 # processes 102 'temp_umask', "reap_children", 103 # logging 104 "TestHandler", 105 # threads 106 "threading_setup", "threading_cleanup", "reap_threads", "start_threads", 107 # miscellaneous 108 "check_warnings", "check_no_resource_warning", "EnvironmentVarGuard", 109 "run_with_locale", "swap_item", 110 "swap_attr", "Matcher", "set_memlimit", "SuppressCrashReport", "sortdict", 111 "run_with_tz", "PGO", "missing_compiler_executable", "fd_count", 112 ] 113 114class Error(Exception): 115 """Base class for regression test exceptions.""" 116 117class TestFailed(Error): 118 """Test failed.""" 119 120class TestDidNotRun(Error): 121 """Test did not run any subtests.""" 122 123class ResourceDenied(unittest.SkipTest): 124 """Test skipped because it requested a disallowed resource. 125 126 This is raised when a test calls requires() for a resource that 127 has not be enabled. It is used to distinguish between expected 128 and unexpected skips. 129 """ 130 131@contextlib.contextmanager 132def _ignore_deprecated_imports(ignore=True): 133 """Context manager to suppress package and module deprecation 134 warnings when importing them. 135 136 If ignore is False, this context manager has no effect. 137 """ 138 if ignore: 139 with warnings.catch_warnings(): 140 warnings.filterwarnings("ignore", ".+ (module|package)", 141 DeprecationWarning) 142 yield 143 else: 144 yield 145 146 147def import_module(name, deprecated=False, *, required_on=()): 148 """Import and return the module to be tested, raising SkipTest if 149 it is not available. 150 151 If deprecated is True, any module or package deprecation messages 152 will be suppressed. If a module is required on a platform but optional for 153 others, set required_on to an iterable of platform prefixes which will be 154 compared against sys.platform. 155 """ 156 with _ignore_deprecated_imports(deprecated): 157 try: 158 return importlib.import_module(name) 159 except ImportError as msg: 160 if sys.platform.startswith(tuple(required_on)): 161 raise 162 raise unittest.SkipTest(str(msg)) 163 164 165def _save_and_remove_module(name, orig_modules): 166 """Helper function to save and remove a module from sys.modules 167 168 Raise ImportError if the module can't be imported. 169 """ 170 # try to import the module and raise an error if it can't be imported 171 if name not in sys.modules: 172 __import__(name) 173 del sys.modules[name] 174 for modname in list(sys.modules): 175 if modname == name or modname.startswith(name + '.'): 176 orig_modules[modname] = sys.modules[modname] 177 del sys.modules[modname] 178 179def _save_and_block_module(name, orig_modules): 180 """Helper function to save and block a module in sys.modules 181 182 Return True if the module was in sys.modules, False otherwise. 183 """ 184 saved = True 185 try: 186 orig_modules[name] = sys.modules[name] 187 except KeyError: 188 saved = False 189 sys.modules[name] = None 190 return saved 191 192 193def anticipate_failure(condition): 194 """Decorator to mark a test that is known to be broken in some cases 195 196 Any use of this decorator should have a comment identifying the 197 associated tracker issue. 198 """ 199 if condition: 200 return unittest.expectedFailure 201 return lambda f: f 202 203def load_package_tests(pkg_dir, loader, standard_tests, pattern): 204 """Generic load_tests implementation for simple test packages. 205 206 Most packages can implement load_tests using this function as follows: 207 208 def load_tests(*args): 209 return load_package_tests(os.path.dirname(__file__), *args) 210 """ 211 if pattern is None: 212 pattern = "test*" 213 top_dir = os.path.dirname( # Lib 214 os.path.dirname( # test 215 os.path.dirname(__file__))) # support 216 package_tests = loader.discover(start_dir=pkg_dir, 217 top_level_dir=top_dir, 218 pattern=pattern) 219 standard_tests.addTests(package_tests) 220 return standard_tests 221 222 223def import_fresh_module(name, fresh=(), blocked=(), deprecated=False): 224 """Import and return a module, deliberately bypassing sys.modules. 225 226 This function imports and returns a fresh copy of the named Python module 227 by removing the named module from sys.modules before doing the import. 228 Note that unlike reload, the original module is not affected by 229 this operation. 230 231 *fresh* is an iterable of additional module names that are also removed 232 from the sys.modules cache before doing the import. 233 234 *blocked* is an iterable of module names that are replaced with None 235 in the module cache during the import to ensure that attempts to import 236 them raise ImportError. 237 238 The named module and any modules named in the *fresh* and *blocked* 239 parameters are saved before starting the import and then reinserted into 240 sys.modules when the fresh import is complete. 241 242 Module and package deprecation messages are suppressed during this import 243 if *deprecated* is True. 244 245 This function will raise ImportError if the named module cannot be 246 imported. 247 """ 248 # NOTE: test_heapq, test_json and test_warnings include extra sanity checks 249 # to make sure that this utility function is working as expected 250 with _ignore_deprecated_imports(deprecated): 251 # Keep track of modules saved for later restoration as well 252 # as those which just need a blocking entry removed 253 orig_modules = {} 254 names_to_remove = [] 255 _save_and_remove_module(name, orig_modules) 256 try: 257 for fresh_name in fresh: 258 _save_and_remove_module(fresh_name, orig_modules) 259 for blocked_name in blocked: 260 if not _save_and_block_module(blocked_name, orig_modules): 261 names_to_remove.append(blocked_name) 262 fresh_module = importlib.import_module(name) 263 except ImportError: 264 fresh_module = None 265 finally: 266 for orig_name, module in orig_modules.items(): 267 sys.modules[orig_name] = module 268 for name_to_remove in names_to_remove: 269 del sys.modules[name_to_remove] 270 return fresh_module 271 272 273def get_attribute(obj, name): 274 """Get an attribute, raising SkipTest if AttributeError is raised.""" 275 try: 276 attribute = getattr(obj, name) 277 except AttributeError: 278 raise unittest.SkipTest("object %r has no attribute %r" % (obj, name)) 279 else: 280 return attribute 281 282verbose = 1 # Flag set to 0 by regrtest.py 283use_resources = None # Flag set to [] by regrtest.py 284max_memuse = 0 # Disable bigmem tests (they will still be run with 285 # small sizes, to make sure they work.) 286real_max_memuse = 0 287junit_xml_list = None # list of testsuite XML elements 288failfast = False 289 290# _original_stdout is meant to hold stdout at the time regrtest began. 291# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever. 292# The point is to have some flavor of stdout the user can actually see. 293_original_stdout = None 294def record_original_stdout(stdout): 295 global _original_stdout 296 _original_stdout = stdout 297 298def get_original_stdout(): 299 return _original_stdout or sys.stdout 300 301def unload(name): 302 try: 303 del sys.modules[name] 304 except KeyError: 305 pass 306 307def _force_run(path, func, *args): 308 try: 309 return func(*args) 310 except OSError as err: 311 if verbose >= 2: 312 print('%s: %s' % (err.__class__.__name__, err)) 313 print('re-run %s%r' % (func.__name__, args)) 314 os.chmod(path, stat.S_IRWXU) 315 return func(*args) 316 317if sys.platform.startswith("win"): 318 def _waitfor(func, pathname, waitall=False): 319 # Perform the operation 320 func(pathname) 321 # Now setup the wait loop 322 if waitall: 323 dirname = pathname 324 else: 325 dirname, name = os.path.split(pathname) 326 dirname = dirname or '.' 327 # Check for `pathname` to be removed from the filesystem. 328 # The exponential backoff of the timeout amounts to a total 329 # of ~1 second after which the deletion is probably an error 330 # anyway. 331 # Testing on an i7@4.3GHz shows that usually only 1 iteration is 332 # required when contention occurs. 333 timeout = 0.001 334 while timeout < 1.0: 335 # Note we are only testing for the existence of the file(s) in 336 # the contents of the directory regardless of any security or 337 # access rights. If we have made it this far, we have sufficient 338 # permissions to do that much using Python's equivalent of the 339 # Windows API FindFirstFile. 340 # Other Windows APIs can fail or give incorrect results when 341 # dealing with files that are pending deletion. 342 L = os.listdir(dirname) 343 if not (L if waitall else name in L): 344 return 345 # Increase the timeout and try again 346 time.sleep(timeout) 347 timeout *= 2 348 warnings.warn('tests may fail, delete still pending for ' + pathname, 349 RuntimeWarning, stacklevel=4) 350 351 def _unlink(filename): 352 _waitfor(os.unlink, filename) 353 354 def _rmdir(dirname): 355 _waitfor(os.rmdir, dirname) 356 357 def _rmtree(path): 358 def _rmtree_inner(path): 359 for name in _force_run(path, os.listdir, path): 360 fullname = os.path.join(path, name) 361 try: 362 mode = os.lstat(fullname).st_mode 363 except OSError as exc: 364 print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc), 365 file=sys.__stderr__) 366 mode = 0 367 if stat.S_ISDIR(mode): 368 _waitfor(_rmtree_inner, fullname, waitall=True) 369 _force_run(fullname, os.rmdir, fullname) 370 else: 371 _force_run(fullname, os.unlink, fullname) 372 _waitfor(_rmtree_inner, path, waitall=True) 373 _waitfor(lambda p: _force_run(p, os.rmdir, p), path) 374 375 def _longpath(path): 376 try: 377 import ctypes 378 except ImportError: 379 # No ctypes means we can't expands paths. 380 pass 381 else: 382 buffer = ctypes.create_unicode_buffer(len(path) * 2) 383 length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer, 384 len(buffer)) 385 if length: 386 return buffer[:length] 387 return path 388else: 389 _unlink = os.unlink 390 _rmdir = os.rmdir 391 392 def _rmtree(path): 393 try: 394 shutil.rmtree(path) 395 return 396 except OSError: 397 pass 398 399 def _rmtree_inner(path): 400 for name in _force_run(path, os.listdir, path): 401 fullname = os.path.join(path, name) 402 try: 403 mode = os.lstat(fullname).st_mode 404 except OSError: 405 mode = 0 406 if stat.S_ISDIR(mode): 407 _rmtree_inner(fullname) 408 _force_run(path, os.rmdir, fullname) 409 else: 410 _force_run(path, os.unlink, fullname) 411 _rmtree_inner(path) 412 os.rmdir(path) 413 414 def _longpath(path): 415 return path 416 417def unlink(filename): 418 try: 419 _unlink(filename) 420 except (FileNotFoundError, NotADirectoryError): 421 pass 422 423def rmdir(dirname): 424 try: 425 _rmdir(dirname) 426 except FileNotFoundError: 427 pass 428 429def rmtree(path): 430 try: 431 _rmtree(path) 432 except FileNotFoundError: 433 pass 434 435def make_legacy_pyc(source): 436 """Move a PEP 3147/488 pyc file to its legacy pyc location. 437 438 :param source: The file system path to the source file. The source file 439 does not need to exist, however the PEP 3147/488 pyc file must exist. 440 :return: The file system path to the legacy pyc file. 441 """ 442 pyc_file = importlib.util.cache_from_source(source) 443 up_one = os.path.dirname(os.path.abspath(source)) 444 legacy_pyc = os.path.join(up_one, source + 'c') 445 os.rename(pyc_file, legacy_pyc) 446 return legacy_pyc 447 448def forget(modname): 449 """'Forget' a module was ever imported. 450 451 This removes the module from sys.modules and deletes any PEP 3147/488 or 452 legacy .pyc files. 453 """ 454 unload(modname) 455 for dirname in sys.path: 456 source = os.path.join(dirname, modname + '.py') 457 # It doesn't matter if they exist or not, unlink all possible 458 # combinations of PEP 3147/488 and legacy pyc files. 459 unlink(source + 'c') 460 for opt in ('', 1, 2): 461 unlink(importlib.util.cache_from_source(source, optimization=opt)) 462 463# Check whether a gui is actually available 464def _is_gui_available(): 465 if hasattr(_is_gui_available, 'result'): 466 return _is_gui_available.result 467 reason = None 468 if sys.platform.startswith('win'): 469 # if Python is running as a service (such as the buildbot service), 470 # gui interaction may be disallowed 471 import ctypes 472 import ctypes.wintypes 473 UOI_FLAGS = 1 474 WSF_VISIBLE = 0x0001 475 class USEROBJECTFLAGS(ctypes.Structure): 476 _fields_ = [("fInherit", ctypes.wintypes.BOOL), 477 ("fReserved", ctypes.wintypes.BOOL), 478 ("dwFlags", ctypes.wintypes.DWORD)] 479 dll = ctypes.windll.user32 480 h = dll.GetProcessWindowStation() 481 if not h: 482 raise ctypes.WinError() 483 uof = USEROBJECTFLAGS() 484 needed = ctypes.wintypes.DWORD() 485 res = dll.GetUserObjectInformationW(h, 486 UOI_FLAGS, 487 ctypes.byref(uof), 488 ctypes.sizeof(uof), 489 ctypes.byref(needed)) 490 if not res: 491 raise ctypes.WinError() 492 if not bool(uof.dwFlags & WSF_VISIBLE): 493 reason = "gui not available (WSF_VISIBLE flag not set)" 494 elif sys.platform == 'darwin': 495 # The Aqua Tk implementations on OS X can abort the process if 496 # being called in an environment where a window server connection 497 # cannot be made, for instance when invoked by a buildbot or ssh 498 # process not running under the same user id as the current console 499 # user. To avoid that, raise an exception if the window manager 500 # connection is not available. 501 from ctypes import cdll, c_int, pointer, Structure 502 from ctypes.util import find_library 503 504 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 505 506 if app_services.CGMainDisplayID() == 0: 507 reason = "gui tests cannot run without OS X window manager" 508 else: 509 class ProcessSerialNumber(Structure): 510 _fields_ = [("highLongOfPSN", c_int), 511 ("lowLongOfPSN", c_int)] 512 psn = ProcessSerialNumber() 513 psn_p = pointer(psn) 514 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 515 (app_services.SetFrontProcess(psn_p) < 0) ): 516 reason = "cannot run without OS X gui process" 517 518 # check on every platform whether tkinter can actually do anything 519 if not reason: 520 try: 521 from tkinter import Tk 522 root = Tk() 523 root.withdraw() 524 root.update() 525 root.destroy() 526 except Exception as e: 527 err_string = str(e) 528 if len(err_string) > 50: 529 err_string = err_string[:50] + ' [...]' 530 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 531 err_string) 532 533 _is_gui_available.reason = reason 534 _is_gui_available.result = not reason 535 536 return _is_gui_available.result 537 538def is_resource_enabled(resource): 539 """Test whether a resource is enabled. 540 541 Known resources are set by regrtest.py. If not running under regrtest.py, 542 all resources are assumed enabled unless use_resources has been set. 543 """ 544 return use_resources is None or resource in use_resources 545 546def requires(resource, msg=None): 547 """Raise ResourceDenied if the specified resource is not available.""" 548 if not is_resource_enabled(resource): 549 if msg is None: 550 msg = "Use of the %r resource not enabled" % resource 551 raise ResourceDenied(msg) 552 if resource == 'gui' and not _is_gui_available(): 553 raise ResourceDenied(_is_gui_available.reason) 554 555def _requires_unix_version(sysname, min_version): 556 """Decorator raising SkipTest if the OS is `sysname` and the version is less 557 than `min_version`. 558 559 For example, @_requires_unix_version('FreeBSD', (7, 2)) raises SkipTest if 560 the FreeBSD version is less than 7.2. 561 """ 562 def decorator(func): 563 @functools.wraps(func) 564 def wrapper(*args, **kw): 565 if platform.system() == sysname: 566 version_txt = platform.release().split('-', 1)[0] 567 try: 568 version = tuple(map(int, version_txt.split('.'))) 569 except ValueError: 570 pass 571 else: 572 if version < min_version: 573 min_version_txt = '.'.join(map(str, min_version)) 574 raise unittest.SkipTest( 575 "%s version %s or higher required, not %s" 576 % (sysname, min_version_txt, version_txt)) 577 return func(*args, **kw) 578 wrapper.min_version = min_version 579 return wrapper 580 return decorator 581 582def requires_freebsd_version(*min_version): 583 """Decorator raising SkipTest if the OS is FreeBSD and the FreeBSD version is 584 less than `min_version`. 585 586 For example, @requires_freebsd_version(7, 2) raises SkipTest if the FreeBSD 587 version is less than 7.2. 588 """ 589 return _requires_unix_version('FreeBSD', min_version) 590 591def requires_linux_version(*min_version): 592 """Decorator raising SkipTest if the OS is Linux and the Linux version is 593 less than `min_version`. 594 595 For example, @requires_linux_version(2, 6, 32) raises SkipTest if the Linux 596 version is less than 2.6.32. 597 """ 598 return _requires_unix_version('Linux', min_version) 599 600def requires_mac_ver(*min_version): 601 """Decorator raising SkipTest if the OS is Mac OS X and the OS X 602 version if less than min_version. 603 604 For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version 605 is lesser than 10.5. 606 """ 607 def decorator(func): 608 @functools.wraps(func) 609 def wrapper(*args, **kw): 610 if sys.platform == 'darwin': 611 version_txt = platform.mac_ver()[0] 612 try: 613 version = tuple(map(int, version_txt.split('.'))) 614 except ValueError: 615 pass 616 else: 617 if version < min_version: 618 min_version_txt = '.'.join(map(str, min_version)) 619 raise unittest.SkipTest( 620 "Mac OS X %s or higher required, not %s" 621 % (min_version_txt, version_txt)) 622 return func(*args, **kw) 623 wrapper.min_version = min_version 624 return wrapper 625 return decorator 626 627 628HOST = "localhost" 629HOSTv4 = "127.0.0.1" 630HOSTv6 = "::1" 631 632 633def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM): 634 """Returns an unused port that should be suitable for binding. This is 635 achieved by creating a temporary socket with the same family and type as 636 the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to 637 the specified host address (defaults to 0.0.0.0) with the port set to 0, 638 eliciting an unused ephemeral port from the OS. The temporary socket is 639 then closed and deleted, and the ephemeral port is returned. 640 641 Either this method or bind_port() should be used for any tests where a 642 server socket needs to be bound to a particular port for the duration of 643 the test. Which one to use depends on whether the calling code is creating 644 a python socket, or if an unused port needs to be provided in a constructor 645 or passed to an external program (i.e. the -accept argument to openssl's 646 s_server mode). Always prefer bind_port() over find_unused_port() where 647 possible. Hard coded ports should *NEVER* be used. As soon as a server 648 socket is bound to a hard coded port, the ability to run multiple instances 649 of the test simultaneously on the same host is compromised, which makes the 650 test a ticking time bomb in a buildbot environment. On Unix buildbots, this 651 may simply manifest as a failed test, which can be recovered from without 652 intervention in most cases, but on Windows, the entire python process can 653 completely and utterly wedge, requiring someone to log in to the buildbot 654 and manually kill the affected process. 655 656 (This is easy to reproduce on Windows, unfortunately, and can be traced to 657 the SO_REUSEADDR socket option having different semantics on Windows versus 658 Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind, 659 listen and then accept connections on identical host/ports. An EADDRINUSE 660 OSError will be raised at some point (depending on the platform and 661 the order bind and listen were called on each socket). 662 663 However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE 664 will ever be raised when attempting to bind two identical host/ports. When 665 accept() is called on each socket, the second caller's process will steal 666 the port from the first caller, leaving them both in an awkwardly wedged 667 state where they'll no longer respond to any signals or graceful kills, and 668 must be forcibly killed via OpenProcess()/TerminateProcess(). 669 670 The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option 671 instead of SO_REUSEADDR, which effectively affords the same semantics as 672 SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open 673 Source world compared to Windows ones, this is a common mistake. A quick 674 look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when 675 openssl.exe is called with the 's_server' option, for example. See 676 http://bugs.python.org/issue2550 for more info. The following site also 677 has a very thorough description about the implications of both REUSEADDR 678 and EXCLUSIVEADDRUSE on Windows: 679 http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx) 680 681 XXX: although this approach is a vast improvement on previous attempts to 682 elicit unused ports, it rests heavily on the assumption that the ephemeral 683 port returned to us by the OS won't immediately be dished back out to some 684 other process when we close and delete our temporary socket but before our 685 calling code has a chance to bind the returned port. We can deal with this 686 issue if/when we come across it. 687 """ 688 689 tempsock = socket.socket(family, socktype) 690 port = bind_port(tempsock) 691 tempsock.close() 692 del tempsock 693 return port 694 695def bind_port(sock, host=HOST): 696 """Bind the socket to a free port and return the port number. Relies on 697 ephemeral ports in order to ensure we are using an unbound port. This is 698 important as many tests may be running simultaneously, especially in a 699 buildbot environment. This method raises an exception if the sock.family 700 is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR 701 or SO_REUSEPORT set on it. Tests should *never* set these socket options 702 for TCP/IP sockets. The only case for setting these options is testing 703 multicasting via multiple UDP sockets. 704 705 Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e. 706 on Windows), it will be set on the socket. This will prevent anyone else 707 from bind()'ing to our host/port for the duration of the test. 708 """ 709 710 if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM: 711 if hasattr(socket, 'SO_REUSEADDR'): 712 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1: 713 raise TestFailed("tests should never set the SO_REUSEADDR " \ 714 "socket option on TCP/IP sockets!") 715 if hasattr(socket, 'SO_REUSEPORT'): 716 try: 717 if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1: 718 raise TestFailed("tests should never set the SO_REUSEPORT " \ 719 "socket option on TCP/IP sockets!") 720 except OSError: 721 # Python's socket module was compiled using modern headers 722 # thus defining SO_REUSEPORT but this process is running 723 # under an older kernel that does not support SO_REUSEPORT. 724 pass 725 if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'): 726 sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1) 727 728 sock.bind((host, 0)) 729 port = sock.getsockname()[1] 730 return port 731 732def bind_unix_socket(sock, addr): 733 """Bind a unix socket, raising SkipTest if PermissionError is raised.""" 734 assert sock.family == socket.AF_UNIX 735 try: 736 sock.bind(addr) 737 except PermissionError: 738 sock.close() 739 raise unittest.SkipTest('cannot bind AF_UNIX sockets') 740 741def _is_ipv6_enabled(): 742 """Check whether IPv6 is enabled on this host.""" 743 if socket.has_ipv6: 744 sock = None 745 try: 746 sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 747 sock.bind((HOSTv6, 0)) 748 return True 749 except OSError: 750 pass 751 finally: 752 if sock: 753 sock.close() 754 return False 755 756IPV6_ENABLED = _is_ipv6_enabled() 757 758def system_must_validate_cert(f): 759 """Skip the test on TLS certificate validation failures.""" 760 @functools.wraps(f) 761 def dec(*args, **kwargs): 762 try: 763 f(*args, **kwargs) 764 except OSError as e: 765 if "CERTIFICATE_VERIFY_FAILED" in str(e): 766 raise unittest.SkipTest("system does not contain " 767 "necessary certificates") 768 raise 769 return dec 770 771# A constant likely larger than the underlying OS pipe buffer size, to 772# make writes blocking. 773# Windows limit seems to be around 512 B, and many Unix kernels have a 774# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure. 775# (see issue #17835 for a discussion of this number). 776PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1 777 778# A constant likely larger than the underlying OS socket buffer size, to make 779# writes blocking. 780# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl 781# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643 782# for a discussion of this number). 783SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1 784 785# decorator for skipping tests on non-IEEE 754 platforms 786requires_IEEE_754 = unittest.skipUnless( 787 float.__getformat__("double").startswith("IEEE"), 788 "test requires IEEE 754 doubles") 789 790requires_zlib = unittest.skipUnless(zlib, 'requires zlib') 791 792requires_gzip = unittest.skipUnless(gzip, 'requires gzip') 793 794requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') 795 796requires_lzma = unittest.skipUnless(lzma, 'requires lzma') 797 798is_jython = sys.platform.startswith('java') 799 800is_android = hasattr(sys, 'getandroidapilevel') 801 802if sys.platform != 'win32': 803 unix_shell = '/system/bin/sh' if is_android else '/bin/sh' 804else: 805 unix_shell = None 806 807# Filename used for testing 808if os.name == 'java': 809 # Jython disallows @ in module names 810 TESTFN = '$test' 811else: 812 TESTFN = '@test' 813 814# Disambiguate TESTFN for parallel testing, while letting it remain a valid 815# module name. 816TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 817 818# Define the URL of a dedicated HTTP server for the network tests. 819# The URL must use clear-text HTTP: no redirection to encrypted HTTPS. 820TEST_HTTP_URL = "http://www.pythontest.net" 821 822# FS_NONASCII: non-ASCII character encodable by os.fsencode(), 823# or None if there is no such character. 824FS_NONASCII = None 825for character in ( 826 # First try printable and common characters to have a readable filename. 827 # For each character, the encoding list are just example of encodings able 828 # to encode the character (the list is not exhaustive). 829 830 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 831 '\u00E6', 832 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 833 '\u0130', 834 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 835 '\u0141', 836 # U+03C6 (Greek Small Letter Phi): cp1253 837 '\u03C6', 838 # U+041A (Cyrillic Capital Letter Ka): cp1251 839 '\u041A', 840 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 841 '\u05D0', 842 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 843 '\u060C', 844 # U+062A (Arabic Letter Teh): cp720 845 '\u062A', 846 # U+0E01 (Thai Character Ko Kai): cp874 847 '\u0E01', 848 849 # Then try more "special" characters. "special" because they may be 850 # interpreted or displayed differently depending on the exact locale 851 # encoding and the font. 852 853 # U+00A0 (No-Break Space) 854 '\u00A0', 855 # U+20AC (Euro Sign) 856 '\u20AC', 857): 858 try: 859 # If Python is set up to use the legacy 'mbcs' in Windows, 860 # 'replace' error mode is used, and encode() returns b'?' 861 # for characters missing in the ANSI codepage 862 if os.fsdecode(os.fsencode(character)) != character: 863 raise UnicodeError 864 except UnicodeError: 865 pass 866 else: 867 FS_NONASCII = character 868 break 869 870# TESTFN_UNICODE is a non-ascii filename 871TESTFN_UNICODE = TESTFN + "-\xe0\xf2\u0258\u0141\u011f" 872if sys.platform == 'darwin': 873 # In Mac OS X's VFS API file names are, by definition, canonically 874 # decomposed Unicode, encoded using UTF-8. See QA1173: 875 # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html 876 import unicodedata 877 TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE) 878TESTFN_ENCODING = sys.getfilesystemencoding() 879 880# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be 881# encoded by the filesystem encoding (in strict mode). It can be None if we 882# cannot generate such filename. 883TESTFN_UNENCODABLE = None 884if os.name == 'nt': 885 # skip win32s (0) or Windows 9x/ME (1) 886 if sys.getwindowsversion().platform >= 2: 887 # Different kinds of characters from various languages to minimize the 888 # probability that the whole name is encodable to MBCS (issue #9819) 889 TESTFN_UNENCODABLE = TESTFN + "-\u5171\u0141\u2661\u0363\uDC80" 890 try: 891 TESTFN_UNENCODABLE.encode(TESTFN_ENCODING) 892 except UnicodeEncodeError: 893 pass 894 else: 895 print('WARNING: The filename %r CAN be encoded by the filesystem encoding (%s). ' 896 'Unicode filename tests may not be effective' 897 % (TESTFN_UNENCODABLE, TESTFN_ENCODING)) 898 TESTFN_UNENCODABLE = None 899# Mac OS X denies unencodable filenames (invalid utf-8) 900elif sys.platform != 'darwin': 901 try: 902 # ascii and utf-8 cannot encode the byte 0xff 903 b'\xff'.decode(TESTFN_ENCODING) 904 except UnicodeDecodeError: 905 # 0xff will be encoded using the surrogate character u+DCFF 906 TESTFN_UNENCODABLE = TESTFN \ 907 + b'-\xff'.decode(TESTFN_ENCODING, 'surrogateescape') 908 else: 909 # File system encoding (eg. ISO-8859-* encodings) can encode 910 # the byte 0xff. Skip some unicode filename tests. 911 pass 912 913# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be 914# decoded from the filesystem encoding (in strict mode). It can be None if we 915# cannot generate such filename (ex: the latin1 encoding can decode any byte 916# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks 917# to the surrogateescape error handler (PEP 383), but not from the filesystem 918# encoding in strict mode. 919TESTFN_UNDECODABLE = None 920for name in ( 921 # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows 922 # accepts it to create a file or a directory, or don't accept to enter to 923 # such directory (when the bytes name is used). So test b'\xe7' first: it is 924 # not decodable from cp932. 925 b'\xe7w\xf0', 926 # undecodable from ASCII, UTF-8 927 b'\xff', 928 # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856 929 # and cp857 930 b'\xae\xd5' 931 # undecodable from UTF-8 (UNIX and Mac OS X) 932 b'\xed\xb2\x80', b'\xed\xb4\x80', 933 # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252, 934 # cp1253, cp1254, cp1255, cp1257, cp1258 935 b'\x81\x98', 936): 937 try: 938 name.decode(TESTFN_ENCODING) 939 except UnicodeDecodeError: 940 TESTFN_UNDECODABLE = os.fsencode(TESTFN) + name 941 break 942 943if FS_NONASCII: 944 TESTFN_NONASCII = TESTFN + '-' + FS_NONASCII 945else: 946 TESTFN_NONASCII = None 947 948# Save the initial cwd 949SAVEDCWD = os.getcwd() 950 951# Set by libregrtest/main.py so we can skip tests that are not 952# useful for PGO 953PGO = False 954 955@contextlib.contextmanager 956def temp_dir(path=None, quiet=False): 957 """Return a context manager that creates a temporary directory. 958 959 Arguments: 960 961 path: the directory to create temporarily. If omitted or None, 962 defaults to creating a temporary directory using tempfile.mkdtemp. 963 964 quiet: if False (the default), the context manager raises an exception 965 on error. Otherwise, if the path is specified and cannot be 966 created, only a warning is issued. 967 968 """ 969 dir_created = False 970 if path is None: 971 path = tempfile.mkdtemp() 972 dir_created = True 973 path = os.path.realpath(path) 974 else: 975 try: 976 os.mkdir(path) 977 dir_created = True 978 except OSError as exc: 979 if not quiet: 980 raise 981 warnings.warn(f'tests may fail, unable to create ' 982 f'temporary directory {path!r}: {exc}', 983 RuntimeWarning, stacklevel=3) 984 if dir_created: 985 pid = os.getpid() 986 try: 987 yield path 988 finally: 989 # In case the process forks, let only the parent remove the 990 # directory. The child has a diffent process id. (bpo-30028) 991 if dir_created and pid == os.getpid(): 992 rmtree(path) 993 994@contextlib.contextmanager 995def change_cwd(path, quiet=False): 996 """Return a context manager that changes the current working directory. 997 998 Arguments: 999 1000 path: the directory to use as the temporary current working directory. 1001 1002 quiet: if False (the default), the context manager raises an exception 1003 on error. Otherwise, it issues only a warning and keeps the current 1004 working directory the same. 1005 1006 """ 1007 saved_dir = os.getcwd() 1008 try: 1009 os.chdir(path) 1010 except OSError as exc: 1011 if not quiet: 1012 raise 1013 warnings.warn(f'tests may fail, unable to change the current working ' 1014 f'directory to {path!r}: {exc}', 1015 RuntimeWarning, stacklevel=3) 1016 try: 1017 yield os.getcwd() 1018 finally: 1019 os.chdir(saved_dir) 1020 1021 1022@contextlib.contextmanager 1023def temp_cwd(name='tempcwd', quiet=False): 1024 """ 1025 Context manager that temporarily creates and changes the CWD. 1026 1027 The function temporarily changes the current working directory 1028 after creating a temporary directory in the current directory with 1029 name *name*. If *name* is None, the temporary directory is 1030 created using tempfile.mkdtemp. 1031 1032 If *quiet* is False (default) and it is not possible to 1033 create or change the CWD, an error is raised. If *quiet* is True, 1034 only a warning is raised and the original CWD is used. 1035 1036 """ 1037 with temp_dir(path=name, quiet=quiet) as temp_path: 1038 with change_cwd(temp_path, quiet=quiet) as cwd_dir: 1039 yield cwd_dir 1040 1041if hasattr(os, "umask"): 1042 @contextlib.contextmanager 1043 def temp_umask(umask): 1044 """Context manager that temporarily sets the process umask.""" 1045 oldmask = os.umask(umask) 1046 try: 1047 yield 1048 finally: 1049 os.umask(oldmask) 1050 1051# TEST_HOME_DIR refers to the top level directory of the "test" package 1052# that contains Python's regression test suite 1053TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__)) 1054TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR) 1055 1056# TEST_DATA_DIR is used as a target download location for remote resources 1057TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data") 1058 1059def findfile(filename, subdir=None): 1060 """Try to find a file on sys.path or in the test directory. If it is not 1061 found the argument passed to the function is returned (this does not 1062 necessarily signal failure; could still be the legitimate path). 1063 1064 Setting *subdir* indicates a relative path to use to find the file 1065 rather than looking directly in the path directories. 1066 """ 1067 if os.path.isabs(filename): 1068 return filename 1069 if subdir is not None: 1070 filename = os.path.join(subdir, filename) 1071 path = [TEST_HOME_DIR] + sys.path 1072 for dn in path: 1073 fn = os.path.join(dn, filename) 1074 if os.path.exists(fn): return fn 1075 return filename 1076 1077def create_empty_file(filename): 1078 """Create an empty file. If the file already exists, truncate it.""" 1079 fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) 1080 os.close(fd) 1081 1082def sortdict(dict): 1083 "Like repr(dict), but in sorted order." 1084 items = sorted(dict.items()) 1085 reprpairs = ["%r: %r" % pair for pair in items] 1086 withcommas = ", ".join(reprpairs) 1087 return "{%s}" % withcommas 1088 1089def make_bad_fd(): 1090 """ 1091 Create an invalid file descriptor by opening and closing a file and return 1092 its fd. 1093 """ 1094 file = open(TESTFN, "wb") 1095 try: 1096 return file.fileno() 1097 finally: 1098 file.close() 1099 unlink(TESTFN) 1100 1101def check_syntax_error(testcase, statement, *, lineno=None, offset=None): 1102 with testcase.assertRaises(SyntaxError) as cm: 1103 compile(statement, '<test string>', 'exec') 1104 err = cm.exception 1105 testcase.assertIsNotNone(err.lineno) 1106 if lineno is not None: 1107 testcase.assertEqual(err.lineno, lineno) 1108 testcase.assertIsNotNone(err.offset) 1109 if offset is not None: 1110 testcase.assertEqual(err.offset, offset) 1111 1112def open_urlresource(url, *args, **kw): 1113 import urllib.request, urllib.parse 1114 1115 check = kw.pop('check', None) 1116 1117 filename = urllib.parse.urlparse(url)[2].split('/')[-1] # '/': it's URL! 1118 1119 fn = os.path.join(TEST_DATA_DIR, filename) 1120 1121 def check_valid_file(fn): 1122 f = open(fn, *args, **kw) 1123 if check is None: 1124 return f 1125 elif check(f): 1126 f.seek(0) 1127 return f 1128 f.close() 1129 1130 if os.path.exists(fn): 1131 f = check_valid_file(fn) 1132 if f is not None: 1133 return f 1134 unlink(fn) 1135 1136 # Verify the requirement before downloading the file 1137 requires('urlfetch') 1138 1139 if verbose: 1140 print('\tfetching %s ...' % url, file=get_original_stdout()) 1141 opener = urllib.request.build_opener() 1142 if gzip: 1143 opener.addheaders.append(('Accept-Encoding', 'gzip')) 1144 f = opener.open(url, timeout=15) 1145 if gzip and f.headers.get('Content-Encoding') == 'gzip': 1146 f = gzip.GzipFile(fileobj=f) 1147 try: 1148 with open(fn, "wb") as out: 1149 s = f.read() 1150 while s: 1151 out.write(s) 1152 s = f.read() 1153 finally: 1154 f.close() 1155 1156 f = check_valid_file(fn) 1157 if f is not None: 1158 return f 1159 raise TestFailed('invalid resource %r' % fn) 1160 1161 1162class WarningsRecorder(object): 1163 """Convenience wrapper for the warnings list returned on 1164 entry to the warnings.catch_warnings() context manager. 1165 """ 1166 def __init__(self, warnings_list): 1167 self._warnings = warnings_list 1168 self._last = 0 1169 1170 def __getattr__(self, attr): 1171 if len(self._warnings) > self._last: 1172 return getattr(self._warnings[-1], attr) 1173 elif attr in warnings.WarningMessage._WARNING_DETAILS: 1174 return None 1175 raise AttributeError("%r has no attribute %r" % (self, attr)) 1176 1177 @property 1178 def warnings(self): 1179 return self._warnings[self._last:] 1180 1181 def reset(self): 1182 self._last = len(self._warnings) 1183 1184 1185def _filterwarnings(filters, quiet=False): 1186 """Catch the warnings, then check if all the expected 1187 warnings have been raised and re-raise unexpected warnings. 1188 If 'quiet' is True, only re-raise the unexpected warnings. 1189 """ 1190 # Clear the warning registry of the calling module 1191 # in order to re-raise the warnings. 1192 frame = sys._getframe(2) 1193 registry = frame.f_globals.get('__warningregistry__') 1194 if registry: 1195 registry.clear() 1196 with warnings.catch_warnings(record=True) as w: 1197 # Set filter "always" to record all warnings. Because 1198 # test_warnings swap the module, we need to look up in 1199 # the sys.modules dictionary. 1200 sys.modules['warnings'].simplefilter("always") 1201 yield WarningsRecorder(w) 1202 # Filter the recorded warnings 1203 reraise = list(w) 1204 missing = [] 1205 for msg, cat in filters: 1206 seen = False 1207 for w in reraise[:]: 1208 warning = w.message 1209 # Filter out the matching messages 1210 if (re.match(msg, str(warning), re.I) and 1211 issubclass(warning.__class__, cat)): 1212 seen = True 1213 reraise.remove(w) 1214 if not seen and not quiet: 1215 # This filter caught nothing 1216 missing.append((msg, cat.__name__)) 1217 if reraise: 1218 raise AssertionError("unhandled warning %s" % reraise[0]) 1219 if missing: 1220 raise AssertionError("filter (%r, %s) did not catch any warning" % 1221 missing[0]) 1222 1223 1224@contextlib.contextmanager 1225def check_warnings(*filters, **kwargs): 1226 """Context manager to silence warnings. 1227 1228 Accept 2-tuples as positional arguments: 1229 ("message regexp", WarningCategory) 1230 1231 Optional argument: 1232 - if 'quiet' is True, it does not fail if a filter catches nothing 1233 (default True without argument, 1234 default False if some filters are defined) 1235 1236 Without argument, it defaults to: 1237 check_warnings(("", Warning), quiet=True) 1238 """ 1239 quiet = kwargs.get('quiet') 1240 if not filters: 1241 filters = (("", Warning),) 1242 # Preserve backward compatibility 1243 if quiet is None: 1244 quiet = True 1245 return _filterwarnings(filters, quiet) 1246 1247 1248@contextlib.contextmanager 1249def check_no_resource_warning(testcase): 1250 """Context manager to check that no ResourceWarning is emitted. 1251 1252 Usage: 1253 1254 with check_no_resource_warning(self): 1255 f = open(...) 1256 ... 1257 del f 1258 1259 You must remove the object which may emit ResourceWarning before 1260 the end of the context manager. 1261 """ 1262 with warnings.catch_warnings(record=True) as warns: 1263 warnings.filterwarnings('always', category=ResourceWarning) 1264 yield 1265 gc_collect() 1266 testcase.assertEqual(warns, []) 1267 1268 1269class CleanImport(object): 1270 """Context manager to force import to return a new module reference. 1271 1272 This is useful for testing module-level behaviours, such as 1273 the emission of a DeprecationWarning on import. 1274 1275 Use like this: 1276 1277 with CleanImport("foo"): 1278 importlib.import_module("foo") # new reference 1279 """ 1280 1281 def __init__(self, *module_names): 1282 self.original_modules = sys.modules.copy() 1283 for module_name in module_names: 1284 if module_name in sys.modules: 1285 module = sys.modules[module_name] 1286 # It is possible that module_name is just an alias for 1287 # another module (e.g. stub for modules renamed in 3.x). 1288 # In that case, we also need delete the real module to clear 1289 # the import cache. 1290 if module.__name__ != module_name: 1291 del sys.modules[module.__name__] 1292 del sys.modules[module_name] 1293 1294 def __enter__(self): 1295 return self 1296 1297 def __exit__(self, *ignore_exc): 1298 sys.modules.update(self.original_modules) 1299 1300 1301class EnvironmentVarGuard(collections.abc.MutableMapping): 1302 1303 """Class to help protect the environment variable properly. Can be used as 1304 a context manager.""" 1305 1306 def __init__(self): 1307 self._environ = os.environ 1308 self._changed = {} 1309 1310 def __getitem__(self, envvar): 1311 return self._environ[envvar] 1312 1313 def __setitem__(self, envvar, value): 1314 # Remember the initial value on the first access 1315 if envvar not in self._changed: 1316 self._changed[envvar] = self._environ.get(envvar) 1317 self._environ[envvar] = value 1318 1319 def __delitem__(self, envvar): 1320 # Remember the initial value on the first access 1321 if envvar not in self._changed: 1322 self._changed[envvar] = self._environ.get(envvar) 1323 if envvar in self._environ: 1324 del self._environ[envvar] 1325 1326 def keys(self): 1327 return self._environ.keys() 1328 1329 def __iter__(self): 1330 return iter(self._environ) 1331 1332 def __len__(self): 1333 return len(self._environ) 1334 1335 def set(self, envvar, value): 1336 self[envvar] = value 1337 1338 def unset(self, envvar): 1339 del self[envvar] 1340 1341 def __enter__(self): 1342 return self 1343 1344 def __exit__(self, *ignore_exc): 1345 for (k, v) in self._changed.items(): 1346 if v is None: 1347 if k in self._environ: 1348 del self._environ[k] 1349 else: 1350 self._environ[k] = v 1351 os.environ = self._environ 1352 1353 1354class DirsOnSysPath(object): 1355 """Context manager to temporarily add directories to sys.path. 1356 1357 This makes a copy of sys.path, appends any directories given 1358 as positional arguments, then reverts sys.path to the copied 1359 settings when the context ends. 1360 1361 Note that *all* sys.path modifications in the body of the 1362 context manager, including replacement of the object, 1363 will be reverted at the end of the block. 1364 """ 1365 1366 def __init__(self, *paths): 1367 self.original_value = sys.path[:] 1368 self.original_object = sys.path 1369 sys.path.extend(paths) 1370 1371 def __enter__(self): 1372 return self 1373 1374 def __exit__(self, *ignore_exc): 1375 sys.path = self.original_object 1376 sys.path[:] = self.original_value 1377 1378 1379class TransientResource(object): 1380 1381 """Raise ResourceDenied if an exception is raised while the context manager 1382 is in effect that matches the specified exception and attributes.""" 1383 1384 def __init__(self, exc, **kwargs): 1385 self.exc = exc 1386 self.attrs = kwargs 1387 1388 def __enter__(self): 1389 return self 1390 1391 def __exit__(self, type_=None, value=None, traceback=None): 1392 """If type_ is a subclass of self.exc and value has attributes matching 1393 self.attrs, raise ResourceDenied. Otherwise let the exception 1394 propagate (if any).""" 1395 if type_ is not None and issubclass(self.exc, type_): 1396 for attr, attr_value in self.attrs.items(): 1397 if not hasattr(value, attr): 1398 break 1399 if getattr(value, attr) != attr_value: 1400 break 1401 else: 1402 raise ResourceDenied("an optional resource is not available") 1403 1404# Context managers that raise ResourceDenied when various issues 1405# with the Internet connection manifest themselves as exceptions. 1406# XXX deprecate these and use transient_internet() instead 1407time_out = TransientResource(OSError, errno=errno.ETIMEDOUT) 1408socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1409ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET) 1410 1411 1412@contextlib.contextmanager 1413def transient_internet(resource_name, *, timeout=30.0, errnos=()): 1414 """Return a context manager that raises ResourceDenied when various issues 1415 with the Internet connection manifest themselves as exceptions.""" 1416 default_errnos = [ 1417 ('ECONNREFUSED', 111), 1418 ('ECONNRESET', 104), 1419 ('EHOSTUNREACH', 113), 1420 ('ENETUNREACH', 101), 1421 ('ETIMEDOUT', 110), 1422 # socket.create_connection() fails randomly with 1423 # EADDRNOTAVAIL on Travis CI. 1424 ('EADDRNOTAVAIL', 99), 1425 ] 1426 default_gai_errnos = [ 1427 ('EAI_AGAIN', -3), 1428 ('EAI_FAIL', -4), 1429 ('EAI_NONAME', -2), 1430 ('EAI_NODATA', -5), 1431 # Encountered when trying to resolve IPv6-only hostnames 1432 ('WSANO_DATA', 11004), 1433 ] 1434 1435 denied = ResourceDenied("Resource %r is not available" % resource_name) 1436 captured_errnos = errnos 1437 gai_errnos = [] 1438 if not captured_errnos: 1439 captured_errnos = [getattr(errno, name, num) 1440 for (name, num) in default_errnos] 1441 gai_errnos = [getattr(socket, name, num) 1442 for (name, num) in default_gai_errnos] 1443 1444 def filter_error(err): 1445 n = getattr(err, 'errno', None) 1446 if (isinstance(err, socket.timeout) or 1447 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1448 (isinstance(err, urllib.error.HTTPError) and 1449 500 <= err.code <= 599) or 1450 (isinstance(err, urllib.error.URLError) and 1451 (("ConnectionRefusedError" in err.reason) or 1452 ("TimeoutError" in err.reason) or 1453 ("EOFError" in err.reason))) or 1454 n in captured_errnos): 1455 if not verbose: 1456 sys.stderr.write(denied.args[0] + "\n") 1457 raise denied from err 1458 1459 old_timeout = socket.getdefaulttimeout() 1460 try: 1461 if timeout is not None: 1462 socket.setdefaulttimeout(timeout) 1463 yield 1464 except nntplib.NNTPTemporaryError as err: 1465 if verbose: 1466 sys.stderr.write(denied.args[0] + "\n") 1467 raise denied from err 1468 except OSError as err: 1469 # urllib can wrap original socket errors multiple times (!), we must 1470 # unwrap to get at the original error. 1471 while True: 1472 a = err.args 1473 if len(a) >= 1 and isinstance(a[0], OSError): 1474 err = a[0] 1475 # The error can also be wrapped as args[1]: 1476 # except socket.error as msg: 1477 # raise OSError('socket error', msg).with_traceback(sys.exc_info()[2]) 1478 elif len(a) >= 2 and isinstance(a[1], OSError): 1479 err = a[1] 1480 else: 1481 break 1482 filter_error(err) 1483 raise 1484 # XXX should we catch generic exceptions and look for their 1485 # __cause__ or __context__? 1486 finally: 1487 socket.setdefaulttimeout(old_timeout) 1488 1489 1490@contextlib.contextmanager 1491def captured_output(stream_name): 1492 """Return a context manager used by captured_stdout/stdin/stderr 1493 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1494 import io 1495 orig_stdout = getattr(sys, stream_name) 1496 setattr(sys, stream_name, io.StringIO()) 1497 try: 1498 yield getattr(sys, stream_name) 1499 finally: 1500 setattr(sys, stream_name, orig_stdout) 1501 1502def captured_stdout(): 1503 """Capture the output of sys.stdout: 1504 1505 with captured_stdout() as stdout: 1506 print("hello") 1507 self.assertEqual(stdout.getvalue(), "hello\\n") 1508 """ 1509 return captured_output("stdout") 1510 1511def captured_stderr(): 1512 """Capture the output of sys.stderr: 1513 1514 with captured_stderr() as stderr: 1515 print("hello", file=sys.stderr) 1516 self.assertEqual(stderr.getvalue(), "hello\\n") 1517 """ 1518 return captured_output("stderr") 1519 1520def captured_stdin(): 1521 """Capture the input to sys.stdin: 1522 1523 with captured_stdin() as stdin: 1524 stdin.write('hello\\n') 1525 stdin.seek(0) 1526 # call test code that consumes from sys.stdin 1527 captured = input() 1528 self.assertEqual(captured, "hello") 1529 """ 1530 return captured_output("stdin") 1531 1532 1533def gc_collect(): 1534 """Force as many objects as possible to be collected. 1535 1536 In non-CPython implementations of Python, this is needed because timely 1537 deallocation is not guaranteed by the garbage collector. (Even in CPython 1538 this can be the case in case of reference cycles.) This means that __del__ 1539 methods may be called later than expected and weakrefs may remain alive for 1540 longer than expected. This function tries its best to force all garbage 1541 objects to disappear. 1542 """ 1543 gc.collect() 1544 if is_jython: 1545 time.sleep(0.1) 1546 gc.collect() 1547 gc.collect() 1548 1549@contextlib.contextmanager 1550def disable_gc(): 1551 have_gc = gc.isenabled() 1552 gc.disable() 1553 try: 1554 yield 1555 finally: 1556 if have_gc: 1557 gc.enable() 1558 1559 1560def python_is_optimized(): 1561 """Find if Python was built with optimizations.""" 1562 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 1563 final_opt = "" 1564 for opt in cflags.split(): 1565 if opt.startswith('-O'): 1566 final_opt = opt 1567 return final_opt not in ('', '-O0', '-Og') 1568 1569 1570_header = 'nP' 1571_align = '0n' 1572if hasattr(sys, "gettotalrefcount"): 1573 _header = '2P' + _header 1574 _align = '0P' 1575_vheader = _header + 'n' 1576 1577def calcobjsize(fmt): 1578 return struct.calcsize(_header + fmt + _align) 1579 1580def calcvobjsize(fmt): 1581 return struct.calcsize(_vheader + fmt + _align) 1582 1583 1584_TPFLAGS_HAVE_GC = 1<<14 1585_TPFLAGS_HEAPTYPE = 1<<9 1586 1587def check_sizeof(test, o, size): 1588 import _testcapi 1589 result = sys.getsizeof(o) 1590 # add GC header size 1591 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1592 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1593 size += _testcapi.SIZEOF_PYGC_HEAD 1594 msg = 'wrong size for %s: got %d, expected %d' \ 1595 % (type(o), result, size) 1596 test.assertEqual(result, size, msg) 1597 1598#======================================================================= 1599# Decorator for running a function in a different locale, correctly resetting 1600# it afterwards. 1601 1602def run_with_locale(catstr, *locales): 1603 def decorator(func): 1604 def inner(*args, **kwds): 1605 try: 1606 import locale 1607 category = getattr(locale, catstr) 1608 orig_locale = locale.setlocale(category) 1609 except AttributeError: 1610 # if the test author gives us an invalid category string 1611 raise 1612 except: 1613 # cannot retrieve original locale, so do nothing 1614 locale = orig_locale = None 1615 else: 1616 for loc in locales: 1617 try: 1618 locale.setlocale(category, loc) 1619 break 1620 except: 1621 pass 1622 1623 # now run the function, resetting the locale on exceptions 1624 try: 1625 return func(*args, **kwds) 1626 finally: 1627 if locale and orig_locale: 1628 locale.setlocale(category, orig_locale) 1629 inner.__name__ = func.__name__ 1630 inner.__doc__ = func.__doc__ 1631 return inner 1632 return decorator 1633 1634#======================================================================= 1635# Decorator for running a function in a specific timezone, correctly 1636# resetting it afterwards. 1637 1638def run_with_tz(tz): 1639 def decorator(func): 1640 def inner(*args, **kwds): 1641 try: 1642 tzset = time.tzset 1643 except AttributeError: 1644 raise unittest.SkipTest("tzset required") 1645 if 'TZ' in os.environ: 1646 orig_tz = os.environ['TZ'] 1647 else: 1648 orig_tz = None 1649 os.environ['TZ'] = tz 1650 tzset() 1651 1652 # now run the function, resetting the tz on exceptions 1653 try: 1654 return func(*args, **kwds) 1655 finally: 1656 if orig_tz is None: 1657 del os.environ['TZ'] 1658 else: 1659 os.environ['TZ'] = orig_tz 1660 time.tzset() 1661 1662 inner.__name__ = func.__name__ 1663 inner.__doc__ = func.__doc__ 1664 return inner 1665 return decorator 1666 1667#======================================================================= 1668# Big-memory-test support. Separate from 'resources' because memory use 1669# should be configurable. 1670 1671# Some handy shorthands. Note that these are used for byte-limits as well 1672# as size-limits, in the various bigmem tests 1673_1M = 1024*1024 1674_1G = 1024 * _1M 1675_2G = 2 * _1G 1676_4G = 4 * _1G 1677 1678MAX_Py_ssize_t = sys.maxsize 1679 1680def set_memlimit(limit): 1681 global max_memuse 1682 global real_max_memuse 1683 sizes = { 1684 'k': 1024, 1685 'm': _1M, 1686 'g': _1G, 1687 't': 1024*_1G, 1688 } 1689 m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit, 1690 re.IGNORECASE | re.VERBOSE) 1691 if m is None: 1692 raise ValueError('Invalid memory limit %r' % (limit,)) 1693 memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()]) 1694 real_max_memuse = memlimit 1695 if memlimit > MAX_Py_ssize_t: 1696 memlimit = MAX_Py_ssize_t 1697 if memlimit < _2G - 1: 1698 raise ValueError('Memory limit %r too low to be useful' % (limit,)) 1699 max_memuse = memlimit 1700 1701class _MemoryWatchdog: 1702 """An object which periodically watches the process' memory consumption 1703 and prints it out. 1704 """ 1705 1706 def __init__(self): 1707 self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid()) 1708 self.started = False 1709 1710 def start(self): 1711 try: 1712 f = open(self.procfile, 'r') 1713 except OSError as e: 1714 warnings.warn('/proc not available for stats: {}'.format(e), 1715 RuntimeWarning) 1716 sys.stderr.flush() 1717 return 1718 1719 watchdog_script = findfile("memory_watchdog.py") 1720 self.mem_watchdog = subprocess.Popen([sys.executable, watchdog_script], 1721 stdin=f, stderr=subprocess.DEVNULL) 1722 f.close() 1723 self.started = True 1724 1725 def stop(self): 1726 if self.started: 1727 self.mem_watchdog.terminate() 1728 self.mem_watchdog.wait() 1729 1730 1731def bigmemtest(size, memuse, dry_run=True): 1732 """Decorator for bigmem tests. 1733 1734 'size' is a requested size for the test (in arbitrary, test-interpreted 1735 units.) 'memuse' is the number of bytes per unit for the test, or a good 1736 estimate of it. For example, a test that needs two byte buffers, of 4 GiB 1737 each, could be decorated with @bigmemtest(size=_4G, memuse=2). 1738 1739 The 'size' argument is normally passed to the decorated test method as an 1740 extra argument. If 'dry_run' is true, the value passed to the test method 1741 may be less than the requested value. If 'dry_run' is false, it means the 1742 test doesn't support dummy runs when -M is not specified. 1743 """ 1744 def decorator(f): 1745 def wrapper(self): 1746 size = wrapper.size 1747 memuse = wrapper.memuse 1748 if not real_max_memuse: 1749 maxsize = 5147 1750 else: 1751 maxsize = size 1752 1753 if ((real_max_memuse or not dry_run) 1754 and real_max_memuse < maxsize * memuse): 1755 raise unittest.SkipTest( 1756 "not enough memory: %.1fG minimum needed" 1757 % (size * memuse / (1024 ** 3))) 1758 1759 if real_max_memuse and verbose: 1760 print() 1761 print(" ... expected peak memory use: {peak:.1f}G" 1762 .format(peak=size * memuse / (1024 ** 3))) 1763 watchdog = _MemoryWatchdog() 1764 watchdog.start() 1765 else: 1766 watchdog = None 1767 1768 try: 1769 return f(self, maxsize) 1770 finally: 1771 if watchdog: 1772 watchdog.stop() 1773 1774 wrapper.size = size 1775 wrapper.memuse = memuse 1776 return wrapper 1777 return decorator 1778 1779def bigaddrspacetest(f): 1780 """Decorator for tests that fill the address space.""" 1781 def wrapper(self): 1782 if max_memuse < MAX_Py_ssize_t: 1783 if MAX_Py_ssize_t >= 2**63 - 1 and max_memuse >= 2**31: 1784 raise unittest.SkipTest( 1785 "not enough memory: try a 32-bit build instead") 1786 else: 1787 raise unittest.SkipTest( 1788 "not enough memory: %.1fG minimum needed" 1789 % (MAX_Py_ssize_t / (1024 ** 3))) 1790 else: 1791 return f(self) 1792 return wrapper 1793 1794#======================================================================= 1795# unittest integration. 1796 1797class BasicTestRunner: 1798 def run(self, test): 1799 result = unittest.TestResult() 1800 test(result) 1801 return result 1802 1803def _id(obj): 1804 return obj 1805 1806def requires_resource(resource): 1807 if resource == 'gui' and not _is_gui_available(): 1808 return unittest.skip(_is_gui_available.reason) 1809 if is_resource_enabled(resource): 1810 return _id 1811 else: 1812 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1813 1814def cpython_only(test): 1815 """ 1816 Decorator for tests only applicable on CPython. 1817 """ 1818 return impl_detail(cpython=True)(test) 1819 1820def impl_detail(msg=None, **guards): 1821 if check_impl_detail(**guards): 1822 return _id 1823 if msg is None: 1824 guardnames, default = _parse_guards(guards) 1825 if default: 1826 msg = "implementation detail not available on {0}" 1827 else: 1828 msg = "implementation detail specific to {0}" 1829 guardnames = sorted(guardnames.keys()) 1830 msg = msg.format(' or '.join(guardnames)) 1831 return unittest.skip(msg) 1832 1833def _parse_guards(guards): 1834 # Returns a tuple ({platform_name: run_me}, default_value) 1835 if not guards: 1836 return ({'cpython': True}, False) 1837 is_true = list(guards.values())[0] 1838 assert list(guards.values()) == [is_true] * len(guards) # all True or all False 1839 return (guards, not is_true) 1840 1841# Use the following check to guard CPython's implementation-specific tests -- 1842# or to run them only on the implementation(s) guarded by the arguments. 1843def check_impl_detail(**guards): 1844 """This function returns True or False depending on the host platform. 1845 Examples: 1846 if check_impl_detail(): # only on CPython (default) 1847 if check_impl_detail(jython=True): # only on Jython 1848 if check_impl_detail(cpython=False): # everywhere except on CPython 1849 """ 1850 guards, default = _parse_guards(guards) 1851 return guards.get(platform.python_implementation().lower(), default) 1852 1853 1854def no_tracing(func): 1855 """Decorator to temporarily turn off tracing for the duration of a test.""" 1856 if not hasattr(sys, 'gettrace'): 1857 return func 1858 else: 1859 @functools.wraps(func) 1860 def wrapper(*args, **kwargs): 1861 original_trace = sys.gettrace() 1862 try: 1863 sys.settrace(None) 1864 return func(*args, **kwargs) 1865 finally: 1866 sys.settrace(original_trace) 1867 return wrapper 1868 1869 1870def refcount_test(test): 1871 """Decorator for tests which involve reference counting. 1872 1873 To start, the decorator does not run the test if is not run by CPython. 1874 After that, any trace function is unset during the test to prevent 1875 unexpected refcounts caused by the trace function. 1876 1877 """ 1878 return no_tracing(cpython_only(test)) 1879 1880 1881def _filter_suite(suite, pred): 1882 """Recursively filter test cases in a suite based on a predicate.""" 1883 newtests = [] 1884 for test in suite._tests: 1885 if isinstance(test, unittest.TestSuite): 1886 _filter_suite(test, pred) 1887 newtests.append(test) 1888 else: 1889 if pred(test): 1890 newtests.append(test) 1891 suite._tests = newtests 1892 1893def _run_suite(suite): 1894 """Run tests from a unittest.TestSuite-derived class.""" 1895 runner = get_test_runner(sys.stdout, 1896 verbosity=verbose, 1897 capture_output=(junit_xml_list is not None)) 1898 1899 result = runner.run(suite) 1900 1901 if junit_xml_list is not None: 1902 junit_xml_list.append(result.get_xml_element()) 1903 1904 if not result.testsRun and not result.skipped: 1905 raise TestDidNotRun 1906 if not result.wasSuccessful(): 1907 if len(result.errors) == 1 and not result.failures: 1908 err = result.errors[0][1] 1909 elif len(result.failures) == 1 and not result.errors: 1910 err = result.failures[0][1] 1911 else: 1912 err = "multiple errors occurred" 1913 if not verbose: err += "; run in verbose mode for details" 1914 raise TestFailed(err) 1915 1916 1917# By default, don't filter tests 1918_match_test_func = None 1919_match_test_patterns = None 1920 1921 1922def match_test(test): 1923 # Function used by support.run_unittest() and regrtest --list-cases 1924 if _match_test_func is None: 1925 return True 1926 else: 1927 return _match_test_func(test.id()) 1928 1929 1930def _is_full_match_test(pattern): 1931 # If a pattern contains at least one dot, it's considered 1932 # as a full test identifier. 1933 # Example: 'test.test_os.FileTests.test_access'. 1934 # 1935 # Reject patterns which contain fnmatch patterns: '*', '?', '[...]' 1936 # or '[!...]'. For example, reject 'test_access*'. 1937 return ('.' in pattern) and (not re.search(r'[?*\[\]]', pattern)) 1938 1939 1940def set_match_tests(patterns): 1941 global _match_test_func, _match_test_patterns 1942 1943 if patterns == _match_test_patterns: 1944 # No change: no need to recompile patterns. 1945 return 1946 1947 if not patterns: 1948 func = None 1949 # set_match_tests(None) behaves as set_match_tests(()) 1950 patterns = () 1951 elif all(map(_is_full_match_test, patterns)): 1952 # Simple case: all patterns are full test identifier. 1953 # The test.bisect_cmd utility only uses such full test identifiers. 1954 func = set(patterns).__contains__ 1955 else: 1956 regex = '|'.join(map(fnmatch.translate, patterns)) 1957 # The search *is* case sensitive on purpose: 1958 # don't use flags=re.IGNORECASE 1959 regex_match = re.compile(regex).match 1960 1961 def match_test_regex(test_id): 1962 if regex_match(test_id): 1963 # The regex matchs the whole identifier like 1964 # 'test.test_os.FileTests.test_access' 1965 return True 1966 else: 1967 # Try to match parts of the test identifier. 1968 # For example, split 'test.test_os.FileTests.test_access' 1969 # into: 'test', 'test_os', 'FileTests' and 'test_access'. 1970 return any(map(regex_match, test_id.split("."))) 1971 1972 func = match_test_regex 1973 1974 # Create a copy since patterns can be mutable and so modified later 1975 _match_test_patterns = tuple(patterns) 1976 _match_test_func = func 1977 1978 1979 1980def run_unittest(*classes): 1981 """Run tests from unittest.TestCase-derived classes.""" 1982 valid_types = (unittest.TestSuite, unittest.TestCase) 1983 suite = unittest.TestSuite() 1984 for cls in classes: 1985 if isinstance(cls, str): 1986 if cls in sys.modules: 1987 suite.addTest(unittest.findTestCases(sys.modules[cls])) 1988 else: 1989 raise ValueError("str arguments must be keys in sys.modules") 1990 elif isinstance(cls, valid_types): 1991 suite.addTest(cls) 1992 else: 1993 suite.addTest(unittest.makeSuite(cls)) 1994 _filter_suite(suite, match_test) 1995 _run_suite(suite) 1996 1997#======================================================================= 1998# Check for the presence of docstrings. 1999 2000# Rather than trying to enumerate all the cases where docstrings may be 2001# disabled, we just check for that directly 2002 2003def _check_docstrings(): 2004 """Just used to check if docstrings are enabled""" 2005 2006MISSING_C_DOCSTRINGS = (check_impl_detail() and 2007 sys.platform != 'win32' and 2008 not sysconfig.get_config_var('WITH_DOC_STRINGS')) 2009 2010HAVE_DOCSTRINGS = (_check_docstrings.__doc__ is not None and 2011 not MISSING_C_DOCSTRINGS) 2012 2013requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 2014 "test requires docstrings") 2015 2016 2017#======================================================================= 2018# doctest driver. 2019 2020def run_doctest(module, verbosity=None, optionflags=0): 2021 """Run doctest on the given module. Return (#failures, #tests). 2022 2023 If optional argument verbosity is not specified (or is None), pass 2024 support's belief about verbosity on to doctest. Else doctest's 2025 usual behavior is used (it searches sys.argv for -v). 2026 """ 2027 2028 import doctest 2029 2030 if verbosity is None: 2031 verbosity = verbose 2032 else: 2033 verbosity = None 2034 2035 f, t = doctest.testmod(module, verbose=verbosity, optionflags=optionflags) 2036 if f: 2037 raise TestFailed("%d of %d doctests failed" % (f, t)) 2038 if verbose: 2039 print('doctest (%s) ... %d tests with zero failures' % 2040 (module.__name__, t)) 2041 return f, t 2042 2043 2044#======================================================================= 2045# Support for saving and restoring the imported modules. 2046 2047def modules_setup(): 2048 return sys.modules.copy(), 2049 2050def modules_cleanup(oldmodules): 2051 # Encoders/decoders are registered permanently within the internal 2052 # codec cache. If we destroy the corresponding modules their 2053 # globals will be set to None which will trip up the cached functions. 2054 encodings = [(k, v) for k, v in sys.modules.items() 2055 if k.startswith('encodings.')] 2056 sys.modules.clear() 2057 sys.modules.update(encodings) 2058 # XXX: This kind of problem can affect more than just encodings. In particular 2059 # extension modules (such as _ssl) don't cope with reloading properly. 2060 # Really, test modules should be cleaning out the test specific modules they 2061 # know they added (ala test_runpy) rather than relying on this function (as 2062 # test_importhooks and test_pkg do currently). 2063 # Implicitly imported *real* modules should be left alone (see issue 10556). 2064 sys.modules.update(oldmodules) 2065 2066#======================================================================= 2067# Threading support to prevent reporting refleaks when running regrtest.py -R 2068 2069# Flag used by saved_test_environment of test.libregrtest.save_env, 2070# to check if a test modified the environment. The flag should be set to False 2071# before running a new test. 2072# 2073# For example, threading_cleanup() sets the flag is the function fails 2074# to cleanup threads. 2075environment_altered = False 2076 2077# NOTE: we use thread._count() rather than threading.enumerate() (or the 2078# moral equivalent thereof) because a threading.Thread object is still alive 2079# until its __bootstrap() method has returned, even after it has been 2080# unregistered from the threading module. 2081# thread._count(), on the other hand, only gets decremented *after* the 2082# __bootstrap() method has returned, which gives us reliable reference counts 2083# at the end of a test run. 2084 2085def threading_setup(): 2086 return _thread._count(), threading._dangling.copy() 2087 2088def threading_cleanup(*original_values): 2089 global environment_altered 2090 2091 _MAX_COUNT = 100 2092 2093 for count in range(_MAX_COUNT): 2094 values = _thread._count(), threading._dangling 2095 if values == original_values: 2096 break 2097 2098 if not count: 2099 # Display a warning at the first iteration 2100 environment_altered = True 2101 dangling_threads = values[1] 2102 print("Warning -- threading_cleanup() failed to cleanup " 2103 "%s threads (count: %s, dangling: %s)" 2104 % (values[0] - original_values[0], 2105 values[0], len(dangling_threads)), 2106 file=sys.stderr) 2107 for thread in dangling_threads: 2108 print(f"Dangling thread: {thread!r}", file=sys.stderr) 2109 sys.stderr.flush() 2110 2111 # Don't hold references to threads 2112 dangling_threads = None 2113 values = None 2114 2115 time.sleep(0.01) 2116 gc_collect() 2117 2118 2119def reap_threads(func): 2120 """Use this function when threads are being used. This will 2121 ensure that the threads are cleaned up even when the test fails. 2122 """ 2123 @functools.wraps(func) 2124 def decorator(*args): 2125 key = threading_setup() 2126 try: 2127 return func(*args) 2128 finally: 2129 threading_cleanup(*key) 2130 return decorator 2131 2132 2133@contextlib.contextmanager 2134def wait_threads_exit(timeout=60.0): 2135 """ 2136 bpo-31234: Context manager to wait until all threads created in the with 2137 statement exit. 2138 2139 Use _thread.count() to check if threads exited. Indirectly, wait until 2140 threads exit the internal t_bootstrap() C function of the _thread module. 2141 2142 threading_setup() and threading_cleanup() are designed to emit a warning 2143 if a test leaves running threads in the background. This context manager 2144 is designed to cleanup threads started by the _thread.start_new_thread() 2145 which doesn't allow to wait for thread exit, whereas thread.Thread has a 2146 join() method. 2147 """ 2148 old_count = _thread._count() 2149 try: 2150 yield 2151 finally: 2152 start_time = time.monotonic() 2153 deadline = start_time + timeout 2154 while True: 2155 count = _thread._count() 2156 if count <= old_count: 2157 break 2158 if time.monotonic() > deadline: 2159 dt = time.monotonic() - start_time 2160 msg = (f"wait_threads() failed to cleanup {count - old_count} " 2161 f"threads after {dt:.1f} seconds " 2162 f"(count: {count}, old count: {old_count})") 2163 raise AssertionError(msg) 2164 time.sleep(0.010) 2165 gc_collect() 2166 2167 2168def join_thread(thread, timeout=30.0): 2169 """Join a thread. Raise an AssertionError if the thread is still alive 2170 after timeout seconds. 2171 """ 2172 thread.join(timeout) 2173 if thread.is_alive(): 2174 msg = f"failed to join the thread in {timeout:.1f} seconds" 2175 raise AssertionError(msg) 2176 2177 2178def reap_children(): 2179 """Use this function at the end of test_main() whenever sub-processes 2180 are started. This will help ensure that no extra children (zombies) 2181 stick around to hog resources and create problems when looking 2182 for refleaks. 2183 """ 2184 global environment_altered 2185 2186 # Need os.waitpid(-1, os.WNOHANG): Windows is not supported 2187 if not (hasattr(os, 'waitpid') and hasattr(os, 'WNOHANG')): 2188 return 2189 2190 # Reap all our dead child processes so we don't leave zombies around. 2191 # These hog resources and might be causing some of the buildbots to die. 2192 while True: 2193 try: 2194 # Read the exit status of any child process which already completed 2195 pid, status = os.waitpid(-1, os.WNOHANG) 2196 except OSError: 2197 break 2198 2199 if pid == 0: 2200 break 2201 2202 print("Warning -- reap_children() reaped child process %s" 2203 % pid, file=sys.stderr) 2204 environment_altered = True 2205 2206 2207@contextlib.contextmanager 2208def start_threads(threads, unlock=None): 2209 threads = list(threads) 2210 started = [] 2211 try: 2212 try: 2213 for t in threads: 2214 t.start() 2215 started.append(t) 2216 except: 2217 if verbose: 2218 print("Can't start %d threads, only %d threads started" % 2219 (len(threads), len(started))) 2220 raise 2221 yield 2222 finally: 2223 try: 2224 if unlock: 2225 unlock() 2226 endtime = starttime = time.monotonic() 2227 for timeout in range(1, 16): 2228 endtime += 60 2229 for t in started: 2230 t.join(max(endtime - time.monotonic(), 0.01)) 2231 started = [t for t in started if t.is_alive()] 2232 if not started: 2233 break 2234 if verbose: 2235 print('Unable to join %d threads during a period of ' 2236 '%d minutes' % (len(started), timeout)) 2237 finally: 2238 started = [t for t in started if t.is_alive()] 2239 if started: 2240 faulthandler.dump_traceback(sys.stdout) 2241 raise AssertionError('Unable to join %d threads' % len(started)) 2242 2243@contextlib.contextmanager 2244def swap_attr(obj, attr, new_val): 2245 """Temporary swap out an attribute with a new object. 2246 2247 Usage: 2248 with swap_attr(obj, "attr", 5): 2249 ... 2250 2251 This will set obj.attr to 5 for the duration of the with: block, 2252 restoring the old value at the end of the block. If `attr` doesn't 2253 exist on `obj`, it will be created and then deleted at the end of the 2254 block. 2255 2256 The old value (or None if it doesn't exist) will be assigned to the 2257 target of the "as" clause, if there is one. 2258 """ 2259 if hasattr(obj, attr): 2260 real_val = getattr(obj, attr) 2261 setattr(obj, attr, new_val) 2262 try: 2263 yield real_val 2264 finally: 2265 setattr(obj, attr, real_val) 2266 else: 2267 setattr(obj, attr, new_val) 2268 try: 2269 yield 2270 finally: 2271 if hasattr(obj, attr): 2272 delattr(obj, attr) 2273 2274@contextlib.contextmanager 2275def swap_item(obj, item, new_val): 2276 """Temporary swap out an item with a new object. 2277 2278 Usage: 2279 with swap_item(obj, "item", 5): 2280 ... 2281 2282 This will set obj["item"] to 5 for the duration of the with: block, 2283 restoring the old value at the end of the block. If `item` doesn't 2284 exist on `obj`, it will be created and then deleted at the end of the 2285 block. 2286 2287 The old value (or None if it doesn't exist) will be assigned to the 2288 target of the "as" clause, if there is one. 2289 """ 2290 if item in obj: 2291 real_val = obj[item] 2292 obj[item] = new_val 2293 try: 2294 yield real_val 2295 finally: 2296 obj[item] = real_val 2297 else: 2298 obj[item] = new_val 2299 try: 2300 yield 2301 finally: 2302 if item in obj: 2303 del obj[item] 2304 2305def strip_python_stderr(stderr): 2306 """Strip the stderr of a Python process from potential debug output 2307 emitted by the interpreter. 2308 2309 This will typically be run on the result of the communicate() method 2310 of a subprocess.Popen object. 2311 """ 2312 stderr = re.sub(br"\[\d+ refs, \d+ blocks\]\r?\n?", b"", stderr).strip() 2313 return stderr 2314 2315requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), 2316 'types are immortal if COUNT_ALLOCS is defined') 2317 2318def args_from_interpreter_flags(): 2319 """Return a list of command-line arguments reproducing the current 2320 settings in sys.flags and sys.warnoptions.""" 2321 return subprocess._args_from_interpreter_flags() 2322 2323def optim_args_from_interpreter_flags(): 2324 """Return a list of command-line arguments reproducing the current 2325 optimization settings in sys.flags.""" 2326 return subprocess._optim_args_from_interpreter_flags() 2327 2328#============================================================ 2329# Support for assertions about logging. 2330#============================================================ 2331 2332class TestHandler(logging.handlers.BufferingHandler): 2333 def __init__(self, matcher): 2334 # BufferingHandler takes a "capacity" argument 2335 # so as to know when to flush. As we're overriding 2336 # shouldFlush anyway, we can set a capacity of zero. 2337 # You can call flush() manually to clear out the 2338 # buffer. 2339 logging.handlers.BufferingHandler.__init__(self, 0) 2340 self.matcher = matcher 2341 2342 def shouldFlush(self): 2343 return False 2344 2345 def emit(self, record): 2346 self.format(record) 2347 self.buffer.append(record.__dict__) 2348 2349 def matches(self, **kwargs): 2350 """ 2351 Look for a saved dict whose keys/values match the supplied arguments. 2352 """ 2353 result = False 2354 for d in self.buffer: 2355 if self.matcher.matches(d, **kwargs): 2356 result = True 2357 break 2358 return result 2359 2360class Matcher(object): 2361 2362 _partial_matches = ('msg', 'message') 2363 2364 def matches(self, d, **kwargs): 2365 """ 2366 Try to match a single dict with the supplied arguments. 2367 2368 Keys whose values are strings and which are in self._partial_matches 2369 will be checked for partial (i.e. substring) matches. You can extend 2370 this scheme to (for example) do regular expression matching, etc. 2371 """ 2372 result = True 2373 for k in kwargs: 2374 v = kwargs[k] 2375 dv = d.get(k) 2376 if not self.match_value(k, dv, v): 2377 result = False 2378 break 2379 return result 2380 2381 def match_value(self, k, dv, v): 2382 """ 2383 Try to match a single stored value (dv) with a supplied value (v). 2384 """ 2385 if type(v) != type(dv): 2386 result = False 2387 elif type(dv) is not str or k not in self._partial_matches: 2388 result = (v == dv) 2389 else: 2390 result = dv.find(v) >= 0 2391 return result 2392 2393 2394_can_symlink = None 2395def can_symlink(): 2396 global _can_symlink 2397 if _can_symlink is not None: 2398 return _can_symlink 2399 symlink_path = TESTFN + "can_symlink" 2400 try: 2401 os.symlink(TESTFN, symlink_path) 2402 can = True 2403 except (OSError, NotImplementedError, AttributeError): 2404 can = False 2405 else: 2406 os.remove(symlink_path) 2407 _can_symlink = can 2408 return can 2409 2410def skip_unless_symlink(test): 2411 """Skip decorator for tests that require functional symlink""" 2412 ok = can_symlink() 2413 msg = "Requires functional symlink implementation" 2414 return test if ok else unittest.skip(msg)(test) 2415 2416_can_xattr = None 2417def can_xattr(): 2418 global _can_xattr 2419 if _can_xattr is not None: 2420 return _can_xattr 2421 if not hasattr(os, "setxattr"): 2422 can = False 2423 else: 2424 tmp_dir = tempfile.mkdtemp() 2425 tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir) 2426 try: 2427 with open(TESTFN, "wb") as fp: 2428 try: 2429 # TESTFN & tempfile may use different file systems with 2430 # different capabilities 2431 os.setxattr(tmp_fp, b"user.test", b"") 2432 os.setxattr(tmp_name, b"trusted.foo", b"42") 2433 os.setxattr(fp.fileno(), b"user.test", b"") 2434 # Kernels < 2.6.39 don't respect setxattr flags. 2435 kernel_version = platform.release() 2436 m = re.match(r"2.6.(\d{1,2})", kernel_version) 2437 can = m is None or int(m.group(1)) >= 39 2438 except OSError: 2439 can = False 2440 finally: 2441 unlink(TESTFN) 2442 unlink(tmp_name) 2443 rmdir(tmp_dir) 2444 _can_xattr = can 2445 return can 2446 2447def skip_unless_xattr(test): 2448 """Skip decorator for tests that require functional extended attributes""" 2449 ok = can_xattr() 2450 msg = "no non-broken extended attribute support" 2451 return test if ok else unittest.skip(msg)(test) 2452 2453_bind_nix_socket_error = None 2454def skip_unless_bind_unix_socket(test): 2455 """Decorator for tests requiring a functional bind() for unix sockets.""" 2456 if not hasattr(socket, 'AF_UNIX'): 2457 return unittest.skip('No UNIX Sockets')(test) 2458 global _bind_nix_socket_error 2459 if _bind_nix_socket_error is None: 2460 path = TESTFN + "can_bind_unix_socket" 2461 with socket.socket(socket.AF_UNIX) as sock: 2462 try: 2463 sock.bind(path) 2464 _bind_nix_socket_error = False 2465 except OSError as e: 2466 _bind_nix_socket_error = e 2467 finally: 2468 unlink(path) 2469 if _bind_nix_socket_error: 2470 msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error 2471 return unittest.skip(msg)(test) 2472 else: 2473 return test 2474 2475 2476def fs_is_case_insensitive(directory): 2477 """Detects if the file system for the specified directory is case-insensitive.""" 2478 with tempfile.NamedTemporaryFile(dir=directory) as base: 2479 base_path = base.name 2480 case_path = base_path.upper() 2481 if case_path == base_path: 2482 case_path = base_path.lower() 2483 try: 2484 return os.path.samefile(base_path, case_path) 2485 except FileNotFoundError: 2486 return False 2487 2488 2489def detect_api_mismatch(ref_api, other_api, *, ignore=()): 2490 """Returns the set of items in ref_api not in other_api, except for a 2491 defined list of items to be ignored in this check. 2492 2493 By default this skips private attributes beginning with '_' but 2494 includes all magic methods, i.e. those starting and ending in '__'. 2495 """ 2496 missing_items = set(dir(ref_api)) - set(dir(other_api)) 2497 if ignore: 2498 missing_items -= set(ignore) 2499 missing_items = set(m for m in missing_items 2500 if not m.startswith('_') or m.endswith('__')) 2501 return missing_items 2502 2503 2504def check__all__(test_case, module, name_of_module=None, extra=(), 2505 blacklist=()): 2506 """Assert that the __all__ variable of 'module' contains all public names. 2507 2508 The module's public names (its API) are detected automatically based on 2509 whether they match the public name convention and were defined in 2510 'module'. 2511 2512 The 'name_of_module' argument can specify (as a string or tuple thereof) 2513 what module(s) an API could be defined in in order to be detected as a 2514 public API. One case for this is when 'module' imports part of its public 2515 API from other modules, possibly a C backend (like 'csv' and its '_csv'). 2516 2517 The 'extra' argument can be a set of names that wouldn't otherwise be 2518 automatically detected as "public", like objects without a proper 2519 '__module__' attribute. If provided, it will be added to the 2520 automatically detected ones. 2521 2522 The 'blacklist' argument can be a set of names that must not be treated 2523 as part of the public API even though their names indicate otherwise. 2524 2525 Usage: 2526 import bar 2527 import foo 2528 import unittest 2529 from test import support 2530 2531 class MiscTestCase(unittest.TestCase): 2532 def test__all__(self): 2533 support.check__all__(self, foo) 2534 2535 class OtherTestCase(unittest.TestCase): 2536 def test__all__(self): 2537 extra = {'BAR_CONST', 'FOO_CONST'} 2538 blacklist = {'baz'} # Undocumented name. 2539 # bar imports part of its API from _bar. 2540 support.check__all__(self, bar, ('bar', '_bar'), 2541 extra=extra, blacklist=blacklist) 2542 2543 """ 2544 2545 if name_of_module is None: 2546 name_of_module = (module.__name__, ) 2547 elif isinstance(name_of_module, str): 2548 name_of_module = (name_of_module, ) 2549 2550 expected = set(extra) 2551 2552 for name in dir(module): 2553 if name.startswith('_') or name in blacklist: 2554 continue 2555 obj = getattr(module, name) 2556 if (getattr(obj, '__module__', None) in name_of_module or 2557 (not hasattr(obj, '__module__') and 2558 not isinstance(obj, types.ModuleType))): 2559 expected.add(name) 2560 test_case.assertCountEqual(module.__all__, expected) 2561 2562 2563class SuppressCrashReport: 2564 """Try to prevent a crash report from popping up. 2565 2566 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 2567 disable the creation of coredump file. 2568 """ 2569 old_value = None 2570 old_modes = None 2571 2572 def __enter__(self): 2573 """On Windows, disable Windows Error Reporting dialogs using 2574 SetErrorMode. 2575 2576 On UNIX, try to save the previous core file size limit, then set 2577 soft limit to 0. 2578 """ 2579 if sys.platform.startswith('win'): 2580 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 2581 # GetErrorMode is not available on Windows XP and Windows Server 2003, 2582 # but SetErrorMode returns the previous value, so we can use that 2583 import ctypes 2584 self._k32 = ctypes.windll.kernel32 2585 SEM_NOGPFAULTERRORBOX = 0x02 2586 self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) 2587 self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX) 2588 2589 # Suppress assert dialogs in debug builds 2590 # (see http://bugs.python.org/issue23314) 2591 try: 2592 import msvcrt 2593 msvcrt.CrtSetReportMode 2594 except (AttributeError, ImportError): 2595 # no msvcrt or a release build 2596 pass 2597 else: 2598 self.old_modes = {} 2599 for report_type in [msvcrt.CRT_WARN, 2600 msvcrt.CRT_ERROR, 2601 msvcrt.CRT_ASSERT]: 2602 old_mode = msvcrt.CrtSetReportMode(report_type, 2603 msvcrt.CRTDBG_MODE_FILE) 2604 old_file = msvcrt.CrtSetReportFile(report_type, 2605 msvcrt.CRTDBG_FILE_STDERR) 2606 self.old_modes[report_type] = old_mode, old_file 2607 2608 else: 2609 if resource is not None: 2610 try: 2611 self.old_value = resource.getrlimit(resource.RLIMIT_CORE) 2612 resource.setrlimit(resource.RLIMIT_CORE, 2613 (0, self.old_value[1])) 2614 except (ValueError, OSError): 2615 pass 2616 2617 if sys.platform == 'darwin': 2618 # Check if the 'Crash Reporter' on OSX was configured 2619 # in 'Developer' mode and warn that it will get triggered 2620 # when it is. 2621 # 2622 # This assumes that this context manager is used in tests 2623 # that might trigger the next manager. 2624 cmd = ['/usr/bin/defaults', 'read', 2625 'com.apple.CrashReporter', 'DialogType'] 2626 proc = subprocess.Popen(cmd, 2627 stdout=subprocess.PIPE, 2628 stderr=subprocess.PIPE) 2629 with proc: 2630 stdout = proc.communicate()[0] 2631 if stdout.strip() == b'developer': 2632 print("this test triggers the Crash Reporter, " 2633 "that is intentional", end='', flush=True) 2634 2635 return self 2636 2637 def __exit__(self, *ignore_exc): 2638 """Restore Windows ErrorMode or core file behavior to initial value.""" 2639 if self.old_value is None: 2640 return 2641 2642 if sys.platform.startswith('win'): 2643 self._k32.SetErrorMode(self.old_value) 2644 2645 if self.old_modes: 2646 import msvcrt 2647 for report_type, (old_mode, old_file) in self.old_modes.items(): 2648 msvcrt.CrtSetReportMode(report_type, old_mode) 2649 msvcrt.CrtSetReportFile(report_type, old_file) 2650 else: 2651 if resource is not None: 2652 try: 2653 resource.setrlimit(resource.RLIMIT_CORE, self.old_value) 2654 except (ValueError, OSError): 2655 pass 2656 2657 2658def patch(test_instance, object_to_patch, attr_name, new_value): 2659 """Override 'object_to_patch'.'attr_name' with 'new_value'. 2660 2661 Also, add a cleanup procedure to 'test_instance' to restore 2662 'object_to_patch' value for 'attr_name'. 2663 The 'attr_name' should be a valid attribute for 'object_to_patch'. 2664 2665 """ 2666 # check that 'attr_name' is a real attribute for 'object_to_patch' 2667 # will raise AttributeError if it does not exist 2668 getattr(object_to_patch, attr_name) 2669 2670 # keep a copy of the old value 2671 attr_is_local = False 2672 try: 2673 old_value = object_to_patch.__dict__[attr_name] 2674 except (AttributeError, KeyError): 2675 old_value = getattr(object_to_patch, attr_name, None) 2676 else: 2677 attr_is_local = True 2678 2679 # restore the value when the test is done 2680 def cleanup(): 2681 if attr_is_local: 2682 setattr(object_to_patch, attr_name, old_value) 2683 else: 2684 delattr(object_to_patch, attr_name) 2685 2686 test_instance.addCleanup(cleanup) 2687 2688 # actually override the attribute 2689 setattr(object_to_patch, attr_name, new_value) 2690 2691 2692def run_in_subinterp(code): 2693 """ 2694 Run code in a subinterpreter. Raise unittest.SkipTest if the tracemalloc 2695 module is enabled. 2696 """ 2697 # Issue #10915, #15751: PyGILState_*() functions don't work with 2698 # sub-interpreters, the tracemalloc module uses these functions internally 2699 try: 2700 import tracemalloc 2701 except ImportError: 2702 pass 2703 else: 2704 if tracemalloc.is_tracing(): 2705 raise unittest.SkipTest("run_in_subinterp() cannot be used " 2706 "if tracemalloc module is tracing " 2707 "memory allocations") 2708 import _testcapi 2709 return _testcapi.run_in_subinterp(code) 2710 2711 2712def check_free_after_iterating(test, iter, cls, args=()): 2713 class A(cls): 2714 def __del__(self): 2715 nonlocal done 2716 done = True 2717 try: 2718 next(it) 2719 except StopIteration: 2720 pass 2721 2722 done = False 2723 it = iter(A(*args)) 2724 # Issue 26494: Shouldn't crash 2725 test.assertRaises(StopIteration, next, it) 2726 # The sequence should be deallocated just after the end of iterating 2727 gc_collect() 2728 test.assertTrue(done) 2729 2730 2731def missing_compiler_executable(cmd_names=[]): 2732 """Check if the compiler components used to build the interpreter exist. 2733 2734 Check for the existence of the compiler executables whose names are listed 2735 in 'cmd_names' or all the compiler executables when 'cmd_names' is empty 2736 and return the first missing executable or None when none is found 2737 missing. 2738 2739 """ 2740 from distutils import ccompiler, sysconfig, spawn 2741 compiler = ccompiler.new_compiler() 2742 sysconfig.customize_compiler(compiler) 2743 for name in compiler.executables: 2744 if cmd_names and name not in cmd_names: 2745 continue 2746 cmd = getattr(compiler, name) 2747 if cmd_names: 2748 assert cmd is not None, \ 2749 "the '%s' executable is not configured" % name 2750 elif cmd is None: 2751 continue 2752 if spawn.find_executable(cmd[0]) is None: 2753 return cmd[0] 2754 2755 2756_is_android_emulator = None 2757def setswitchinterval(interval): 2758 # Setting a very low gil interval on the Android emulator causes python 2759 # to hang (issue #26939). 2760 minimum_interval = 1e-5 2761 if is_android and interval < minimum_interval: 2762 global _is_android_emulator 2763 if _is_android_emulator is None: 2764 _is_android_emulator = (subprocess.check_output( 2765 ['getprop', 'ro.kernel.qemu']).strip() == b'1') 2766 if _is_android_emulator: 2767 interval = minimum_interval 2768 return sys.setswitchinterval(interval) 2769 2770 2771@contextlib.contextmanager 2772def disable_faulthandler(): 2773 # use sys.__stderr__ instead of sys.stderr, since regrtest replaces 2774 # sys.stderr with a StringIO which has no file descriptor when a test 2775 # is run with -W/--verbose3. 2776 fd = sys.__stderr__.fileno() 2777 2778 is_enabled = faulthandler.is_enabled() 2779 try: 2780 faulthandler.disable() 2781 yield 2782 finally: 2783 if is_enabled: 2784 faulthandler.enable(file=fd, all_threads=True) 2785 2786 2787def fd_count(): 2788 """Count the number of open file descriptors. 2789 """ 2790 if sys.platform.startswith(('linux', 'freebsd')): 2791 try: 2792 names = os.listdir("/proc/self/fd") 2793 # Substract one because listdir() opens internally a file 2794 # descriptor to list the content of the /proc/self/fd/ directory. 2795 return len(names) - 1 2796 except FileNotFoundError: 2797 pass 2798 2799 MAXFD = 256 2800 if hasattr(os, 'sysconf'): 2801 try: 2802 MAXFD = os.sysconf("SC_OPEN_MAX") 2803 except OSError: 2804 pass 2805 2806 old_modes = None 2807 if sys.platform == 'win32': 2808 # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process 2809 # on invalid file descriptor if Python is compiled in debug mode 2810 try: 2811 import msvcrt 2812 msvcrt.CrtSetReportMode 2813 except (AttributeError, ImportError): 2814 # no msvcrt or a release build 2815 pass 2816 else: 2817 old_modes = {} 2818 for report_type in (msvcrt.CRT_WARN, 2819 msvcrt.CRT_ERROR, 2820 msvcrt.CRT_ASSERT): 2821 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) 2822 2823 try: 2824 count = 0 2825 for fd in range(MAXFD): 2826 try: 2827 # Prefer dup() over fstat(). fstat() can require input/output 2828 # whereas dup() doesn't. 2829 fd2 = os.dup(fd) 2830 except OSError as e: 2831 if e.errno != errno.EBADF: 2832 raise 2833 else: 2834 os.close(fd2) 2835 count += 1 2836 finally: 2837 if old_modes is not None: 2838 for report_type in (msvcrt.CRT_WARN, 2839 msvcrt.CRT_ERROR, 2840 msvcrt.CRT_ASSERT): 2841 msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) 2842 2843 return count 2844 2845 2846class SaveSignals: 2847 """ 2848 Save an restore signal handlers. 2849 2850 This class is only able to save/restore signal handlers registered 2851 by the Python signal module: see bpo-13285 for "external" signal 2852 handlers. 2853 """ 2854 2855 def __init__(self): 2856 import signal 2857 self.signal = signal 2858 self.signals = list(range(1, signal.NSIG)) 2859 # SIGKILL and SIGSTOP signals cannot be ignored nor caught 2860 for signame in ('SIGKILL', 'SIGSTOP'): 2861 try: 2862 signum = getattr(signal, signame) 2863 except AttributeError: 2864 continue 2865 self.signals.remove(signum) 2866 self.handlers = {} 2867 2868 def save(self): 2869 for signum in self.signals: 2870 handler = self.signal.getsignal(signum) 2871 if handler is None: 2872 # getsignal() returns None if a signal handler was not 2873 # registered by the Python signal module, 2874 # and the handler is not SIG_DFL nor SIG_IGN. 2875 # 2876 # Ignore the signal: we cannot restore the handler. 2877 continue 2878 self.handlers[signum] = handler 2879 2880 def restore(self): 2881 for signum, handler in self.handlers.items(): 2882 self.signal.signal(signum, handler) 2883 2884 2885def with_pymalloc(): 2886 import _testcapi 2887 return _testcapi.WITH_PYMALLOC 2888 2889 2890class FakePath: 2891 """Simple implementing of the path protocol. 2892 """ 2893 def __init__(self, path): 2894 self.path = path 2895 2896 def __repr__(self): 2897 return f'<FakePath {self.path!r}>' 2898 2899 def __fspath__(self): 2900 if (isinstance(self.path, BaseException) or 2901 isinstance(self.path, type) and 2902 issubclass(self.path, BaseException)): 2903 raise self.path 2904 else: 2905 return self.path 2906