1"""Supporting definitions for the Python regression tests.""" 2 3if __name__ != 'test.support': 4 raise ImportError('test.support must be imported from the test package') 5 6import contextlib 7import errno 8import fnmatch 9import functools 10import gc 11import socket 12import stat 13import sys 14import os 15import platform 16import shutil 17import warnings 18import unittest 19import importlib 20import UserDict 21import re 22import time 23import struct 24import sysconfig 25import types 26 27try: 28 import thread 29except ImportError: 30 thread = None 31 32__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module", 33 "verbose", "use_resources", "max_memuse", "record_original_stdout", 34 "get_original_stdout", "unload", "unlink", "rmtree", "forget", 35 "is_resource_enabled", "requires", "requires_mac_ver", 36 "find_unused_port", "bind_port", 37 "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ", 38 "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error", 39 "open_urlresource", "check_warnings", "check_py3k_warnings", 40 "CleanImport", "EnvironmentVarGuard", "captured_output", 41 "captured_stdout", "TransientResource", "transient_internet", 42 "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest", 43 "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup", 44 "threading_cleanup", "reap_threads", "start_threads", "cpython_only", 45 "check_impl_detail", "get_attribute", "py3k_bytes", 46 "import_fresh_module", "threading_cleanup", "reap_children", 47 "strip_python_stderr", "IPV6_ENABLED", "run_with_tz", 48 "SuppressCrashReport"] 49 50class Error(Exception): 51 """Base class for regression test exceptions.""" 52 53class TestFailed(Error): 54 """Test failed.""" 55 56class ResourceDenied(unittest.SkipTest): 57 """Test skipped because it requested a disallowed resource. 58 59 This is raised when a test calls requires() for a resource that 60 has not been enabled. 
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager to suppress package and module deprecation
    warnings when importing them.

    If ignore is False, this context manager has no effect."""
    if ignore:
        with warnings.catch_warnings():
            # Only DeprecationWarnings whose message mentions "module" or
            # "package" are silenced; the filter is undone on exit.
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
            yield
    else:
        yield


def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed.

    Raises unittest.SkipTest (carrying the ImportError's text) when the
    import fails, so a missing optional module skips the test instead of
    failing it."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        # "except X as name" (valid since Python 2.6) replaces the
        # comma form, matching the other handlers in this file.
        except ImportError as msg:
            raise unittest.SkipTest(str(msg))
Once the import is complete, 121 the sys.modules cache is restored to its original state. 122 123 Modules named in fresh are also imported anew if needed by the import. 124 If one of these modules can't be imported, None is returned. 125 126 Importing of modules named in blocked is prevented while the fresh import 127 takes place. 128 129 If deprecated is True, any module or package deprecation messages 130 will be suppressed.""" 131 # NOTE: test_heapq, test_json, and test_warnings include extra sanity 132 # checks to make sure that this utility function is working as expected 133 with _ignore_deprecated_imports(deprecated): 134 # Keep track of modules saved for later restoration as well 135 # as those which just need a blocking entry removed 136 orig_modules = {} 137 names_to_remove = [] 138 _save_and_remove_module(name, orig_modules) 139 try: 140 for fresh_name in fresh: 141 _save_and_remove_module(fresh_name, orig_modules) 142 for blocked_name in blocked: 143 if not _save_and_block_module(blocked_name, orig_modules): 144 names_to_remove.append(blocked_name) 145 fresh_module = importlib.import_module(name) 146 except ImportError: 147 fresh_module = None 148 finally: 149 for orig_name, module in orig_modules.items(): 150 sys.modules[orig_name] = module 151 for name_to_remove in names_to_remove: 152 del sys.modules[name_to_remove] 153 return fresh_module 154 155 156def get_attribute(obj, name): 157 """Get an attribute, raising SkipTest if AttributeError is raised.""" 158 try: 159 attribute = getattr(obj, name) 160 except AttributeError: 161 if isinstance(obj, types.ModuleType): 162 msg = "module %r has no attribute %r" % (obj.__name__, name) 163 elif isinstance(obj, types.ClassType): 164 msg = "class %s has no attribute %r" % (obj.__name__, name) 165 elif isinstance(obj, types.InstanceType): 166 msg = "%s instance has no attribute %r" % (obj.__class__.__name__, name) 167 elif isinstance(obj, type): 168 msg = "type object %r has no attribute %r" % (obj.__name__, 
def _force_run(path, func, *args):
    """Call func(*args); if it raises EnvironmentError, give the owner
    read/write/execute permission on *path* and call func(*args) once more.

    The second call's result (or exception) is what the caller sees."""
    try:
        return func(*args)
    except EnvironmentError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)

if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Run func(pathname), then poll until the deletion is visible.

        With waitall=False, wait until the basename of *pathname* is gone
        from its parent directory listing; with waitall=True, wait until
        the directory *pathname* itself lists as empty.  A RuntimeWarning
        is issued if that has not happened when the backoff runs out."""
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        def _rmtree_inner(path):
            # Empty the directory; removing `path` itself is left to the
            # caller.
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        # Fast path: let shutil do the work.  Any failure falls through to
        # the manual walk below, which retries each step via _force_run.
        try:
            shutil.rmtree(path)
            return
        except EnvironmentError:
            pass

        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except EnvironmentError:
                    # lstat failed: treat the entry as a non-directory
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    # On failure the retry chmods the containing
                    # directory `path`, not the entry being removed.
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

def unlink(filename):
    """Remove *filename*, ignoring ENOENT and ENOTDIR errors."""
    try:
        _unlink(filename)
    except OSError as exc:
        if exc.errno not in (errno.ENOENT, errno.ENOTDIR):
            raise

def rmdir(dirname):
    """Remove the directory *dirname*, ignoring ENOENT errors."""
    try:
        _rmdir(dirname)
    except OSError as error:
        # The directory need not exist.
        if error.errno != errno.ENOENT:
            raise

def rmtree(path):
    """Remove the directory tree at *path*, ignoring ENOENT/ESRCH errors.

    Any other OSError is re-raised."""
    try:
        _rmtree(path)
    # "as" form, consistent with unlink() and rmdir() above (the comma
    # form is Python-2-only syntax).
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
implementations on OS X can abort the process if 354 # being called in an environment where a window server connection 355 # cannot be made, for instance when invoked by a buildbot or ssh 356 # process not running under the same user id as the current console 357 # user. To avoid that, raise an exception if the window manager 358 # connection is not available. 359 from ctypes import cdll, c_int, pointer, Structure 360 from ctypes.util import find_library 361 362 app_services = cdll.LoadLibrary(find_library("ApplicationServices")) 363 364 if app_services.CGMainDisplayID() == 0: 365 reason = "gui tests cannot run without OS X window manager" 366 else: 367 class ProcessSerialNumber(Structure): 368 _fields_ = [("highLongOfPSN", c_int), 369 ("lowLongOfPSN", c_int)] 370 psn = ProcessSerialNumber() 371 psn_p = pointer(psn) 372 if ( (app_services.GetCurrentProcess(psn_p) < 0) or 373 (app_services.SetFrontProcess(psn_p) < 0) ): 374 reason = "cannot run without OS X gui process" 375 376 # check on every platform whether tkinter can actually do anything 377 if not reason: 378 try: 379 from Tkinter import Tk 380 root = Tk() 381 root.withdraw() 382 root.update() 383 root.destroy() 384 except Exception as e: 385 err_string = str(e) 386 if len(err_string) > 50: 387 err_string = err_string[:50] + ' [...]' 388 reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__, 389 err_string) 390 391 _is_gui_available.reason = reason 392 _is_gui_available.result = not reason 393 394 return _is_gui_available.result 395 396def is_resource_enabled(resource): 397 """Test whether a resource is enabled. 398 399 Known resources are set by regrtest.py. If not running under regrtest.py, 400 all resources are assumed enabled unless use_resources has been set. 
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding.  This is
    achieved by creating a temporary socket with the given family and
    socktype (default is AF_INET, SOCK_STREAM), and binding it via
    bind_port() (i.e. to bind_port()'s default host) with the port set to 0,
    eliciting an unused ephemeral port from the OS.  The temporary socket is
    then closed, and the ephemeral port is returned.  The socket is closed
    even if binding fails.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode).  Always prefer bind_port() over find_unused_port() where
    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports.  An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake.  A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port.  We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        return bind_port(tempsock)
    finally:
        # Release the descriptor on every path, including when
        # bind_port() raises.
        tempsock.close()
