"""Supporting definitions for the Python regression tests."""

if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

import contextlib
import errno
import functools
import gc
import socket
import stat
import sys
import os
import platform
import shutil
import warnings
import unittest
import importlib
import UserDict
import re
import time
import struct
import sysconfig
try:
    import thread
except ImportError:
    thread = None

# Public names exported by "from test.test_support import *".  Each name is
# listed exactly once.
__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
           "verbose", "use_resources", "max_memuse", "record_original_stdout",
           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
           "is_resource_enabled", "requires", "requires_mac_ver",
           "find_unused_port", "bind_port",
           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
           "open_urlresource", "check_warnings", "check_py3k_warnings",
           "CleanImport", "EnvironmentVarGuard", "captured_output",
           "captured_stdout", "TransientResource", "transient_internet",
           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
           "check_impl_detail", "get_attribute", "py3k_bytes",
           "import_fresh_module", "reap_children",
           "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]

class Error(Exception):
    """Base class for regression test exceptions."""

class TestFailed(Error):
    """Test failed."""

class ResourceDenied(unittest.SkipTest):
    """Test skipped because it requested a disallowed resource.

    This is raised when a test calls requires() for a resource that
    has not been enabled. It is used to distinguish between expected
    and unexpected skips.
    """
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
    """Context manager to suppress package and module deprecation
    warnings when importing them.

    If ignore is False, this context manager has no effect."""
    if ignore:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", ".+ (module|package)",
                                    DeprecationWarning)
            yield
    else:
        yield


def import_module(name, deprecated=False):
    """Import and return the module to be tested, raising SkipTest if
    it is not available.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    with _ignore_deprecated_imports(deprecated):
        try:
            return importlib.import_module(name)
        except ImportError as msg:
            raise unittest.SkipTest(str(msg))


def _save_and_remove_module(name, orig_modules):
    """Helper function to save and remove a module from sys.modules

    The module and all of its submodules are moved from sys.modules into
    orig_modules (a dict the caller later uses to restore them).

    Raise ImportError if the module can't be imported."""
    # try to import the module and raise an error if it can't be imported
    if name not in sys.modules:
        __import__(name)
        del sys.modules[name]
    # Iterate over a snapshot: sys.modules is mutated inside the loop.
    for modname in list(sys.modules):
        if modname == name or modname.startswith(name + '.'):
            orig_modules[modname] = sys.modules[modname]
            del sys.modules[modname]

def _save_and_block_module(name, orig_modules):
    """Helper function to save and block a module in sys.modules

    A None entry in sys.modules makes any import of that name fail.

    Return True if the module was in sys.modules, False otherwise."""
    saved = True
    try:
        orig_modules[name] = sys.modules[name]
    except KeyError:
        saved = False
    sys.modules[name] = None
    return saved


def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
    """Imports and returns a module, deliberately bypassing the sys.modules cache
    and importing a fresh copy of the module. Once the import is complete,
    the sys.modules cache is restored to its original state.

    Modules named in fresh are also imported anew if needed by the import.
    If one of these modules can't be imported, None is returned.

    Importing of modules named in blocked is prevented while the fresh import
    takes place.

    If deprecated is True, any module or package deprecation messages
    will be suppressed."""
    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
    # checks to make sure that this utility function is working as expected
    with _ignore_deprecated_imports(deprecated):
        # Keep track of modules saved for later restoration as well
        # as those which just need a blocking entry removed
        orig_modules = {}
        names_to_remove = []
        # Deliberately outside the try block: failing to import the main
        # module propagates ImportError instead of returning None.
        _save_and_remove_module(name, orig_modules)
        try:
            for fresh_name in fresh:
                _save_and_remove_module(fresh_name, orig_modules)
            for blocked_name in blocked:
                if not _save_and_block_module(blocked_name, orig_modules):
                    names_to_remove.append(blocked_name)
            fresh_module = importlib.import_module(name)
        except ImportError:
            fresh_module = None
        finally:
            for orig_name, module in orig_modules.items():
                sys.modules[orig_name] = module
            for name_to_remove in names_to_remove:
                del sys.modules[name_to_remove]
        return fresh_module


def get_attribute(obj, name):
    """Get an attribute, raising SkipTest if AttributeError is raised.

    obj is normally a module, but any object is accepted."""
    try:
        attribute = getattr(obj, name)
    except AttributeError:
        # Not every object has a __name__ (instances usually do not); fall
        # back to repr() so that building the skip message cannot itself
        # raise AttributeError and hide the intended SkipTest.
        owner = getattr(obj, '__name__', None)
        if owner is None:
            raise unittest.SkipTest("object %r has no attribute %s" % (
                obj, name))
        raise unittest.SkipTest("module %s has no attribute %s" % (
            owner, name))
    else:
        return attribute


verbose = 1              # Flag set to 0 by regrtest.py
use_resources = None     # Flag set to [] by regrtest.py
max_memuse = 0           # Disable bigmem tests (they will still be run with
                         # small sizes, to make sure they work.)
real_max_memuse = 0

# _original_stdout is meant to hold stdout at the time regrtest began.
# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# The point is to have some flavor of stdout the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember stdout as the stream returned by get_original_stdout()."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded stdout, or sys.stdout if none was recorded."""
    return _original_stdout or sys.stdout

def unload(name):
    """Remove name from sys.modules; do nothing if it is not there."""
    try:
        del sys.modules[name]
    except KeyError:
        pass

def _force_run(path, func, *args):
    """Call func(*args); on EnvironmentError, chmod path to rwx for the
    owner and call func(*args) once more."""
    try:
        return func(*args)
    except EnvironmentError as err:
        if verbose >= 2:
            print('%s: %s' % (err.__class__.__name__, err))
            print('re-run %s%r' % (func.__name__, args))
        os.chmod(path, stat.S_IRWXU)
        return func(*args)

if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights. If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                if os.path.isdir(fullname):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        # Fast path: let shutil do the work.  Only if that fails fall back
        # to the manual walk below, which retries after a chmod.
        try:
            shutil.rmtree(path)
            return
        except EnvironmentError:
            pass

        def _rmtree_inner(path):
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except EnvironmentError:
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    # On failure the containing directory (path) is the one
                    # that gets chmod'ed before the retry.
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

def unlink(filename):
    """Remove filename, ignoring any OSError (best effort)."""
    try:
        _unlink(filename)
    except OSError:
        pass

def rmdir(dirname):
    """Remove the directory dirname; a missing directory is not an error."""
    try:
        _rmdir(dirname)
    except OSError as error:
        # The directory need not exist.
        if error.errno != errno.ENOENT:
            raise

def rmtree(path):
    """Remove the directory tree at path; a missing tree is not an error."""
    try:
        _rmtree(path)
    except OSError as e:
        # Unix returns ENOENT, Windows returns ESRCH.
        if e.errno not in (errno.ENOENT, errno.ESRCH):
            raise
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    unload(modname)
    for dirname in sys.path:
        # The two removals are independent: unlink() ignores a missing file,
        # so a stray .pyo is deleted even when there is no .pyc.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))

# Check whether a gui is actually available
def _is_gui_available():
    """Return True if GUI tests can run; the verdict is computed once and
    cached on the function object (attributes 'result' and 'reason')."""
    try:
        return _is_gui_available.result
    except AttributeError:
        pass    # first call: nothing cached yet

    why_not = None
    if sys.platform.startswith('win'):
        # if Python is running as a service (such as the buildbot service),
        # gui interaction may be disallowed
        import ctypes
        import ctypes.wintypes
        UOI_FLAGS = 1
        WSF_VISIBLE = 0x0001
        class USEROBJECTFLAGS(ctypes.Structure):
            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
                        ("fReserved", ctypes.wintypes.BOOL),
                        ("dwFlags", ctypes.wintypes.DWORD)]
        user32 = ctypes.windll.user32
        station = user32.GetProcessWindowStation()
        if not station:
            raise ctypes.WinError()
        flags = USEROBJECTFLAGS()
        needed = ctypes.wintypes.DWORD()
        ok = user32.GetUserObjectInformationW(station,
                                              UOI_FLAGS,
                                              ctypes.byref(flags),
                                              ctypes.sizeof(flags),
                                              ctypes.byref(needed))
        if not ok:
            raise ctypes.WinError()
        if not bool(flags.dwFlags & WSF_VISIBLE):
            why_not = "gui not available (WSF_VISIBLE flag not set)"
    elif sys.platform == 'darwin':
        # The Aqua Tk implementations on OS X can abort the process if
        # being called in an environment where a window server connection
        # cannot be made, for instance when invoked by a buildbot or ssh
        # process not running under the same user id as the current console
        # user.  To avoid that, record a reason here (which skips the Tk
        # probe below) if the window manager connection is not available.
        from ctypes import cdll, c_int, pointer, Structure
        from ctypes.util import find_library

        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))

        if app_services.CGMainDisplayID() == 0:
            why_not = "gui tests cannot run without OS X window manager"
        else:
            class ProcessSerialNumber(Structure):
                _fields_ = [("highLongOfPSN", c_int),
                            ("lowLongOfPSN", c_int)]
            psn = ProcessSerialNumber()
            psn_ref = pointer(psn)
            if (app_services.GetCurrentProcess(psn_ref) < 0 or
                    app_services.SetFrontProcess(psn_ref) < 0):
                why_not = "cannot run without OS X gui process"

    # check on every platform whether tkinter can actually do anything
    if not why_not:
        try:
            from Tkinter import Tk
            root = Tk()
            root.withdraw()
            root.update()
            root.destroy()
        except Exception as e:
            text = str(e)
            if len(text) > 50:
                text = text[:50] + ' [...]'
            why_not = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
                                                            text)

    _is_gui_available.reason = why_not
    _is_gui_available.result = not why_not

    return _is_gui_available.result

def is_resource_enabled(resource):
    """Test whether a resource is enabled.

    Known resources are set by regrtest.py. If not running under regrtest.py,
    all resources are assumed enabled unless use_resources has been set.
    """
    if use_resources is None:
        return True
    return resource in use_resources
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available."""
    if not is_resource_enabled(resource):
        if msg is None:
            msg = "Use of the `%s' resource not enabled" % resource
        raise ResourceDenied(msg)
    if resource == 'gui' and not _is_gui_available():
        raise ResourceDenied(_is_gui_available.reason)

def requires_mac_ver(*min_version):
    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
    version is less than min_version.

    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
    is lower than 10.5.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            if sys.platform == 'darwin':
                version_txt = platform.mac_ver()[0]
                try:
                    version = tuple(map(int, version_txt.split('.')))
                except ValueError:
                    # Unparsable version string: run the test anyway.
                    pass
                else:
                    if version < min_version:
                        min_version_txt = '.'.join(map(str, min_version))
                        raise unittest.SkipTest(
                            "Mac OS X %s or higher required, not %s"
                            % (min_version_txt, version_txt))
            return func(*args, **kw)
        wrapper.min_version = min_version
        return wrapper
    return decorator


# Don't use "localhost", since resolving it uses the DNS under recent
# Windows versions (see issue #18792).
HOST = "127.0.0.1"
HOSTv6 = "::1"


def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Returns an unused port that should be suitable for binding. This is
    achieved by creating a temporary socket with the same family and type as
    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
    the specified host address (defaults to 0.0.0.0) with the port set to 0,
    eliciting an unused ephemeral port from the OS. The temporary socket is
    then closed and deleted, and the ephemeral port is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test. Which one to use depends on whether the calling code is creating
    a python socket, or if an unused port needs to be provided in a constructor
    or passed to an external program (i.e. the -accept argument to openssl's
    s_server mode). Always prefer bind_port() over find_unused_port() where
    possible. Hard coded ports should *NEVER* be used. As soon as a server
    socket is bound to a hard coded port, the ability to run multiple instances
    of the test simultaneously on the same host is compromised, which makes the
    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
    may simply manifest as a failed test, which can be recovered from without
    intervention in most cases, but on Windows, the entire python process can
    completely and utterly wedge, requiring someone to log in to the buildbot
    and manually kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced to
    the SO_REUSEADDR socket option having different semantics on Windows versus
    Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
    listen and then accept connections on identical host/ports. An EADDRINUSE
    socket.error will be raised at some point (depending on the platform and
    the order bind and listen were called on each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
    will ever be raised when attempting to bind two identical host/ports. When
    accept() is called on each socket, the second caller's process will steal
    the port from the first caller, leaving them both in an awkwardly wedged
    state where they'll no longer respond to any signals or graceful kills, and
    must be forcibly killed via OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
    Source world compared to Windows ones, this is a common mistake. A quick
    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
    openssl.exe is called with the 's_server' option, for example. See
    http://bugs.python.org/issue2550 for more info. The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)

    XXX: although this approach is a vast improvement on previous attempts to
    elicit unused ports, it rests heavily on the assumption that the ephemeral
    port returned to us by the OS won't immediately be dished back out to some
    other process when we close and delete our temporary socket but before our
    calling code has a chance to bind the returned port. We can deal with this
    issue if/when we come across it."""
    tempsock = socket.socket(family, socktype)
    try:
        port = bind_port(tempsock)
    finally:
        # Close the probe socket even when bind_port() raises, so a failed
        # lookup does not leave a socket open.
        tempsock.close()
    del tempsock
    return port