def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The wrapped callable's return value is passed through.  An IOError whose
    text contains "CERTIFICATE_VERIFY_FAILED" is converted into
    unittest.SkipTest; any other IOError propagates unchanged."""
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except IOError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec

# Relative tolerance used by fcmp() when either operand is a float.
FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Three-way compare x and y, returning -1, 0 or 1.

    If either operand is a float, the two compare equal when they differ by
    at most (abs(x) + abs(y)) * FUZZ.  Tuples or lists of the same type are
    compared element by element with fcmp, then by length.  Everything else
    (including a failed or out-of-tolerance float comparison) falls back to
    ordinary < and > comparison."""
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # Best effort: operands that don't support abs()/subtraction
            # fall through to the plain comparison below.  Unlike a bare
            # "except:", this lets KeyboardInterrupt/SystemExit propagate.
            pass
    elif type(x) is type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
627 628 # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1 629 unichr(0x00E6), 630 # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3 631 unichr(0x0130), 632 # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257 633 unichr(0x0141), 634 # U+03C6 (Greek Small Letter Phi): cp1253 635 unichr(0x03C6), 636 # U+041A (Cyrillic Capital Letter Ka): cp1251 637 unichr(0x041A), 638 # U+05D0 (Hebrew Letter Alef): Encodable to cp424 639 unichr(0x05D0), 640 # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic 641 unichr(0x060C), 642 # U+062A (Arabic Letter Teh): cp720 643 unichr(0x062A), 644 # U+0E01 (Thai Character Ko Kai): cp874 645 unichr(0x0E01), 646 647 # Then try more "special" characters. "special" because they may be 648 # interpreted or displayed differently depending on the exact locale 649 # encoding and the font. 650 651 # U+00A0 (No-Break Space) 652 unichr(0x00A0), 653 # U+20AC (Euro Sign) 654 unichr(0x20AC), 655 ): 656 try: 657 character.encode(sys.getfilesystemencoding())\ 658 .decode(sys.getfilesystemencoding()) 659 except UnicodeError: 660 pass 661 else: 662 FS_NONASCII = character 663 break 664 665# Filename used for testing 666if os.name == 'java': 667 # Jython disallows @ in module names 668 TESTFN = '$test' 669elif os.name == 'riscos': 670 TESTFN = 'testfile' 671else: 672 TESTFN = '@test' 673 # Unicode name only used if TEST_FN_ENCODING exists for the platform. 674 if have_unicode: 675 # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding() 676 # TESTFN_UNICODE is a filename that can be encoded using the 677 # file system encoding, but *not* with the default (ascii) encoding 678 if isinstance('', unicode): 679 # python -U 680 # XXX perhaps unicode() should accept Unicode strings? 681 TESTFN_UNICODE = "@test-\xe0\xf2" 682 else: 683 # 2 latin characters. 
684 TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1") 685 TESTFN_ENCODING = sys.getfilesystemencoding() 686 # TESTFN_UNENCODABLE is a filename that should *not* be 687 # able to be encoded by *either* the default or filesystem encoding. 688 # This test really only makes sense on Windows NT platforms 689 # which have special Unicode support in posixmodule. 690 if (not hasattr(sys, "getwindowsversion") or 691 sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME 692 TESTFN_UNENCODABLE = None 693 else: 694 # Japanese characters (I think - from bug 846133) 695 TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"') 696 try: 697 # XXX - Note - should be using TESTFN_ENCODING here - but for 698 # Windows, "mbcs" currently always operates as if in 699 # errors=ignore' mode - hence we get '?' characters rather than 700 # the exception. 'Latin1' operates as we expect - ie, fails. 701 # See [ 850997 ] mbcs encoding ignores errors 702 TESTFN_UNENCODABLE.encode("Latin1") 703 except UnicodeEncodeError: 704 pass 705 else: 706 print \ 707 'WARNING: The filename %r CAN be encoded by the filesystem. ' \ 708 'Unicode filename tests may not be effective' \ 709 % TESTFN_UNENCODABLE 710 711 712# Disambiguate TESTFN for parallel testing, while letting it remain a valid 713# module name. 714TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid()) 715 716# Save the initial cwd 717SAVEDCWD = os.getcwd() 718 719@contextlib.contextmanager 720def temp_dir(path=None, quiet=False): 721 """Return a context manager that creates a temporary directory. 722 723 Arguments: 724 725 path: the directory to create temporarily. If omitted or None, 726 defaults to creating a temporary directory using tempfile.mkdtemp. 727 728 quiet: if False (the default), the context manager raises an exception 729 on error. Otherwise, if the path is specified and cannot be 730 created, only a warning is issued. 
731 732 """ 733 dir_created = False 734 if path is None: 735 import tempfile 736 path = tempfile.mkdtemp() 737 dir_created = True 738 path = os.path.realpath(path) 739 else: 740 if (have_unicode and isinstance(path, unicode) and 741 not os.path.supports_unicode_filenames): 742 try: 743 path = path.encode(sys.getfilesystemencoding() or 'ascii') 744 except UnicodeEncodeError: 745 if not quiet: 746 raise unittest.SkipTest('unable to encode the cwd name with ' 747 'the filesystem encoding.') 748 try: 749 os.mkdir(path) 750 dir_created = True 751 except OSError: 752 if not quiet: 753 raise 754 warnings.warn('tests may fail, unable to create temp dir: ' + path, 755 RuntimeWarning, stacklevel=3) 756 if dir_created: 757 pid = os.getpid() 758 try: 759 yield path 760 finally: 761 # In case the process forks, let only the parent remove the 762 # directory. The child has a diffent process id. (bpo-30028) 763 if dir_created and pid == os.getpid(): 764 rmtree(path) 765 766@contextlib.contextmanager 767def change_cwd(path, quiet=False): 768 """Return a context manager that changes the current working directory. 769 770 Arguments: 771 772 path: the directory to use as the temporary current working directory. 773 774 quiet: if False (the default), the context manager raises an exception 775 on error. Otherwise, it issues only a warning and keeps the current 776 working directory the same. 777 778 """ 779 saved_dir = os.getcwd() 780 try: 781 os.chdir(path) 782 except OSError: 783 if not quiet: 784 raise 785 warnings.warn('tests may fail, unable to change CWD to: ' + path, 786 RuntimeWarning, stacklevel=3) 787 try: 788 yield os.getcwd() 789 finally: 790 os.chdir(saved_dir) 791 792 793@contextlib.contextmanager 794def temp_cwd(name='tempcwd', quiet=False): 795 """ 796 Context manager that temporarily creates and changes the CWD. 797 798 The function temporarily changes the current working directory 799 after creating a temporary directory in the current directory with 800 name *name*. 
def sortdict(dict):
    """Like repr(dict), but in sorted order.

    Returns a string of the form "{k1: v1, k2: v2}" with the items sorted
    as (key, value) pairs and each key and value rendered with repr().
    An empty mapping gives "{}"."""
    # sorted() accepts any iterable of pairs, so this does not depend on
    # items() returning a list with a .sort() method.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas
def open_urlresource(url, check=None):
    """Return an open file object for the resource at *url*.

    The file is stored in TEST_DATA_DIR under the last path component of
    the URL.  If a copy is already there and is accepted, it is returned
    without downloading; a rejected copy is deleted first.  Otherwise the
    'urlfetch' resource is required and the URL is downloaded.

    check: optional callable taking the open file; a false result rejects
    the file.  An accepted file is rewound to offset 0 before it is
    returned.  With check=None every file is accepted.

    Raises TestFailed if the downloaded file is rejected by *check*."""
    import urlparse
    import urllib2

    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(TEST_DATA_DIR, filename)

    def check_valid_file(fn):
        # Return an open file for fn, or None (with the file closed) when
        # check rejects it.
        f = open(fn)
        if check is None:
            return f
        accepted = False
        try:
            if check(f):
                f.seek(0)
                accepted = True
                return f
        finally:
            # Close on rejection and also when check()/seek() raises, so
            # the handle never leaks.
            if not accepted:
                f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource "%s"' % fn)
908 """ 909 def __init__(self, warnings_list): 910 self._warnings = warnings_list 911 self._last = 0 912 913 def __getattr__(self, attr): 914 if len(self._warnings) > self._last: 915 return getattr(self._warnings[-1], attr) 916 elif attr in warnings.WarningMessage._WARNING_DETAILS: 917 return None 918 raise AttributeError("%r has no attribute %r" % (self, attr)) 919 920 @property 921 def warnings(self): 922 return self._warnings[self._last:] 923 924 def reset(self): 925 self._last = len(self._warnings) 926 927 928def _filterwarnings(filters, quiet=False): 929 """Catch the warnings, then check if all the expected 930 warnings have been raised and re-raise unexpected warnings. 931 If 'quiet' is True, only re-raise the unexpected warnings. 932 """ 933 # Clear the warning registry of the calling module 934 # in order to re-raise the warnings. 935 frame = sys._getframe(2) 936 registry = frame.f_globals.get('__warningregistry__') 937 if registry: 938 registry.clear() 939 with warnings.catch_warnings(record=True) as w: 940 # Set filter "always" to record all warnings. Because 941 # test_warnings swap the module, we need to look up in 942 # the sys.modules dictionary. 
943 sys.modules['warnings'].simplefilter("always") 944 yield WarningsRecorder(w) 945 # Filter the recorded warnings 946 reraise = [warning.message for warning in w] 947 missing = [] 948 for msg, cat in filters: 949 seen = False 950 for exc in reraise[:]: 951 message = str(exc) 952 # Filter out the matching messages 953 if (re.match(msg, message, re.I) and 954 issubclass(exc.__class__, cat)): 955 seen = True 956 reraise.remove(exc) 957 if not seen and not quiet: 958 # This filter caught nothing 959 missing.append((msg, cat.__name__)) 960 if reraise: 961 raise AssertionError("unhandled warning %r" % reraise[0]) 962 if missing: 963 raise AssertionError("filter (%r, %s) did not catch any warning" % 964 missing[0]) 965 966 967@contextlib.contextmanager 968def check_warnings(*filters, **kwargs): 969 """Context manager to silence warnings. 970 971 Accept 2-tuples as positional arguments: 972 ("message regexp", WarningCategory) 973 974 Optional argument: 975 - if 'quiet' is True, it does not fail if a filter catches nothing 976 (default True without argument, 977 default False if some filters are defined) 978 979 Without argument, it defaults to: 980 check_warnings(("", Warning), quiet=True) 981 """ 982 quiet = kwargs.get('quiet') 983 if not filters: 984 filters = (("", Warning),) 985 # Preserve backward compatibility 986 if quiet is None: 987 quiet = True 988 return _filterwarnings(filters, quiet) 989 990 991@contextlib.contextmanager 992def check_py3k_warnings(*filters, **kwargs): 993 """Context manager to silence py3k warnings. 
994 995 Accept 2-tuples as positional arguments: 996 ("message regexp", WarningCategory) 997 998 Optional argument: 999 - if 'quiet' is True, it does not fail if a filter catches nothing 1000 (default False) 1001 1002 Without argument, it defaults to: 1003 check_py3k_warnings(("", DeprecationWarning), quiet=False) 1004 """ 1005 if sys.py3kwarning: 1006 if not filters: 1007 filters = (("", DeprecationWarning),) 1008 else: 1009 # It should not raise any py3k warning 1010 filters = () 1011 return _filterwarnings(filters, kwargs.get('quiet')) 1012 1013 1014class CleanImport(object): 1015 """Context manager to force import to return a new module reference. 1016 1017 This is useful for testing module-level behaviours, such as 1018 the emission of a DeprecationWarning on import. 1019 1020 Use like this: 1021 1022 with CleanImport("foo"): 1023 importlib.import_module("foo") # new reference 1024 """ 1025 1026 def __init__(self, *module_names): 1027 self.original_modules = sys.modules.copy() 1028 for module_name in module_names: 1029 if module_name in sys.modules: 1030 module = sys.modules[module_name] 1031 # It is possible that module_name is just an alias for 1032 # another module (e.g. stub for modules renamed in 3.x). 1033 # In that case, we also need delete the real module to clear 1034 # the import cache. 1035 if module.__name__ != module_name: 1036 del sys.modules[module.__name__] 1037 del sys.modules[module_name] 1038 1039 def __enter__(self): 1040 return self 1041 1042 def __exit__(self, *ignore_exc): 1043 sys.modules.update(self.original_modules) 1044 1045 1046class EnvironmentVarGuard(UserDict.DictMixin): 1047 1048 """Class to help protect the environment variable properly. 
Can be used as 1049 a context manager.""" 1050 1051 def __init__(self): 1052 self._environ = os.environ 1053 self._changed = {} 1054 1055 def __getitem__(self, envvar): 1056 return self._environ[envvar] 1057 1058 def __setitem__(self, envvar, value): 1059 # Remember the initial value on the first access 1060 if envvar not in self._changed: 1061 self._changed[envvar] = self._environ.get(envvar) 1062 self._environ[envvar] = value 1063 1064 def __delitem__(self, envvar): 1065 # Remember the initial value on the first access 1066 if envvar not in self._changed: 1067 self._changed[envvar] = self._environ.get(envvar) 1068 if envvar in self._environ: 1069 del self._environ[envvar] 1070 1071 def keys(self): 1072 return self._environ.keys() 1073 1074 def set(self, envvar, value): 1075 self[envvar] = value 1076 1077 def unset(self, envvar): 1078 del self[envvar] 1079 1080 def __enter__(self): 1081 return self 1082 1083 def __exit__(self, *ignore_exc): 1084 for (k, v) in self._changed.items(): 1085 if v is None: 1086 if k in self._environ: 1087 del self._environ[k] 1088 else: 1089 self._environ[k] = v 1090 os.environ = self._environ 1091 1092 1093class DirsOnSysPath(object): 1094 """Context manager to temporarily add directories to sys.path. 1095 1096 This makes a copy of sys.path, appends any directories given 1097 as positional arguments, then reverts sys.path to the copied 1098 settings when the context ends. 1099 1100 Note that *all* sys.path modifications in the body of the 1101 context manager, including replacement of the object, 1102 will be reverted at the end of the block. 
1103 """ 1104 1105 def __init__(self, *paths): 1106 self.original_value = sys.path[:] 1107 self.original_object = sys.path 1108 sys.path.extend(paths) 1109 1110 def __enter__(self): 1111 return self 1112 1113 def __exit__(self, *ignore_exc): 1114 sys.path = self.original_object 1115 sys.path[:] = self.original_value 1116 1117 1118class TransientResource(object): 1119 1120 """Raise ResourceDenied if an exception is raised while the context manager 1121 is in effect that matches the specified exception and attributes.""" 1122 1123 def __init__(self, exc, **kwargs): 1124 self.exc = exc 1125 self.attrs = kwargs 1126 1127 def __enter__(self): 1128 return self 1129 1130 def __exit__(self, type_=None, value=None, traceback=None): 1131 """If type_ is a subclass of self.exc and value has attributes matching 1132 self.attrs, raise ResourceDenied. Otherwise let the exception 1133 propagate (if any).""" 1134 if type_ is not None and issubclass(self.exc, type_): 1135 for attr, attr_value in self.attrs.iteritems(): 1136 if not hasattr(value, attr): 1137 break 1138 if getattr(value, attr) != attr_value: 1139 break 1140 else: 1141 raise ResourceDenied("an optional resource is not available") 1142 1143 1144@contextlib.contextmanager 1145def transient_internet(resource_name, timeout=30.0, errnos=()): 1146 """Return a context manager that raises ResourceDenied when various issues 1147 with the Internet connection manifest themselves as exceptions.""" 1148 default_errnos = [ 1149 ('ECONNREFUSED', 111), 1150 ('ECONNRESET', 104), 1151 ('EHOSTUNREACH', 113), 1152 ('ENETUNREACH', 101), 1153 ('ETIMEDOUT', 110), 1154 ] 1155 default_gai_errnos = [ 1156 ('EAI_AGAIN', -3), 1157 ('EAI_FAIL', -4), 1158 ('EAI_NONAME', -2), 1159 ('EAI_NODATA', -5), 1160 # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo() 1161 # implementation actually returns WSANO_DATA i.e. 11004. 
1162 ('WSANO_DATA', 11004), 1163 ] 1164 1165 denied = ResourceDenied("Resource '%s' is not available" % resource_name) 1166 captured_errnos = errnos 1167 gai_errnos = [] 1168 if not captured_errnos: 1169 captured_errnos = [getattr(errno, name, num) 1170 for (name, num) in default_errnos] 1171 gai_errnos = [getattr(socket, name, num) 1172 for (name, num) in default_gai_errnos] 1173 1174 def filter_error(err): 1175 n = getattr(err, 'errno', None) 1176 if (isinstance(err, socket.timeout) or 1177 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1178 n in captured_errnos): 1179 if not verbose: 1180 sys.stderr.write(denied.args[0] + "\n") 1181 raise denied 1182 1183 old_timeout = socket.getdefaulttimeout() 1184 try: 1185 if timeout is not None: 1186 socket.setdefaulttimeout(timeout) 1187 yield 1188 except IOError as err: 1189 # urllib can wrap original socket errors multiple times (!), we must 1190 # unwrap to get at the original error. 1191 while True: 1192 a = err.args 1193 if len(a) >= 1 and isinstance(a[0], IOError): 1194 err = a[0] 1195 # The error can also be wrapped as args[1]: 1196 # except socket.error as msg: 1197 # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) 1198 elif len(a) >= 2 and isinstance(a[1], IOError): 1199 err = a[1] 1200 else: 1201 break 1202 filter_error(err) 1203 raise 1204 # XXX should we catch generic exceptions and look for their 1205 # __cause__ or __context__? 
1206 finally: 1207 socket.setdefaulttimeout(old_timeout) 1208 1209 1210@contextlib.contextmanager 1211def captured_output(stream_name): 1212 """Return a context manager used by captured_stdout and captured_stdin 1213 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1214 import StringIO 1215 orig_stdout = getattr(sys, stream_name) 1216 setattr(sys, stream_name, StringIO.StringIO()) 1217 try: 1218 yield getattr(sys, stream_name) 1219 finally: 1220 setattr(sys, stream_name, orig_stdout) 1221 1222def captured_stdout(): 1223 """Capture the output of sys.stdout: 1224 1225 with captured_stdout() as s: 1226 print "hello" 1227 self.assertEqual(s.getvalue(), "hello") 1228 """ 1229 return captured_output("stdout") 1230 1231def captured_stderr(): 1232 return captured_output("stderr") 1233 1234def captured_stdin(): 1235 return captured_output("stdin") 1236 1237def gc_collect(): 1238 """Force as many objects as possible to be collected. 1239 1240 In non-CPython implementations of Python, this is needed because timely 1241 deallocation is not guaranteed by the garbage collector. (Even in CPython 1242 this can be the case in case of reference cycles.) This means that __del__ 1243 methods may be called later than expected and weakrefs may remain alive for 1244 longer than expected. This function tries its best to force all garbage 1245 objects to disappear. 
1246 """ 1247 gc.collect() 1248 if is_jython: 1249 time.sleep(0.1) 1250 gc.collect() 1251 gc.collect() 1252 1253 1254_header = '2P' 1255if hasattr(sys, "gettotalrefcount"): 1256 _header = '2P' + _header 1257_vheader = _header + 'P' 1258 1259def calcobjsize(fmt): 1260 return struct.calcsize(_header + fmt + '0P') 1261 1262def calcvobjsize(fmt): 1263 return struct.calcsize(_vheader + fmt + '0P') 1264 1265 1266_TPFLAGS_HAVE_GC = 1<<14 1267_TPFLAGS_HEAPTYPE = 1<<9 1268 1269def check_sizeof(test, o, size): 1270 import _testcapi 1271 result = sys.getsizeof(o) 1272 # add GC header size 1273 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1274 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1275 size += _testcapi.SIZEOF_PYGC_HEAD 1276 msg = 'wrong size for %s: got %d, expected %d' \ 1277 % (type(o), result, size) 1278 test.assertEqual(result, size, msg) 1279 1280 1281#======================================================================= 1282# Decorator for running a function in a different locale, correctly resetting 1283# it afterwards. 