def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number. Relies on
    ephemeral ports in order to ensure we are using an unbound port. This is
    important as many tests may be running simultaneously, especially in a
    buildbot environment. This method raises an exception if the sock.family
    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
    or SO_REUSEPORT set on it. Tests should *never* set these socket options
    for TCP/IP sockets. The only case for setting these options is testing
    multicasting via multiple UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket. This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise TestFailed("tests should never set the SO_REUSEADDR " \
                                 "socket option on TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
                    raise TestFailed("tests should never set the SO_REUSEPORT " \
                                     "socket option on TCP/IP sockets!")
            except EnvironmentError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
def _is_ipv6_enabled():
    """Check whether IPv6 is enabled on this host."""
    if socket.has_ipv6:
        sock = None
        try:
            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
            sock.bind((HOSTv6, 0))
            return True
        except socket.error:
            pass
        finally:
            if sock:
                sock.close()
    return False

IPV6_ENABLED = _is_ipv6_enabled()

def system_must_validate_cert(f):
    """Skip the test on TLS certificate validation failures.

    The decorated function's return value is passed through unchanged.
    """
    @functools.wraps(f)
    def dec(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except IOError as e:
            if "CERTIFICATE_VERIFY_FAILED" in str(e):
                raise unittest.SkipTest("system does not contain "
                                        "necessary certificates")
            raise
    return dec

FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare x and y like cmp(), treating floats that differ by no more
    than a relative FUZZ as equal.

    Tuples and lists of the same type are compared element by element,
    then by length.  Returns -1, 0 or 1.
    """
    if isinstance(x, float) or isinstance(y, float):
        try:
            fuzz = (abs(x) + abs(y)) * FUZZ
            if abs(x-y) <= fuzz:
                return 0
        except Exception:
            # The other operand does not support abs() or subtraction; fall
            # through to the plain comparison below.
            pass
    elif type(x) == type(y) and isinstance(x, (tuple, list)):
        for i in range(min(len(x), len(y))):
            outcome = fcmp(x[i], y[i])
            if outcome != 0:
                return outcome
        return (len(x) > len(y)) - (len(x) < len(y))
    return (x > y) - (x < y)
# A constant likely larger than the underlying OS pipe buffer size, to
# make writes blocking.
# Windows limit seems to be around 512 B, and many Unix kernels have a
# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# (see issue #17835 for a discussion of this number).
PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1

# A constant likely larger than the underlying OS socket buffer size, to make
# writes blocking.
# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). (See issue #18643
# for a discussion of this number).
SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1

is_jython = sys.platform.startswith('java')

try:
    unicode
    have_unicode = True
except NameError:
    have_unicode = False

requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')

def u(s):
    """Decode the 'unicode-escape' encoded string s into a unicode object."""
    return unicode(s, 'unicode-escape')

# FS_NONASCII: non-ASCII Unicode character encodable by
# sys.getfilesystemencoding(), or None if there is no such character.
FS_NONASCII = None
if have_unicode:
    # sys.getfilesystemencoding() may return None; fall back to 'ascii' the
    # same way temp_cwd() does instead of passing None to encode().
    _fs_encoding = sys.getfilesystemencoding() or 'ascii'
    for character in (
        # First try printable and common characters to have a readable filename.
        # For each character, the encoding list are just example of encodings able
        # to encode the character (the list is not exhaustive).

        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
        unichr(0x00E6),
        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
        unichr(0x0130),
        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
        unichr(0x0141),
        # U+03C6 (Greek Small Letter Phi): cp1253
        unichr(0x03C6),
        # U+041A (Cyrillic Capital Letter Ka): cp1251
        unichr(0x041A),
        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
        unichr(0x05D0),
        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
        unichr(0x060C),
        # U+062A (Arabic Letter Teh): cp720
        unichr(0x062A),
        # U+0E01 (Thai Character Ko Kai): cp874
        unichr(0x0E01),

        # Then try more "special" characters. "special" because they may be
        # interpreted or displayed differently depending on the exact locale
        # encoding and the font.

        # U+00A0 (No-Break Space)
        unichr(0x00A0),
        # U+20AC (Euro Sign)
        unichr(0x20AC),
    ):
        try:
            character.encode(_fs_encoding).decode(_fs_encoding)
        except UnicodeError:
            pass
        else:
            FS_NONASCII = character
            break

# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TESTFN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNENCODABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (not hasattr(sys, "getwindowsversion") or
                sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
            TESTFN_UNENCODABLE = None
        else:
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # 'errors=ignore' mode - hence we get '?' characters rather than
                # the exception. 'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNENCODABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                # Parenthesised single-argument form, as in _force_run().
                print('WARNING: The filename %r CAN be encoded by the filesystem. '
                      'Unicode filename tests may not be effective'
                      % TESTFN_UNENCODABLE)


# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())

# Save the initial cwd
SAVEDCWD = os.getcwd()

@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory.

      quiet: if False (the default), the context manager raises an exception
        on error. Otherwise, it issues only a warning and keeps the current
        working directory the same.

    The value bound by "with ... as" is the working directory in effect
    inside the block.
    """
    saved_dir = os.getcwd()
    try:
        os.chdir(path)
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change CWD to: ' + path,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that creates a temporary directory and set it as CWD.

    The new CWD is created in the current directory and it's named *name*.
    If *quiet* is False (default) and it's not possible to create or change
    the CWD, an error is raised. If it's True, only a warning is raised
    and the original CWD is used.
    """
    if (have_unicode and isinstance(name, unicode) and
        not os.path.supports_unicode_filenames):
        try:
            name = name.encode(sys.getfilesystemencoding() or 'ascii')
        except UnicodeEncodeError:
            if not quiet:
                raise unittest.SkipTest('unable to encode the cwd name with '
                                        'the filesystem encoding.')
    saved_dir = os.getcwd()
    # Only set once both mkdir and chdir succeeded; the directory is removed
    # on exit only in that case.
    is_temporary = False
    try:
        os.mkdir(name)
        os.chdir(name)
        is_temporary = True
    except OSError:
        if not quiet:
            raise
        warnings.warn('tests may fail, unable to change the CWD to ' + name,
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(saved_dir)
        if is_temporary:
            rmtree(name)


def findfile(file, here=__file__, subdir=None):
    """Try to find a file on sys.path and the working directory. If it is not
    found the argument passed to the function is returned (this does not
    necessarily signal failure; could still be the legitimate path)."""
    if os.path.isabs(file):
        return file
    if subdir is not None:
        file = os.path.join(subdir, file)
    # The directory containing *here* is searched before sys.path.
    path = sys.path
    path = [os.path.dirname(here)] + path
    for dn in path:
        fn = os.path.join(dn, file)
        if os.path.exists(fn): return fn
    return file

def sortdict(dict):
    "Like repr(dict), but in sorted order."
    # sorted() accepts any iterable of items, so this does not depend on
    # items() returning a list.
    items = sorted(dict.items())
    reprpairs = ["%r: %r" % pair for pair in items]
    withcommas = ", ".join(reprpairs)
    return "{%s}" % withcommas

def make_bad_fd():
    """
    Create an invalid file descriptor by opening and closing a file and return
    its fd.
    """
    file = open(TESTFN, "wb")
    try:
        return file.fileno()
    finally:
        file.close()
        unlink(TESTFN)

def check_syntax_error(testcase, statement):
    """Assert, via testcase, that compiling statement raises SyntaxError."""
    testcase.assertRaises(SyntaxError, compile, statement,
                          '<' 'test string>', 'exec')

def open_urlresource(url, check=None):
    """Return an open file for the resource at url, downloading it into the
    "data" directory next to this module unless a valid copy is cached.

    check, if given, is called with the open file and must return a true
    value for the file to be accepted.  Raises TestFailed if the file is
    still invalid after downloading it.
    """
    import urlparse, urllib2

    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!

    fn = os.path.join(os.path.dirname(__file__), "data", filename)

    def check_valid_file(fn):
        # Return the open file if it is acceptable, otherwise None.  The
        # file is closed on every path that does not hand it to the caller,
        # including when check() raises.
        f = open(fn)
        if check is None:
            return f
        keep_open = False
        try:
            if check(f):
                f.seek(0)
                keep_open = True
                return f
        finally:
            if not keep_open:
                f.close()

    if os.path.exists(fn):
        f = check_valid_file(fn)
        if f is not None:
            return f
        unlink(fn)

    # Verify the requirement before downloading the file
    requires('urlfetch')

    print >> get_original_stdout(), '\tfetching %s ...' % url
    f = urllib2.urlopen(url, timeout=15)
    try:
        with open(fn, "wb") as out:
            s = f.read()
            while s:
                out.write(s)
                s = f.read()
    finally:
        f.close()

    f = check_valid_file(fn)
    if f is not None:
        return f
    raise TestFailed('invalid resource "%s"' % fn)
class WarningsRecorder(object):
    """Convenience wrapper for the warnings list returned on
       entry to the warnings.catch_warnings() context manager.

    Unknown attributes are looked up on the most recent warning; the
    'warnings' property lists the warnings recorded since the last reset().
    """
    def __init__(self, warnings_list):
        self._warnings = warnings_list
        self._last = 0

    def __getattr__(self, attr):
        # __getattr__ only runs for names missing from the instance.  If our
        # own state is missing (instance created without __init__, e.g. by
        # copy or pickle), reading it below would call __getattr__ again and
        # recurse without bound, so fail cleanly instead.
        if attr in ('_warnings', '_last'):
            raise AttributeError(attr)
        if len(self._warnings) > self._last:
            return getattr(self._warnings[-1], attr)
        elif attr in warnings.WarningMessage._WARNING_DETAILS:
            # No warning since the last reset(): the standard fields read
            # as None.
            return None
        raise AttributeError("%r has no attribute %r" % (self, attr))

    @property
    def warnings(self):
        return self._warnings[self._last:]

    def reset(self):
        self._last = len(self._warnings)


def _filterwarnings(filters, quiet=False):
    """Catch the warnings, then check if all the expected
    warnings have been raised and re-raise unexpected warnings.
    If 'quiet' is True, only re-raise the unexpected warnings.
    """
    # Clear the warning registry of the calling module
    # in order to re-raise the warnings.
    # (Frame 2 is the caller of the context manager's __enter__.)
    frame = sys._getframe(2)
    registry = frame.f_globals.get('__warningregistry__')
    if registry:
        registry.clear()
    with warnings.catch_warnings(record=True) as w:
        # Set filter "always" to record all warnings. Because
        # test_warnings swap the module, we need to look up in
        # the sys.modules dictionary.
        sys.modules['warnings'].simplefilter("always")
        yield WarningsRecorder(w)
    # Filter the recorded warnings
    reraise = [warning.message for warning in w]
    missing = []
    for msg, cat in filters:
        seen = False
        # Iterate over a copy: matches are removed from reraise as we go.
        for exc in reraise[:]:
            message = str(exc)
            # Filter out the matching messages
            if (re.match(msg, message, re.I) and
                issubclass(exc.__class__, cat)):
                seen = True
                reraise.remove(exc)
        if not seen and not quiet:
            # This filter caught nothing
            missing.append((msg, cat.__name__))
    if reraise:
        raise AssertionError("unhandled warning %r" % reraise[0])
    if missing:
        raise AssertionError("filter (%r, %s) did not catch any warning" %
                             missing[0])


@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
    """Context manager to silence warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default True without argument,
         default False if some filters are defined)

    Without argument, it defaults to:
        check_warnings(("", Warning), quiet=True)
    """
    quiet = kwargs.get('quiet')
    if not filters:
        filters = (("", Warning),)
        # Preserve backward compatibility
        if quiet is None:
            quiet = True
    # Returning the generator (rather than yielding) lets the
    # contextmanager decorator drive _filterwarnings directly.
    return _filterwarnings(filters, quiet)


@contextlib.contextmanager
def check_py3k_warnings(*filters, **kwargs):
    """Context manager to silence py3k warnings.

    Accept 2-tuples as positional arguments:
        ("message regexp", WarningCategory)

    Optional argument:
     - if 'quiet' is True, it does not fail if a filter catches nothing
        (default False)

    Without argument, it defaults to:
        check_py3k_warnings(("", DeprecationWarning), quiet=False)
    """
    if sys.py3kwarning:
        if not filters:
            filters = (("", DeprecationWarning),)
    else:
        # It should not raise any py3k warning
        filters = ()
    return _filterwarnings(filters, kwargs.get('quiet'))
938 939 Accept 2-tuples as positional arguments: 940 ("message regexp", WarningCategory) 941 942 Optional argument: 943 - if 'quiet' is True, it does not fail if a filter catches nothing 944 (default False) 945 946 Without argument, it defaults to: 947 check_py3k_warnings(("", DeprecationWarning), quiet=False) 948 """ 949 if sys.py3kwarning: 950 if not filters: 951 filters = (("", DeprecationWarning),) 952 else: 953 # It should not raise any py3k warning 954 filters = () 955 return _filterwarnings(filters, kwargs.get('quiet')) 956 957 958class CleanImport(object): 959 """Context manager to force import to return a new module reference. 960 961 This is useful for testing module-level behaviours, such as 962 the emission of a DeprecationWarning on import. 963 964 Use like this: 965 966 with CleanImport("foo"): 967 importlib.import_module("foo") # new reference 968 """ 969 970 def __init__(self, *module_names): 971 self.original_modules = sys.modules.copy() 972 for module_name in module_names: 973 if module_name in sys.modules: 974 module = sys.modules[module_name] 975 # It is possible that module_name is just an alias for 976 # another module (e.g. stub for modules renamed in 3.x). 977 # In that case, we also need delete the real module to clear 978 # the import cache. 979 if module.__name__ != module_name: 980 del sys.modules[module.__name__] 981 del sys.modules[module_name] 982 983 def __enter__(self): 984 return self 985 986 def __exit__(self, *ignore_exc): 987 sys.modules.update(self.original_modules) 988 989 990class EnvironmentVarGuard(UserDict.DictMixin): 991 992 """Class to help protect the environment variable properly. 
Can be used as 993 a context manager.""" 994 995 def __init__(self): 996 self._environ = os.environ 997 self._changed = {} 998 999 def __getitem__(self, envvar): 1000 return self._environ[envvar] 1001 1002 def __setitem__(self, envvar, value): 1003 # Remember the initial value on the first access 1004 if envvar not in self._changed: 1005 self._changed[envvar] = self._environ.get(envvar) 1006 self._environ[envvar] = value 1007 1008 def __delitem__(self, envvar): 1009 # Remember the initial value on the first access 1010 if envvar not in self._changed: 1011 self._changed[envvar] = self._environ.get(envvar) 1012 if envvar in self._environ: 1013 del self._environ[envvar] 1014 1015 def keys(self): 1016 return self._environ.keys() 1017 1018 def set(self, envvar, value): 1019 self[envvar] = value 1020 1021 def unset(self, envvar): 1022 del self[envvar] 1023 1024 def __enter__(self): 1025 return self 1026 1027 def __exit__(self, *ignore_exc): 1028 for (k, v) in self._changed.items(): 1029 if v is None: 1030 if k in self._environ: 1031 del self._environ[k] 1032 else: 1033 self._environ[k] = v 1034 os.environ = self._environ 1035 1036 1037class DirsOnSysPath(object): 1038 """Context manager to temporarily add directories to sys.path. 1039 1040 This makes a copy of sys.path, appends any directories given 1041 as positional arguments, then reverts sys.path to the copied 1042 settings when the context ends. 1043 1044 Note that *all* sys.path modifications in the body of the 1045 context manager, including replacement of the object, 1046 will be reverted at the end of the block. 
1047 """ 1048 1049 def __init__(self, *paths): 1050 self.original_value = sys.path[:] 1051 self.original_object = sys.path 1052 sys.path.extend(paths) 1053 1054 def __enter__(self): 1055 return self 1056 1057 def __exit__(self, *ignore_exc): 1058 sys.path = self.original_object 1059 sys.path[:] = self.original_value 1060 1061 1062class TransientResource(object): 1063 1064 """Raise ResourceDenied if an exception is raised while the context manager 1065 is in effect that matches the specified exception and attributes.""" 1066 1067 def __init__(self, exc, **kwargs): 1068 self.exc = exc 1069 self.attrs = kwargs 1070 1071 def __enter__(self): 1072 return self 1073 1074 def __exit__(self, type_=None, value=None, traceback=None): 1075 """If type_ is a subclass of self.exc and value has attributes matching 1076 self.attrs, raise ResourceDenied. Otherwise let the exception 1077 propagate (if any).""" 1078 if type_ is not None and issubclass(self.exc, type_): 1079 for attr, attr_value in self.attrs.iteritems(): 1080 if not hasattr(value, attr): 1081 break 1082 if getattr(value, attr) != attr_value: 1083 break 1084 else: 1085 raise ResourceDenied("an optional resource is not available") 1086 1087 1088@contextlib.contextmanager 1089def transient_internet(resource_name, timeout=30.0, errnos=()): 1090 """Return a context manager that raises ResourceDenied when various issues 1091 with the Internet connection manifest themselves as exceptions.""" 1092 default_errnos = [ 1093 ('ECONNREFUSED', 111), 1094 ('ECONNRESET', 104), 1095 ('EHOSTUNREACH', 113), 1096 ('ENETUNREACH', 101), 1097 ('ETIMEDOUT', 110), 1098 ] 1099 default_gai_errnos = [ 1100 ('EAI_AGAIN', -3), 1101 ('EAI_FAIL', -4), 1102 ('EAI_NONAME', -2), 1103 ('EAI_NODATA', -5), 1104 # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo() 1105 # implementation actually returns WSANO_DATA i.e. 11004. 
1106 ('WSANO_DATA', 11004), 1107 ] 1108 1109 denied = ResourceDenied("Resource '%s' is not available" % resource_name) 1110 captured_errnos = errnos 1111 gai_errnos = [] 1112 if not captured_errnos: 1113 captured_errnos = [getattr(errno, name, num) 1114 for (name, num) in default_errnos] 1115 gai_errnos = [getattr(socket, name, num) 1116 for (name, num) in default_gai_errnos] 1117 1118 def filter_error(err): 1119 n = getattr(err, 'errno', None) 1120 if (isinstance(err, socket.timeout) or 1121 (isinstance(err, socket.gaierror) and n in gai_errnos) or 1122 n in captured_errnos): 1123 if not verbose: 1124 sys.stderr.write(denied.args[0] + "\n") 1125 raise denied 1126 1127 old_timeout = socket.getdefaulttimeout() 1128 try: 1129 if timeout is not None: 1130 socket.setdefaulttimeout(timeout) 1131 yield 1132 except IOError as err: 1133 # urllib can wrap original socket errors multiple times (!), we must 1134 # unwrap to get at the original error. 1135 while True: 1136 a = err.args 1137 if len(a) >= 1 and isinstance(a[0], IOError): 1138 err = a[0] 1139 # The error can also be wrapped as args[1]: 1140 # except socket.error as msg: 1141 # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2]) 1142 elif len(a) >= 2 and isinstance(a[1], IOError): 1143 err = a[1] 1144 else: 1145 break 1146 filter_error(err) 1147 raise 1148 # XXX should we catch generic exceptions and look for their 1149 # __cause__ or __context__? 
1150 finally: 1151 socket.setdefaulttimeout(old_timeout) 1152 1153 1154@contextlib.contextmanager 1155def captured_output(stream_name): 1156 """Return a context manager used by captured_stdout and captured_stdin 1157 that temporarily replaces the sys stream *stream_name* with a StringIO.""" 1158 import StringIO 1159 orig_stdout = getattr(sys, stream_name) 1160 setattr(sys, stream_name, StringIO.StringIO()) 1161 try: 1162 yield getattr(sys, stream_name) 1163 finally: 1164 setattr(sys, stream_name, orig_stdout) 1165 1166def captured_stdout(): 1167 """Capture the output of sys.stdout: 1168 1169 with captured_stdout() as s: 1170 print "hello" 1171 self.assertEqual(s.getvalue(), "hello") 1172 """ 1173 return captured_output("stdout") 1174 1175def captured_stderr(): 1176 return captured_output("stderr") 1177 1178def captured_stdin(): 1179 return captured_output("stdin") 1180 1181def gc_collect(): 1182 """Force as many objects as possible to be collected. 1183 1184 In non-CPython implementations of Python, this is needed because timely 1185 deallocation is not guaranteed by the garbage collector. (Even in CPython 1186 this can be the case in case of reference cycles.) This means that __del__ 1187 methods may be called later than expected and weakrefs may remain alive for 1188 longer than expected. This function tries its best to force all garbage 1189 objects to disappear. 
1190 """ 1191 gc.collect() 1192 if is_jython: 1193 time.sleep(0.1) 1194 gc.collect() 1195 gc.collect() 1196 1197 1198_header = '2P' 1199if hasattr(sys, "gettotalrefcount"): 1200 _header = '2P' + _header 1201_vheader = _header + 'P' 1202 1203def calcobjsize(fmt): 1204 return struct.calcsize(_header + fmt + '0P') 1205 1206def calcvobjsize(fmt): 1207 return struct.calcsize(_vheader + fmt + '0P') 1208 1209 1210_TPFLAGS_HAVE_GC = 1<<14 1211_TPFLAGS_HEAPTYPE = 1<<9 1212 1213def check_sizeof(test, o, size): 1214 import _testcapi 1215 result = sys.getsizeof(o) 1216 # add GC header size 1217 if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\ 1218 ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))): 1219 size += _testcapi.SIZEOF_PYGC_HEAD 1220 msg = 'wrong size for %s: got %d, expected %d' \ 1221 % (type(o), result, size) 1222 test.assertEqual(result, size, msg) 1223 1224 1225#======================================================================= 1226# Decorator for running a function in a different locale, correctly resetting 1227# it afterwards. 

def run_with_locale(catstr, *locales):
    """Return a decorator that runs a function under another locale.

    'catstr' is the name of a locale category attribute (for example
    'LC_ALL').  Each of 'locales' is tried in order until one can be set;
    if none can, the function simply runs under the current locale.  The
    original locale is restored afterwards.  An unknown category name
    raises AttributeError when the decorated function is called.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                # (Exception rather than a bare except, so that
                # KeyboardInterrupt and SystemExit still propagate)
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is not usable here; try the next one
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        # __name__ is the spelling run_with_tz() uses as well.
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Decorator for running a function in a specific timezone, correctly
# resetting it afterwards.

def run_with_tz(tz):
    """Return a decorator that runs a function with the TZ variable set to tz.

    The decorated function raises unittest.SkipTest when time.tzset() is
    not available.  The previous value of TZ (or its absence) is restored
    afterwards and tzset() is called again.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                tzset = time.tzset
            except AttributeError:
                raise unittest.SkipTest("tzset required")
            if 'TZ' in os.environ:
                orig_tz = os.environ['TZ']
            else:
                orig_tz = None
            os.environ['TZ'] = tz
            tzset()

            # now run the function, resetting the tz on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if orig_tz is None:
                    # pop() instead of del: the function may already have
                    # removed TZ itself, which must not raise KeyError here.
                    os.environ.pop('TZ', None)
                else:
                    os.environ['TZ'] = orig_tz
                tzset()

        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use
# should be configurable.

# Some handy shorthands.
# Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024
_1G = 1024 * _1M
_2G = 2 * _1G
_4G = 4 * _1G

MAX_Py_ssize_t = sys.maxsize

def set_memlimit(limit):
    """Parse *limit* (a number followed by K, M, G or T and an optional
    'b', case-insensitively) and store it as the memory budget.

    real_max_memuse receives the parsed value in bytes; max_memuse receives
    the same value capped at MAX_Py_ssize_t.  ValueError is raised when the
    string cannot be parsed or the capped value is below 2G - 1.
    """
    global max_memuse
    global real_max_memuse
    multipliers = {'k': 1024, 'm': _1M, 'g': _1G, 't': 1024*_1G}
    parsed = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                      re.IGNORECASE | re.VERBOSE)
    if parsed is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    amount = float(parsed.group(1))
    unit = parsed.group(3).lower()
    requested = int(amount * multipliers[unit])
    # The uncapped value is recorded before the usefulness check below.
    real_max_memuse = requested
    usable = min(requested, MAX_Py_ssize_t)
    if usable < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = usable

def _note_memory_skip(func):
    # In verbose mode, mention on stderr that func is not being run.
    if verbose:
        sys.stderr.write("Skipping %s because of memory "
                         "constraint\n" % (func.__name__,))

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the smallest size (in units the test interprets itself)
    worth running with, 'memuse' the estimated number of bytes needed per
    unit of size, and 'overhead' a fixed cost that does not depend on the
    size (5Mb by default).

    The wrapper computes a 'size' and passes it to the test as its second
    argument.  With a memory limit in force (max_memuse), the test returns
    without running when even 'minsize' does not fit; otherwise the size is
    grown towards the limit.  Without a limit a small fixed size is used.
    """
    def decorator(f):
        def wrapper(self):
            if max_memuse:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    _note_memory_skip(f)
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            else:
                # No limit configured (max_memuse is false): still run the
                # test with a size of a few kb to make sure it works, and
                # complain loudly if even that would use too much memory.
                maxsize = 5147
                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
            return f(self, maxsize)
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
    """Decorator for bigmem tests that need exactly *size*.

    With a memory limit in force (real_max_memuse) the test receives
    'size', and returns without running if size * memuse exceeds the
    limit.  Without a limit it receives a small fixed size, unless
    'dry_run' is false, in which case it is not run at all.
    """
    def decorator(f):
        def wrapper(self):
            maxsize = size if real_max_memuse else 5147

            limited = real_max_memuse or not dry_run
            if limited and real_max_memuse < maxsize * memuse:
                _note_memory_skip(f)
                return

            return f(self, maxsize)
        wrapper.size = size
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

def bigaddrspacetest(f):
    """Decorator for tests that fill the address space."""
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        _note_memory_skip(f)
    return wrapper

#=======================================================================
# unittest integration.

class BasicTestRunner:
    """Minimal runner: run a test against a plain unittest.TestResult."""
    def run(self, test):
        outcome = unittest.TestResult()
        test(outcome)
        return outcome

def _id(obj):
    # Identity decorator, used when a test needs no skipping.
    return obj

def requires_resource(resource):
    """Return a decorator that skips the test unless *resource* is usable.

    The 'gui' resource is additionally checked with _is_gui_available().
    """
    if resource == 'gui' and not _is_gui_available():
        return unittest.skip(_is_gui_available.reason)
    if not is_resource_enabled(resource):
        return unittest.skip("resource {0!r} is not enabled".format(resource))
    return _id

def cpython_only(test):
    """
    Decorator for tests only applicable on CPython.
    """
    decorate = impl_detail(cpython=True)
    return decorate(test)

def impl_detail(msg=None, **guards):
    """Return a decorator that skips the test unless check_impl_detail()
    accepts *guards*; *msg* overrides the generated skip reason."""
    if check_impl_detail(**guards):
        return _id
    if msg is None:
        guardnames, default = _parse_guards(guards)
        template = ("implementation detail not available on {0}" if default
                    else "implementation detail specific to {0}")
        msg = template.format(' or '.join(sorted(guardnames.keys())))
    return unittest.skip(msg)

def _parse_guards(guards):
    # Returns a tuple ({platform_name: run_me}, default_value)
    if not guards:
        return ({'cpython': True}, False)
    flags = list(guards.values())
    is_true = flags[0]
    assert flags == [is_true] * len(guards)   # all True or all False
    return (guards, not is_true)

# Use the following check to guard CPython's implementation-specific tests --
# or to run them only on the implementation(s) guarded by the arguments.
def check_impl_detail(**guards):
    """This function returns True or False depending on the host platform.
       Examples:
          if check_impl_detail():               # only on CPython (default)
          if check_impl_detail(jython=True):    # only on Jython
          if check_impl_detail(cpython=False):  # everywhere except on CPython
    """
    guards, default = _parse_guards(guards)
    implementation = platform.python_implementation().lower()
    return guards.get(implementation, default)



def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class."""
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return

    # Report a lone error or failure verbatim, anything else generically.
    if len(result.errors) == 1 and not result.failures:
        err = result.errors[0][1]
    elif len(result.failures) == 1 and not result.errors:
        err = result.failures[0][1]
    else:
        err = "multiple errors occurred"
        if not verbose:
            err += "; run in verbose mode for details"
    raise TestFailed(err)


def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes."""
    valid_types = (unittest.TestSuite, unittest.TestCase)
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            # A string names an already imported module to collect from.
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, valid_types):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)