def run_with_locale(catstr, *locales):
    """Decorator factory: run the function under one of *locales*.

    *catstr* is the name of a locale category attribute of the locale
    module (for example 'LC_ALL').  The first entry of *locales* that
    can be set is used; if none can be set the function runs under the
    current locale.  The original locale is restored afterwards.

    Raises AttributeError when the decorated function is called if
    *catstr* is not an attribute of the locale module.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is unavailable, try the next one
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # Same spelling as run_with_tz below.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Decorator factory: run the function with the TZ variable set to *tz*.

    The previous value of os.environ['TZ'] (or its absence) is restored
    afterwards and time.tzset() is called again.  Raises unittest.SkipTest
    when the decorated function is called if time.tzset is not available.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            if 'TZ' in os.environ:
                orig_tz = os.environ['TZ']
            else:
                orig_tz = None
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # The function may have removed TZ itself; a KeyError
                    # here would hide its result and skip tzset().
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands.
# Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Set the globals max_memuse and real_max_memuse from *limit*.

    *limit* is a string made of a number (optionally with a fractional
    part) followed by a K, M, G or T unit and an optional 'b', matched
    case-insensitively, e.g. '4g' or '2.5Gb'.

    real_max_memuse receives the requested number of bytes; max_memuse
    receives the same value capped at MAX_Py_ssize_t.

    Raises ValueError if *limit* cannot be parsed or is too low to be
    useful.  Neither global is modified in that case.
    """
    global max_memuse
    global real_max_memuse
    sizes = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    # re.VERBOSE makes the literal space in the pattern insignificant.
    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                 re.IGNORECASE | re.VERBOSE)
    if m is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    requested = int(float(m.group(1)) * sizes[m.group(3).lower()])
    memlimit = min(requested, MAX_Py_ssize_t)
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    # Validate first, then publish both globals together so a rejected
    # limit leaves the previous configuration untouched.
    real_max_memuse = requested
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if not max_memuse:
                # If max_memuse is 0 (the default),
                # we still want to run the tests with size set to a few kb,
                # to make sure they work. We still want to avoid using
                # too much memory, though, but we do that noisily.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            else:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            return f(self, maxsize)
        # Expose the parameters on the wrapper itself.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
    """Decorator for bigmem tests that need an exact size.

    The test is called with exactly *size* when real_max_memuse is set,
    and with a small fixed size (5147) when it is not.  The test returns
    without running, optionally printing a note in verbose mode, when
    real_max_memuse is smaller than size * memuse and either a limit is
    set or *dry_run* is false.
    """
    def decorator(f):
        def wrapper(self):
            if not real_max_memuse:
                maxsize = 5147
            else:
                maxsize = size

            if ((real_max_memuse or not dry_run)
                and real_max_memuse < maxsize * memuse):
                if verbose:
                    sys.stderr.write("Skipping %s because of memory "
                                     "constraint\n" % (f.__name__,))
                return

            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse < MAX_Py_ssize_t:
            if verbose:
                sys.stderr.write("Skipping %s because of memory "
                                 "constraint\n" % (f.__name__,))
        else:
            return f(self)
    return wrapper

#=======================================================================
# unittest integration.
1457 1458class BasicTestRunner: 1459 def run(self, test): 1460 result = unittest.TestResult() 1461 test(result) 1462 return result 1463 1464def _id(obj): 1465 return obj 1466 1467def requires_resource(resource): 1468 if resource == 'gui' and not _is_gui_available(): 1469 return unittest.skip(_is_gui_available.reason) 1470 if is_resource_enabled(resource): 1471 return _id 1472 else: 1473 return unittest.skip("resource {0!r} is not enabled".format(resource)) 1474 1475def cpython_only(test): 1476 """ 1477 Decorator for tests only applicable on CPython. 1478 """ 1479 return impl_detail(cpython=True)(test) 1480 1481def impl_detail(msg=None, **guards): 1482 if check_impl_detail(**guards): 1483 return _id 1484 if msg is None: 1485 guardnames, default = _parse_guards(guards) 1486 if default: 1487 msg = "implementation detail not available on {0}" 1488 else: 1489 msg = "implementation detail specific to {0}" 1490 guardnames = sorted(guardnames.keys()) 1491 msg = msg.format(' or '.join(guardnames)) 1492 return unittest.skip(msg) 1493 1494def _parse_guards(guards): 1495 # Returns a tuple ({platform_name: run_me}, default_value) 1496 if not guards: 1497 return ({'cpython': True}, False) 1498 is_true = guards.values()[0] 1499 assert guards.values() == [is_true] * len(guards) # all True or all False 1500 return (guards, not is_true) 1501 1502# Use the following check to guard CPython's implementation-specific tests -- 1503# or to run them only on the implementation(s) guarded by the arguments. 1504def check_impl_detail(**guards): 1505 """This function returns True or False depending on the host platform. 
1506 Examples: 1507 if check_impl_detail(): # only on CPython (default) 1508 if check_impl_detail(jython=True): # only on Jython 1509 if check_impl_detail(cpython=False): # everywhere except on CPython 1510 """ 1511 guards, default = _parse_guards(guards) 1512 return guards.get(platform.python_implementation().lower(), default) 1513 1514 1515def _filter_suite(suite, pred): 1516 """Recursively filter test cases in a suite based on a predicate.""" 1517 newtests = [] 1518 for test in suite._tests: 1519 if isinstance(test, unittest.TestSuite): 1520 _filter_suite(test, pred) 1521 newtests.append(test) 1522 else: 1523 if pred(test): 1524 newtests.append(test) 1525 suite._tests = newtests 1526 1527def _run_suite(suite): 1528 """Run tests from a unittest.TestSuite-derived class.""" 1529 if verbose: 1530 runner = unittest.TextTestRunner(sys.stdout, verbosity=2, 1531 failfast=failfast) 1532 else: 1533 runner = BasicTestRunner() 1534 1535 result = runner.run(suite) 1536 if not result.wasSuccessful(): 1537 if len(result.errors) == 1 and not result.failures: 1538 err = result.errors[0][1] 1539 elif len(result.failures) == 1 and not result.errors: 1540 err = result.failures[0][1] 1541 else: 1542 err = "multiple errors occurred" 1543 if not verbose: 1544 err += "; run in verbose mode for details" 1545 raise TestFailed(err) 1546 1547 1548# By default, don't filter tests 1549_match_test_func = None 1550_match_test_patterns = None 1551 1552 1553def match_test(test): 1554 # Function used by support.run_unittest() and regrtest --list-cases 1555 if _match_test_func is None: 1556 return True 1557 else: 1558 return _match_test_func(test.id()) 1559 1560 1561def _is_full_match_test(pattern): 1562 # If a pattern contains at least one dot, it's considered 1563 # as a full test identifier. 1564 # Example: 'test.test_os.FileTests.test_access'. 1565 # 1566 # Reject patterns which contain fnmatch patterns: '*', '?', '[...]' 1567 # or '[!...]'. For example, reject 'test_access*'. 1568 return ('.' 
in pattern) and (not re.search(r'[?*\[\]]', pattern)) 1569 1570 1571def set_match_tests(patterns): 1572 global _match_test_func, _match_test_patterns 1573 1574 if patterns == _match_test_patterns: 1575 # No change: no need to recompile patterns. 1576 return 1577 1578 if not patterns: 1579 func = None 1580 # set_match_tests(None) behaves as set_match_tests(()) 1581 patterns = () 1582 elif all(map(_is_full_match_test, patterns)): 1583 # Simple case: all patterns are full test identifier. 1584 # The test.bisect utility only uses such full test identifiers. 1585 func = set(patterns).__contains__ 1586 else: 1587 regex = '|'.join(map(fnmatch.translate, patterns)) 1588 # The search *is* case sensitive on purpose: 1589 # don't use flags=re.IGNORECASE 1590 regex_match = re.compile(regex).match 1591 1592 def match_test_regex(test_id): 1593 if regex_match(test_id): 1594 # The regex matchs the whole identifier like 1595 # 'test.test_os.FileTests.test_access' 1596 return True 1597 else: 1598 # Try to match parts of the test identifier. 1599 # For example, split 'test.test_os.FileTests.test_access' 1600 # into: 'test', 'test_os', 'FileTests' and 'test_access'. 
1601 return any(map(regex_match, test_id.split("."))) 1602 1603 func = match_test_regex 1604 1605 # Create a copy since patterns can be mutable and so modified later 1606 _match_test_patterns = tuple(patterns) 1607 _match_test_func = func 1608 1609 1610 1611def run_unittest(*classes): 1612 """Run tests from unittest.TestCase-derived classes.""" 1613 valid_types = (unittest.TestSuite, unittest.TestCase) 1614 suite = unittest.TestSuite() 1615 for cls in classes: 1616 if isinstance(cls, str): 1617 if cls in sys.modules: 1618 suite.addTest(unittest.findTestCases(sys.modules[cls])) 1619 else: 1620 raise ValueError("str arguments must be keys in sys.modules") 1621 elif isinstance(cls, valid_types): 1622 suite.addTest(cls) 1623 else: 1624 suite.addTest(unittest.makeSuite(cls)) 1625 _filter_suite(suite, match_test) 1626 _run_suite(suite) 1627 1628#======================================================================= 1629# Check for the presence of docstrings. 1630 1631HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or 1632 sys.platform == 'win32' or 1633 sysconfig.get_config_var('WITH_DOC_STRINGS')) 1634 1635requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS, 1636 "test requires docstrings") 1637 1638 1639#======================================================================= 1640# doctest driver. 1641 1642def run_doctest(module, verbosity=None): 1643 """Run doctest on the given module. Return (#failures, #tests). 1644 1645 If optional argument verbosity is not specified (or is None), pass 1646 test.support's belief about verbosity on to doctest. Else doctest's 1647 usual behavior is used (it searches sys.argv for -v). 1648 """ 1649 1650 import doctest 1651 1652 if verbosity is None: 1653 verbosity = verbose 1654 else: 1655 verbosity = None 1656 1657 # Direct doctest output (normally just errors) to real stdout; doctest 1658 # output shouldn't be compared by regrtest. 
1659 save_stdout = sys.stdout 1660 sys.stdout = get_original_stdout() 1661 try: 1662 f, t = doctest.testmod(module, verbose=verbosity) 1663 if f: 1664 raise TestFailed("%d of %d doctests failed" % (f, t)) 1665 finally: 1666 sys.stdout = save_stdout 1667 if verbose: 1668 print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t) 1669 return f, t 1670 1671#======================================================================= 1672# Threading support to prevent reporting refleaks when running regrtest.py -R 1673 1674# Flag used by saved_test_environment of test.libregrtest.save_env, 1675# to check if a test modified the environment. The flag should be set to False 1676# before running a new test. 1677# 1678# For example, threading_cleanup() sets the flag is the function fails 1679# to cleanup threads. 1680environment_altered = False 1681 1682# NOTE: we use thread._count() rather than threading.enumerate() (or the 1683# moral equivalent thereof) because a threading.Thread object is still alive 1684# until its __bootstrap() method has returned, even after it has been 1685# unregistered from the threading module. 1686# thread._count(), on the other hand, only gets decremented *after* the 1687# __bootstrap() method has returned, which gives us reliable reference counts 1688# at the end of a test run. 1689 1690def threading_setup(): 1691 if thread: 1692 return thread._count(), 1693 else: 1694 return 1, 1695 1696def threading_cleanup(nb_threads): 1697 if not thread: 1698 return 1699 1700 _MAX_COUNT = 10 1701 for count in range(_MAX_COUNT): 1702 n = thread._count() 1703 if n == nb_threads: 1704 break 1705 time.sleep(0.1) 1706 # XXX print a warning in case of failure? 1707 1708def reap_threads(func): 1709 """Use this function when threads are being used. This will 1710 ensure that the threads are cleaned up even when the test fails. 1711 If threading is unavailable this function does nothing. 
1712 """ 1713 if not thread: 1714 return func 1715 1716 @functools.wraps(func) 1717 def decorator(*args): 1718 key = threading_setup() 1719 try: 1720 return func(*args) 1721 finally: 1722 threading_cleanup(*key) 1723 return decorator 1724 1725 1726@contextlib.contextmanager 1727def wait_threads_exit(timeout=60.0): 1728 """ 1729 bpo-31234: Context manager to wait until all threads created in the with 1730 statement exit. 1731 1732 Use thread.count() to check if threads exited. Indirectly, wait until 1733 threads exit the internal t_bootstrap() C function of the thread module. 1734 1735 threading_setup() and threading_cleanup() are designed to emit a warning 1736 if a test leaves running threads in the background. This context manager 1737 is designed to cleanup threads started by the thread.start_new_thread() 1738 which doesn't allow to wait for thread exit, whereas thread.Thread has a 1739 join() method. 1740 """ 1741 old_count = thread._count() 1742 try: 1743 yield 1744 finally: 1745 start_time = time.time() 1746 deadline = start_time + timeout 1747 while True: 1748 count = thread._count() 1749 if count <= old_count: 1750 break 1751 if time.time() > deadline: 1752 dt = time.time() - start_time 1753 msg = ("wait_threads() failed to cleanup %s " 1754 "threads after %.1f seconds " 1755 "(count: %s, old count: %s)" 1756 % (count - old_count, dt, count, old_count)) 1757 raise AssertionError(msg) 1758 time.sleep(0.010) 1759 gc_collect() 1760 1761 1762def reap_children(): 1763 """Use this function at the end of test_main() whenever sub-processes 1764 are started. This will help ensure that no extra children (zombies) 1765 stick around to hog resources and create problems when looking 1766 for refleaks. 1767 """ 1768 1769 # Reap all our dead child processes so we don't leave zombies around. 1770 # These hog resources and might be causing some of the buildbots to die. 
1771 if hasattr(os, 'waitpid'): 1772 any_process = -1 1773 while True: 1774 try: 1775 # This will raise an exception on Windows. That's ok. 1776 pid, status = os.waitpid(any_process, os.WNOHANG) 1777 if pid == 0: 1778 break 1779 except: 1780 break 1781 1782@contextlib.contextmanager 1783def start_threads(threads, unlock=None): 1784 threads = list(threads) 1785 started = [] 1786 try: 1787 try: 1788 for t in threads: 1789 t.start() 1790 started.append(t) 1791 except: 1792 if verbose: 1793 print("Can't start %d threads, only %d threads started" % 1794 (len(threads), len(started))) 1795 raise 1796 yield 1797 finally: 1798 if unlock: 1799 unlock() 1800 endtime = starttime = time.time() 1801 for timeout in range(1, 16): 1802 endtime += 60 1803 for t in started: 1804 t.join(max(endtime - time.time(), 0.01)) 1805 started = [t for t in started if t.isAlive()] 1806 if not started: 1807 break 1808 if verbose: 1809 print('Unable to join %d threads during a period of ' 1810 '%d minutes' % (len(started), timeout)) 1811 started = [t for t in started if t.isAlive()] 1812 if started: 1813 raise AssertionError('Unable to join %d threads' % len(started)) 1814 1815@contextlib.contextmanager 1816def swap_attr(obj, attr, new_val): 1817 """Temporary swap out an attribute with a new object. 1818 1819 Usage: 1820 with swap_attr(obj, "attr", 5): 1821 ... 1822 1823 This will set obj.attr to 5 for the duration of the with: block, 1824 restoring the old value at the end of the block. If `attr` doesn't 1825 exist on `obj`, it will be created and then deleted at the end of the 1826 block. 1827 1828 The old value (or None if it doesn't exist) will be assigned to the 1829 target of the "as" clause, if there is one. 
1830 """ 1831 if hasattr(obj, attr): 1832 real_val = getattr(obj, attr) 1833 setattr(obj, attr, new_val) 1834 try: 1835 yield real_val 1836 finally: 1837 setattr(obj, attr, real_val) 1838 else: 1839 setattr(obj, attr, new_val) 1840 try: 1841 yield 1842 finally: 1843 if hasattr(obj, attr): 1844 delattr(obj, attr) 1845 1846@contextlib.contextmanager 1847def swap_item(obj, item, new_val): 1848 """Temporary swap out an item with a new object. 1849 1850 Usage: 1851 with swap_item(obj, "item", 5): 1852 ... 1853 1854 This will set obj["item"] to 5 for the duration of the with: block, 1855 restoring the old value at the end of the block. If `item` doesn't 1856 exist on `obj`, it will be created and then deleted at the end of the 1857 block. 1858 1859 The old value (or None if it doesn't exist) will be assigned to the 1860 target of the "as" clause, if there is one. 1861 """ 1862 if item in obj: 1863 real_val = obj[item] 1864 obj[item] = new_val 1865 try: 1866 yield real_val 1867 finally: 1868 obj[item] = real_val 1869 else: 1870 obj[item] = new_val 1871 try: 1872 yield 1873 finally: 1874 if item in obj: 1875 del obj[item] 1876 1877def py3k_bytes(b): 1878 """Emulate the py3k bytes() constructor. 1879 1880 NOTE: This is only a best effort function. 1881 """ 1882 try: 1883 # memoryview? 1884 return b.tobytes() 1885 except AttributeError: 1886 try: 1887 # iterable of ints? 1888 return b"".join(chr(x) for x in b) 1889 except TypeError: 1890 return bytes(b) 1891 1892requires_type_collecting = unittest.skipIf(hasattr(sys, 'getcounts'), 1893 'types are immortal if COUNT_ALLOCS is defined') 1894 1895def args_from_interpreter_flags(): 1896 """Return a list of command-line arguments reproducing the current 1897 settings in sys.flags.""" 1898 import subprocess 1899 return subprocess._args_from_interpreter_flags() 1900 1901def strip_python_stderr(stderr): 1902 """Strip the stderr of a Python process from potential debug output 1903 emitted by the interpreter. 
1904 1905 This will typically be run on the result of the communicate() method 1906 of a subprocess.Popen object. 1907 """ 1908 stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip() 1909 return stderr 1910 1911 1912def check_free_after_iterating(test, iter, cls, args=()): 1913 class A(cls): 1914 def __del__(self): 1915 done[0] = True 1916 try: 1917 next(it) 1918 except StopIteration: 1919 pass 1920 1921 done = [False] 1922 it = iter(A(*args)) 1923 # Issue 26494: Shouldn't crash 1924 test.assertRaises(StopIteration, next, it) 1925 # The sequence should be deallocated just after the end of iterating 1926 gc_collect() 1927 test.assertTrue(done[0]) 1928 1929@contextlib.contextmanager 1930def disable_gc(): 1931 have_gc = gc.isenabled() 1932 gc.disable() 1933 try: 1934 yield 1935 finally: 1936 if have_gc: 1937 gc.enable() 1938 1939 1940def python_is_optimized(): 1941 """Find if Python was built with optimizations.""" 1942 cflags = sysconfig.get_config_var('PY_CFLAGS') or '' 1943 final_opt = "" 1944 for opt in cflags.split(): 1945 if opt.startswith('-O'): 1946 final_opt = opt 1947 return final_opt not in ('', '-O0', '-Og') 1948 1949 1950class SuppressCrashReport: 1951 """Try to prevent a crash report from popping up. 1952 1953 On Windows, don't display the Windows Error Reporting dialog. On UNIX, 1954 disable the creation of coredump file. 1955 """ 1956 old_value = None 1957 old_modes = None 1958 1959 def __enter__(self): 1960 """On Windows, disable Windows Error Reporting dialogs using 1961 SetErrorMode. 1962 1963 On UNIX, try to save the previous core file size limit, then set 1964 soft limit to 0. 
1965 """ 1966 if sys.platform.startswith('win'): 1967 # see http://msdn.microsoft.com/en-us/library/windows/desktop/ms680621.aspx 1968 # GetErrorMode is not available on Windows XP and Windows Server 2003, 1969 # but SetErrorMode returns the previous value, so we can use that 1970 import ctypes 1971 self._k32 = ctypes.windll.kernel32 1972 SEM_NOGPFAULTERRORBOX = 0x02 1973 self.old_value = self._k32.SetErrorMode(SEM_NOGPFAULTERRORBOX) 1974 self._k32.SetErrorMode(self.old_value | SEM_NOGPFAULTERRORBOX) 1975 1976 # Suppress assert dialogs in debug builds 1977 # (see http://bugs.python.org/issue23314) 1978 try: 1979 import _testcapi 1980 _testcapi.CrtSetReportMode 1981 except (AttributeError, ImportError): 1982 # no _testcapi or a release build 1983 pass 1984 else: 1985 self.old_modes = {} 1986 for report_type in [_testcapi.CRT_WARN, 1987 _testcapi.CRT_ERROR, 1988 _testcapi.CRT_ASSERT]: 1989 old_mode = _testcapi.CrtSetReportMode(report_type, 1990 _testcapi.CRTDBG_MODE_FILE) 1991 old_file = _testcapi.CrtSetReportFile(report_type, 1992 _testcapi.CRTDBG_FILE_STDERR) 1993 self.old_modes[report_type] = old_mode, old_file 1994 1995 else: 1996 try: 1997 import resource 1998 except ImportError: 1999 resource = None 2000 2001 if resource is not None: 2002 try: 2003 self.old_value = resource.getrlimit(resource.RLIMIT_CORE) 2004 resource.setrlimit(resource.RLIMIT_CORE, 2005 (0, self.old_value[1])) 2006 except (ValueError, OSError): 2007 pass 2008 2009 if sys.platform == 'darwin': 2010 # Check if the 'Crash Reporter' on OSX was configured 2011 # in 'Developer' mode and warn that it will get triggered 2012 # when it is. 2013 # 2014 # This assumes that this context manager is used in tests 2015 # that might trigger the next manager. 
2016 import subprocess 2017 cmd = ['/usr/bin/defaults', 'read', 2018 'com.apple.CrashReporter', 'DialogType'] 2019 proc = subprocess.Popen(cmd, 2020 stdout=subprocess.PIPE, 2021 stderr=subprocess.PIPE) 2022 stdout = proc.communicate()[0] 2023 if stdout.strip() == b'developer': 2024 sys.stdout.write("this test triggers the Crash Reporter, " 2025 "that is intentional") 2026 sys.stdout.flush() 2027 2028 return self 2029 2030 def __exit__(self, *ignore_exc): 2031 """Restore Windows ErrorMode or core file behavior to initial value.""" 2032 if self.old_value is None: 2033 return 2034 2035 if sys.platform.startswith('win'): 2036 self._k32.SetErrorMode(self.old_value) 2037 2038 if self.old_modes: 2039 import _testcapi 2040 for report_type, (old_mode, old_file) in self.old_modes.items(): 2041 _testcapi.CrtSetReportMode(report_type, old_mode) 2042 _testcapi.CrtSetReportFile(report_type, old_file) 2043 else: 2044 import resource 2045 try: 2046 resource.setrlimit(resource.RLIMIT_CORE, self.old_value) 2047 except (ValueError, OSError): 2048 pass 2049 2050 2051def _crash_python(): 2052 """Deliberate crash of Python. 2053 2054 Python can be killed by a segmentation fault (SIGSEGV), a bus error 2055 (SIGBUS), or a different error depending on the platform. 2056 2057 Use SuppressCrashReport() to prevent a crash report from popping up. 2058 """ 2059 2060 import _testcapi 2061 with SuppressCrashReport(): 2062 _testcapi._read_null() 2063 2064 2065def fd_count(): 2066 """Count the number of open file descriptors. 2067 """ 2068 if sys.platform.startswith(('linux', 'freebsd')): 2069 try: 2070 names = os.listdir("/proc/self/fd") 2071 # Substract one because listdir() opens internally a file 2072 # descriptor to list the content of the /proc/self/fd/ directory. 
2073 return len(names) - 1 2074 except OSError as exc: 2075 if exc.errno != errno.ENOENT: 2076 raise 2077 2078 MAXFD = 256 2079 if hasattr(os, 'sysconf'): 2080 try: 2081 MAXFD = os.sysconf("SC_OPEN_MAX") 2082 except OSError: 2083 pass 2084 2085 old_modes = None 2086 if sys.platform == 'win32': 2087 # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process 2088 # on invalid file descriptor if Python is compiled in debug mode 2089 try: 2090 import msvcrt 2091 msvcrt.CrtSetReportMode 2092 except (AttributeError, ImportError): 2093 # no msvcrt or a release build 2094 pass 2095 else: 2096 old_modes = {} 2097 for report_type in (msvcrt.CRT_WARN, 2098 msvcrt.CRT_ERROR, 2099 msvcrt.CRT_ASSERT): 2100 old_modes[report_type] = msvcrt.CrtSetReportMode(report_type, 0) 2101 2102 try: 2103 count = 0 2104 for fd in range(MAXFD): 2105 try: 2106 # Prefer dup() over fstat(). fstat() can require input/output 2107 # whereas dup() doesn't. 2108 fd2 = os.dup(fd) 2109 except OSError as e: 2110 if e.errno != errno.EBADF: 2111 raise 2112 else: 2113 os.close(fd2) 2114 count += 1 2115 finally: 2116 if old_modes is not None: 2117 for report_type in (msvcrt.CRT_WARN, 2118 msvcrt.CRT_ERROR, 2119 msvcrt.CRT_ASSERT): 2120 msvcrt.CrtSetReportMode(report_type, old_modes[report_type]) 2121 2122 return count 2123 2124 2125class SaveSignals: 2126 """ 2127 Save an restore signal handlers. 2128 2129 This class is only able to save/restore signal handlers registered 2130 by the Python signal module: see bpo-13285 for "external" signal 2131 handlers. 
2132 """ 2133 2134 def __init__(self): 2135 import signal 2136 self.signal = signal 2137 self.signals = list(range(1, signal.NSIG)) 2138 # SIGKILL and SIGSTOP signals cannot be ignored nor catched 2139 for signame in ('SIGKILL', 'SIGSTOP'): 2140 try: 2141 signum = getattr(signal, signame) 2142 except AttributeError: 2143 continue 2144 self.signals.remove(signum) 2145 self.handlers = {} 2146 2147 def save(self): 2148 for signum in self.signals: 2149 handler = self.signal.getsignal(signum) 2150 if handler is None: 2151 # getsignal() returns None if a signal handler was not 2152 # registered by the Python signal module, 2153 # and the handler is not SIG_DFL nor SIG_IGN. 2154 # 2155 # Ignore the signal: we cannot restore the handler. 2156 continue 2157 self.handlers[signum] = handler 2158 2159 def restore(self): 2160 for signum, handler in self.handlers.items(): 2161 self.signal.signal(signum, handler) 2162