#=======================================================================
# Check for the presence of docstrings.

HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
                   sys.platform == 'win32' or
                   sysconfig.get_config_var('WITH_DOC_STRINGS'))

requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
                                          "test requires docstrings")


#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    When 'verbosity' is left unspecified (None), test_support's own
    verbose setting is handed to doctest.  Any other value makes doctest
    fall back to its usual behavior (it searches sys.argv for -v).
    TestFailed is raised if any doctest fails.
    """

    import doctest

    verbosity = verbose if verbosity is None else None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    captured_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=verbosity)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = captured_stdout
    if verbose:
        print('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.

def threading_setup():
    """Return a 1-tuple holding the current thread count (1 when the
    thread module is unavailable), to be passed to threading_cleanup()."""
    return (thread._count(),) if thread else (1,)

def threading_cleanup(nb_threads):
    """Wait, for at most ten polls 0.1s apart, until thread._count() is
    back to *nb_threads*.  Does nothing without the thread module."""
    if not thread:
        return

    for _attempt in range(10):
        if thread._count() == nb_threads:
            return
        time.sleep(0.1)
    # XXX print a warning in case of failure?

def reap_threads(func):
    """Use this function when threads are being used.  This will
    ensure that the threads are cleaned up even when the test fails.
    If threading is unavailable this function does nothing.

    The wrapper forwards positional and keyword arguments to *func* and
    returns its result.
    """
    if not thread:
        return func

    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    if hasattr(os, 'waitpid'):
        any_process = -1
        while True:
            try:
                # This will raise an exception on Windows.  That's ok.
                pid, status = os.waitpid(any_process, os.WNOHANG)
                if pid == 0:
                    break
            except Exception:
                # Best effort: stop at the first failure.  Exception rather
                # than a bare except, so that KeyboardInterrupt and
                # SystemExit are not swallowed.
                break

@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Context manager that starts *threads* on entry and joins them on exit.

    If a thread fails to start, the exception propagates after the threads
    already started have been joined.  On exit 'unlock', if given, is called
    first; the started threads are then joined against a deadline that is
    extended by 60 seconds per round, for up to 15 rounds.  AssertionError
    is raised if some threads are still alive afterwards.
    """
    threads = list(threads)
    started = []
    try:
        try:
            for t in threads:
                t.start()
                started.append(t)
        except:
            if verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(threads), len(started)))
            raise
        yield
    finally:
        if unlock:
            unlock()
        endtime = time.time()
        for timeout in range(1, 16):
            endtime += 60
            for t in started:
                # Always allow a small positive timeout, even when the
                # deadline has already passed.
                t.join(max(endtime - time.time(), 0.01))
            started = [t for t in started if t.isAlive()]
            if not started:
                break
            if verbose:
                print('Unable to join %d threads during a period of '
                      '%d minutes' % (len(started), timeout))
    started = [t for t in started if t.isAlive()]
    if started:
        raise AssertionError('Unable to join %d threads' % len(started))

@contextlib.contextmanager
def swap_attr(obj, attr, new_val):
    """Temporary swap out an attribute with a new object.

    Usage:
        with swap_attr(obj, "attr", 5):
            ...

        This will set obj.attr to 5 for the duration of the with: block,
        restoring the old value at the end of the block. If `attr` doesn't
        exist on `obj`, it will be created and then deleted at the end of the
        block.
    """
    if hasattr(obj, attr):
        real_val = getattr(obj, attr)
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            setattr(obj, attr, real_val)
    else:
        setattr(obj, attr, new_val)
        try:
            yield
        finally:
            delattr(obj, attr)

def py3k_bytes(b):
    """Emulate the py3k bytes() constructor.

    NOTE: This is only a best effort function.
    """
    try:
        # memoryview?
        return b.tobytes()
    except AttributeError:
        try:
            # iterable of ints?
            return b"".join(chr(x) for x in b)
        except TypeError:
            # anything else is handed to bytes() unchanged
            return bytes(b)

def args_from_interpreter_flags():
    """Return a list of command-line arguments reproducing the current
    settings in sys.flags."""
    import subprocess
    return subprocess._args_from_interpreter_flags()

def strip_python_stderr(stderr):
    """Strip the stderr of a Python process from potential debug output
    emitted by the interpreter.

    This will typically be run on the result of the communicate() method
    of a subprocess.Popen object.

    A trailing "[N refs]" line is removed and the remaining bytes are
    stripped of surrounding whitespace.
    """
    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
    return stderr


def check_free_after_iterating(test, iter, cls, args=()):
    """Check that the object being iterated is released when iteration ends.

    A subclass of *cls* is instantiated with *args* and wrapped with
    *iter*; exhausting the iterator must raise StopIteration, and the
    instance's __del__ must have run once gc_collect() returns.  Failures
    are reported through the assert methods of *test*.
    """
    class A(cls):
        def __del__(self):
            done[0] = True
            # Using the iterator from within __del__ must be harmless.
            try:
                next(it)
            except StopIteration:
                pass

    done = [False]
    it = iter(A(*args))
    # Issue 26494: Shouldn't crash
    test.assertRaises(StopIteration, next, it)
    # The sequence should be deallocated just after the end of iterating
    gc_collect()
    test.assertTrue(done[0])