• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Supporting definitions for the Python regression tests."""
2
3if __name__ != 'test.test_support':
4    raise ImportError('test_support must be imported from the test package')
5
6import contextlib
7import errno
8import functools
9import gc
10import socket
11import stat
12import sys
13import os
14import platform
15import shutil
16import warnings
17import unittest
18import importlib
19import UserDict
20import re
21import time
22import struct
23import sysconfig
24try:
25    import thread
26except ImportError:
27    thread = None
28
29__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
30           "verbose", "use_resources", "max_memuse", "record_original_stdout",
31           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
32           "is_resource_enabled", "requires", "requires_mac_ver",
33           "find_unused_port", "bind_port",
34           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
35           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
36           "open_urlresource", "check_warnings", "check_py3k_warnings",
37           "CleanImport", "EnvironmentVarGuard", "captured_output",
38           "captured_stdout", "TransientResource", "transient_internet",
39           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
40           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
41           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
42           "check_impl_detail", "get_attribute", "py3k_bytes",
43           "import_fresh_module", "threading_cleanup", "reap_children",
44           "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
45
46class Error(Exception):
47    """Base class for regression test exceptions."""
48
49class TestFailed(Error):
50    """Test failed."""
51
52class ResourceDenied(unittest.SkipTest):
53    """Test skipped because it requested a disallowed resource.
54
55    This is raised when a test calls requires() for a resource that
56    has not been enabled.  It is used to distinguish between expected
57    and unexpected skips.
58    """
59
60@contextlib.contextmanager
61def _ignore_deprecated_imports(ignore=True):
62    """Context manager to suppress package and module deprecation
63    warnings when importing them.
64
65    If ignore is False, this context manager has no effect."""
66    if ignore:
67        with warnings.catch_warnings():
68            warnings.filterwarnings("ignore", ".+ (module|package)",
69                                    DeprecationWarning)
70            yield
71    else:
72        yield
73
74
75def import_module(name, deprecated=False):
76    """Import and return the module to be tested, raising SkipTest if
77    it is not available.
78
79    If deprecated is True, any module or package deprecation messages
80    will be suppressed."""
81    with _ignore_deprecated_imports(deprecated):
82        try:
83            return importlib.import_module(name)
84        except ImportError, msg:
85            raise unittest.SkipTest(str(msg))
86
87
88def _save_and_remove_module(name, orig_modules):
89    """Helper function to save and remove a module from sys.modules
90
91       Raise ImportError if the module can't be imported."""
92    # try to import the module and raise an error if it can't be imported
93    if name not in sys.modules:
94        __import__(name)
95        del sys.modules[name]
96    for modname in list(sys.modules):
97        if modname == name or modname.startswith(name + '.'):
98            orig_modules[modname] = sys.modules[modname]
99            del sys.modules[modname]
100
101def _save_and_block_module(name, orig_modules):
102    """Helper function to save and block a module in sys.modules
103
104       Return True if the module was in sys.modules, False otherwise."""
105    saved = True
106    try:
107        orig_modules[name] = sys.modules[name]
108    except KeyError:
109        saved = False
110    sys.modules[name] = None
111    return saved
112
113
114def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
115    """Imports and returns a module, deliberately bypassing the sys.modules cache
116    and importing a fresh copy of the module. Once the import is complete,
117    the sys.modules cache is restored to its original state.
118
119    Modules named in fresh are also imported anew if needed by the import.
120    If one of these modules can't be imported, None is returned.
121
122    Importing of modules named in blocked is prevented while the fresh import
123    takes place.
124
125    If deprecated is True, any module or package deprecation messages
126    will be suppressed."""
127    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
128    # checks to make sure that this utility function is working as expected
129    with _ignore_deprecated_imports(deprecated):
130        # Keep track of modules saved for later restoration as well
131        # as those which just need a blocking entry removed
132        orig_modules = {}
133        names_to_remove = []
134        _save_and_remove_module(name, orig_modules)
135        try:
136            for fresh_name in fresh:
137                _save_and_remove_module(fresh_name, orig_modules)
138            for blocked_name in blocked:
139                if not _save_and_block_module(blocked_name, orig_modules):
140                    names_to_remove.append(blocked_name)
141            fresh_module = importlib.import_module(name)
142        except ImportError:
143            fresh_module = None
144        finally:
145            for orig_name, module in orig_modules.items():
146                sys.modules[orig_name] = module
147            for name_to_remove in names_to_remove:
148                del sys.modules[name_to_remove]
149        return fresh_module
150
151
152def get_attribute(obj, name):
153    """Get an attribute, raising SkipTest if AttributeError is raised."""
154    try:
155        attribute = getattr(obj, name)
156    except AttributeError:
157        raise unittest.SkipTest("module %s has no attribute %s" % (
158            obj.__name__, name))
159    else:
160        return attribute
161
162
163verbose = 1              # Flag set to 0 by regrtest.py
164use_resources = None     # Flag set to [] by regrtest.py
165max_memuse = 0           # Disable bigmem tests (they will still be run with
166                         # small sizes, to make sure they work.)
167real_max_memuse = 0
168
169# _original_stdout is meant to hold stdout at the time regrtest began.
170# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
171# The point is to have some flavor of stdout the user can actually see.
172_original_stdout = None
173def record_original_stdout(stdout):
174    global _original_stdout
175    _original_stdout = stdout
176
177def get_original_stdout():
178    return _original_stdout or sys.stdout
179
180def unload(name):
181    try:
182        del sys.modules[name]
183    except KeyError:
184        pass
185
186def _force_run(path, func, *args):
187    try:
188        return func(*args)
189    except EnvironmentError as err:
190        if verbose >= 2:
191            print('%s: %s' % (err.__class__.__name__, err))
192            print('re-run %s%r' % (func.__name__, args))
193        os.chmod(path, stat.S_IRWXU)
194        return func(*args)
195
196if sys.platform.startswith("win"):
197    def _waitfor(func, pathname, waitall=False):
198        # Perform the operation
199        func(pathname)
200        # Now setup the wait loop
201        if waitall:
202            dirname = pathname
203        else:
204            dirname, name = os.path.split(pathname)
205            dirname = dirname or '.'
206        # Check for `pathname` to be removed from the filesystem.
207        # The exponential backoff of the timeout amounts to a total
208        # of ~1 second after which the deletion is probably an error
209        # anyway.
210        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
211        # required when contention occurs.
212        timeout = 0.001
213        while timeout < 1.0:
214            # Note we are only testing for the existence of the file(s) in
215            # the contents of the directory regardless of any security or
216            # access rights.  If we have made it this far, we have sufficient
217            # permissions to do that much using Python's equivalent of the
218            # Windows API FindFirstFile.
219            # Other Windows APIs can fail or give incorrect results when
220            # dealing with files that are pending deletion.
221            L = os.listdir(dirname)
222            if not (L if waitall else name in L):
223                return
224            # Increase the timeout and try again
225            time.sleep(timeout)
226            timeout *= 2
227        warnings.warn('tests may fail, delete still pending for ' + pathname,
228                      RuntimeWarning, stacklevel=4)
229
230    def _unlink(filename):
231        _waitfor(os.unlink, filename)
232
233    def _rmdir(dirname):
234        _waitfor(os.rmdir, dirname)
235
236    def _rmtree(path):
237        def _rmtree_inner(path):
238            for name in _force_run(path, os.listdir, path):
239                fullname = os.path.join(path, name)
240                if os.path.isdir(fullname):
241                    _waitfor(_rmtree_inner, fullname, waitall=True)
242                    _force_run(fullname, os.rmdir, fullname)
243                else:
244                    _force_run(fullname, os.unlink, fullname)
245        _waitfor(_rmtree_inner, path, waitall=True)
246        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
247else:
248    _unlink = os.unlink
249    _rmdir = os.rmdir
250
251    def _rmtree(path):
252        try:
253            shutil.rmtree(path)
254            return
255        except EnvironmentError:
256            pass
257
258        def _rmtree_inner(path):
259            for name in _force_run(path, os.listdir, path):
260                fullname = os.path.join(path, name)
261                try:
262                    mode = os.lstat(fullname).st_mode
263                except EnvironmentError:
264                    mode = 0
265                if stat.S_ISDIR(mode):
266                    _rmtree_inner(fullname)
267                    _force_run(path, os.rmdir, fullname)
268                else:
269                    _force_run(path, os.unlink, fullname)
270        _rmtree_inner(path)
271        os.rmdir(path)
272
273def unlink(filename):
274    try:
275        _unlink(filename)
276    except OSError:
277        pass
278
279def rmdir(dirname):
280    try:
281        _rmdir(dirname)
282    except OSError as error:
283        # The directory need not exist.
284        if error.errno != errno.ENOENT:
285            raise
286
287def rmtree(path):
288    try:
289        _rmtree(path)
290    except OSError, e:
291        # Unix returns ENOENT, Windows returns ESRCH.
292        if e.errno not in (errno.ENOENT, errno.ESRCH):
293            raise
294
295def forget(modname):
296    '''"Forget" a module was ever imported by removing it from sys.modules and
297    deleting any .pyc and .pyo files.'''
298    unload(modname)
299    for dirname in sys.path:
300        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
301        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
302        # the chance exists that there is no .pyc (and thus the 'try' statement
303        # is exited) but there is a .pyo file.
304        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
305
306# Check whether a gui is actually available
307def _is_gui_available():
308    if hasattr(_is_gui_available, 'result'):
309        return _is_gui_available.result
310    reason = None
311    if sys.platform.startswith('win'):
312        # if Python is running as a service (such as the buildbot service),
313        # gui interaction may be disallowed
314        import ctypes
315        import ctypes.wintypes
316        UOI_FLAGS = 1
317        WSF_VISIBLE = 0x0001
318        class USEROBJECTFLAGS(ctypes.Structure):
319            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
320                        ("fReserved", ctypes.wintypes.BOOL),
321                        ("dwFlags", ctypes.wintypes.DWORD)]
322        dll = ctypes.windll.user32
323        h = dll.GetProcessWindowStation()
324        if not h:
325            raise ctypes.WinError()
326        uof = USEROBJECTFLAGS()
327        needed = ctypes.wintypes.DWORD()
328        res = dll.GetUserObjectInformationW(h,
329            UOI_FLAGS,
330            ctypes.byref(uof),
331            ctypes.sizeof(uof),
332            ctypes.byref(needed))
333        if not res:
334            raise ctypes.WinError()
335        if not bool(uof.dwFlags & WSF_VISIBLE):
336            reason = "gui not available (WSF_VISIBLE flag not set)"
337    elif sys.platform == 'darwin':
338        # The Aqua Tk implementations on OS X can abort the process if
339        # being called in an environment where a window server connection
340        # cannot be made, for instance when invoked by a buildbot or ssh
341        # process not running under the same user id as the current console
342        # user.  To avoid that, raise an exception if the window manager
343        # connection is not available.
344        from ctypes import cdll, c_int, pointer, Structure
345        from ctypes.util import find_library
346
347        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
348
349        if app_services.CGMainDisplayID() == 0:
350            reason = "gui tests cannot run without OS X window manager"
351        else:
352            class ProcessSerialNumber(Structure):
353                _fields_ = [("highLongOfPSN", c_int),
354                            ("lowLongOfPSN", c_int)]
355            psn = ProcessSerialNumber()
356            psn_p = pointer(psn)
357            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
358                  (app_services.SetFrontProcess(psn_p) < 0) ):
359                reason = "cannot run without OS X gui process"
360
361    # check on every platform whether tkinter can actually do anything
362    if not reason:
363        try:
364            from Tkinter import Tk
365            root = Tk()
366            root.withdraw()
367            root.update()
368            root.destroy()
369        except Exception as e:
370            err_string = str(e)
371            if len(err_string) > 50:
372                err_string = err_string[:50] + ' [...]'
373            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
374                                                           err_string)
375
376    _is_gui_available.reason = reason
377    _is_gui_available.result = not reason
378
379    return _is_gui_available.result
380
381def is_resource_enabled(resource):
382    """Test whether a resource is enabled.
383
384    Known resources are set by regrtest.py.  If not running under regrtest.py,
385    all resources are assumed enabled unless use_resources has been set.
386    """
387    return use_resources is None or resource in use_resources
388
389def requires(resource, msg=None):
390    """Raise ResourceDenied if the specified resource is not available."""
391    if not is_resource_enabled(resource):
392        if msg is None:
393            msg = "Use of the `%s' resource not enabled" % resource
394        raise ResourceDenied(msg)
395    if resource == 'gui' and not _is_gui_available():
396        raise ResourceDenied(_is_gui_available.reason)
397
398def requires_mac_ver(*min_version):
399    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
400    version if less than min_version.
401
402    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
403    is lesser than 10.5.
404    """
405    def decorator(func):
406        @functools.wraps(func)
407        def wrapper(*args, **kw):
408            if sys.platform == 'darwin':
409                version_txt = platform.mac_ver()[0]
410                try:
411                    version = tuple(map(int, version_txt.split('.')))
412                except ValueError:
413                    pass
414                else:
415                    if version < min_version:
416                        min_version_txt = '.'.join(map(str, min_version))
417                        raise unittest.SkipTest(
418                            "Mac OS X %s or higher required, not %s"
419                            % (min_version_txt, version_txt))
420            return func(*args, **kw)
421        wrapper.min_version = min_version
422        return wrapper
423    return decorator
424
425
426# Don't use "localhost", since resolving it uses the DNS under recent
427# Windows versions (see issue #18792).
428HOST = "127.0.0.1"
429HOSTv6 = "::1"
430
431
432def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
433    """Returns an unused port that should be suitable for binding.  This is
434    achieved by creating a temporary socket with the same family and type as
435    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
436    the specified host address (defaults to 0.0.0.0) with the port set to 0,
437    eliciting an unused ephemeral port from the OS.  The temporary socket is
438    then closed and deleted, and the ephemeral port is returned.
439
440    Either this method or bind_port() should be used for any tests where a
441    server socket needs to be bound to a particular port for the duration of
442    the test.  Which one to use depends on whether the calling code is creating
443    a python socket, or if an unused port needs to be provided in a constructor
444    or passed to an external program (i.e. the -accept argument to openssl's
445    s_server mode).  Always prefer bind_port() over find_unused_port() where
446    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
447    socket is bound to a hard coded port, the ability to run multiple instances
448    of the test simultaneously on the same host is compromised, which makes the
449    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
450    may simply manifest as a failed test, which can be recovered from without
451    intervention in most cases, but on Windows, the entire python process can
452    completely and utterly wedge, requiring someone to log in to the buildbot
453    and manually kill the affected process.
454
455    (This is easy to reproduce on Windows, unfortunately, and can be traced to
456    the SO_REUSEADDR socket option having different semantics on Windows versus
457    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
458    listen and then accept connections on identical host/ports.  An EADDRINUSE
459    socket.error will be raised at some point (depending on the platform and
460    the order bind and listen were called on each socket).
461
462    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
463    will ever be raised when attempting to bind two identical host/ports. When
464    accept() is called on each socket, the second caller's process will steal
465    the port from the first caller, leaving them both in an awkwardly wedged
466    state where they'll no longer respond to any signals or graceful kills, and
467    must be forcibly killed via OpenProcess()/TerminateProcess().
468
469    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
470    instead of SO_REUSEADDR, which effectively affords the same semantics as
471    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
472    Source world compared to Windows ones, this is a common mistake.  A quick
473    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
474    openssl.exe is called with the 's_server' option, for example. See
475    http://bugs.python.org/issue2550 for more info.  The following site also
476    has a very thorough description about the implications of both REUSEADDR
477    and EXCLUSIVEADDRUSE on Windows:
478    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
479
480    XXX: although this approach is a vast improvement on previous attempts to
481    elicit unused ports, it rests heavily on the assumption that the ephemeral
482    port returned to us by the OS won't immediately be dished back out to some
483    other process when we close and delete our temporary socket but before our
484    calling code has a chance to bind the returned port.  We can deal with this
485    issue if/when we come across it."""
486    tempsock = socket.socket(family, socktype)
487    port = bind_port(tempsock)
488    tempsock.close()
489    del tempsock
490    return port
491
492def bind_port(sock, host=HOST):
493    """Bind the socket to a free port and return the port number.  Relies on
494    ephemeral ports in order to ensure we are using an unbound port.  This is
495    important as many tests may be running simultaneously, especially in a
496    buildbot environment.  This method raises an exception if the sock.family
497    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
498    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
499    for TCP/IP sockets.  The only case for setting these options is testing
500    multicasting via multiple UDP sockets.
501
502    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
503    on Windows), it will be set on the socket.  This will prevent anyone else
504    from bind()'ing to our host/port for the duration of the test.
505    """
506    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
507        if hasattr(socket, 'SO_REUSEADDR'):
508            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
509                raise TestFailed("tests should never set the SO_REUSEADDR "   \
510                                 "socket option on TCP/IP sockets!")
511        if hasattr(socket, 'SO_REUSEPORT'):
512            try:
513                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
514                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
515                                     "socket option on TCP/IP sockets!")
516            except EnvironmentError:
517                # Python's socket module was compiled using modern headers
518                # thus defining SO_REUSEPORT but this process is running
519                # under an older kernel that does not support SO_REUSEPORT.
520                pass
521        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
522            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
523
524    sock.bind((host, 0))
525    port = sock.getsockname()[1]
526    return port
527
528def _is_ipv6_enabled():
529    """Check whether IPv6 is enabled on this host."""
530    if socket.has_ipv6:
531        sock = None
532        try:
533            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
534            sock.bind((HOSTv6, 0))
535            return True
536        except socket.error:
537            pass
538        finally:
539            if sock:
540                sock.close()
541    return False
542
543IPV6_ENABLED = _is_ipv6_enabled()
544
545def system_must_validate_cert(f):
546    """Skip the test on TLS certificate validation failures."""
547    @functools.wraps(f)
548    def dec(*args, **kwargs):
549        try:
550            f(*args, **kwargs)
551        except IOError as e:
552            if "CERTIFICATE_VERIFY_FAILED" in str(e):
553                raise unittest.SkipTest("system does not contain "
554                                        "necessary certificates")
555            raise
556    return dec
557
558FUZZ = 1e-6
559
560def fcmp(x, y): # fuzzy comparison function
561    if isinstance(x, float) or isinstance(y, float):
562        try:
563            fuzz = (abs(x) + abs(y)) * FUZZ
564            if abs(x-y) <= fuzz:
565                return 0
566        except:
567            pass
568    elif type(x) == type(y) and isinstance(x, (tuple, list)):
569        for i in range(min(len(x), len(y))):
570            outcome = fcmp(x[i], y[i])
571            if outcome != 0:
572                return outcome
573        return (len(x) > len(y)) - (len(x) < len(y))
574    return (x > y) - (x < y)
575
576
577# A constant likely larger than the underlying OS pipe buffer size, to
578# make writes blocking.
579# Windows limit seems to be around 512 B, and many Unix kernels have a
580# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
581# (see issue #17835 for a discussion of this number).
582PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
583
584# A constant likely larger than the underlying OS socket buffer size, to make
585# writes blocking.
586# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
587# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
588# for a discussion of this number).
589SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
590
591is_jython = sys.platform.startswith('java')
592
593try:
594    unicode
595    have_unicode = True
596except NameError:
597    have_unicode = False
598
599requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
600
601def u(s):
602    return unicode(s, 'unicode-escape')
603
604# FS_NONASCII: non-ASCII Unicode character encodable by
605# sys.getfilesystemencoding(), or None if there is no such character.
606FS_NONASCII = None
607if have_unicode:
608    for character in (
609        # First try printable and common characters to have a readable filename.
610        # For each character, the encoding list are just example of encodings able
611        # to encode the character (the list is not exhaustive).
612
613        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
614        unichr(0x00E6),
615        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
616        unichr(0x0130),
617        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
618        unichr(0x0141),
619        # U+03C6 (Greek Small Letter Phi): cp1253
620        unichr(0x03C6),
621        # U+041A (Cyrillic Capital Letter Ka): cp1251
622        unichr(0x041A),
623        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
624        unichr(0x05D0),
625        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
626        unichr(0x060C),
627        # U+062A (Arabic Letter Teh): cp720
628        unichr(0x062A),
629        # U+0E01 (Thai Character Ko Kai): cp874
630        unichr(0x0E01),
631
632        # Then try more "special" characters. "special" because they may be
633        # interpreted or displayed differently depending on the exact locale
634        # encoding and the font.
635
636        # U+00A0 (No-Break Space)
637        unichr(0x00A0),
638        # U+20AC (Euro Sign)
639        unichr(0x20AC),
640    ):
641        try:
642            character.encode(sys.getfilesystemencoding())\
643                     .decode(sys.getfilesystemencoding())
644        except UnicodeError:
645            pass
646        else:
647            FS_NONASCII = character
648            break
649
650# Filename used for testing
651if os.name == 'java':
652    # Jython disallows @ in module names
653    TESTFN = '$test'
654elif os.name == 'riscos':
655    TESTFN = 'testfile'
656else:
657    TESTFN = '@test'
658    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
659    if have_unicode:
660        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
661        # TESTFN_UNICODE is a filename that can be encoded using the
662        # file system encoding, but *not* with the default (ascii) encoding
663        if isinstance('', unicode):
664            # python -U
665            # XXX perhaps unicode() should accept Unicode strings?
666            TESTFN_UNICODE = "@test-\xe0\xf2"
667        else:
668            # 2 latin characters.
669            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
670        TESTFN_ENCODING = sys.getfilesystemencoding()
671        # TESTFN_UNENCODABLE is a filename that should *not* be
672        # able to be encoded by *either* the default or filesystem encoding.
673        # This test really only makes sense on Windows NT platforms
674        # which have special Unicode support in posixmodule.
675        if (not hasattr(sys, "getwindowsversion") or
676                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
677            TESTFN_UNENCODABLE = None
678        else:
679            # Japanese characters (I think - from bug 846133)
680            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
681            try:
682                # XXX - Note - should be using TESTFN_ENCODING here - but for
683                # Windows, "mbcs" currently always operates as if in
684                # errors=ignore' mode - hence we get '?' characters rather than
685                # the exception.  'Latin1' operates as we expect - ie, fails.
686                # See [ 850997 ] mbcs encoding ignores errors
687                TESTFN_UNENCODABLE.encode("Latin1")
688            except UnicodeEncodeError:
689                pass
690            else:
691                print \
692                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
693                'Unicode filename tests may not be effective' \
694                % TESTFN_UNENCODABLE
695
696
697# Disambiguate TESTFN for parallel testing, while letting it remain a valid
698# module name.
699TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
700
701# Save the initial cwd
702SAVEDCWD = os.getcwd()
703
704@contextlib.contextmanager
705def change_cwd(path, quiet=False):
706    """Return a context manager that changes the current working directory.
707
708    Arguments:
709
710      path: the directory to use as the temporary current working directory.
711
712      quiet: if False (the default), the context manager raises an exception
713        on error.  Otherwise, it issues only a warning and keeps the current
714        working directory the same.
715
716    """
717    saved_dir = os.getcwd()
718    try:
719        os.chdir(path)
720    except OSError:
721        if not quiet:
722            raise
723        warnings.warn('tests may fail, unable to change CWD to: ' + path,
724                      RuntimeWarning, stacklevel=3)
725    try:
726        yield os.getcwd()
727    finally:
728        os.chdir(saved_dir)
729
730
731@contextlib.contextmanager
732def temp_cwd(name='tempcwd', quiet=False):
733    """
734    Context manager that creates a temporary directory and set it as CWD.
735
736    The new CWD is created in the current directory and it's named *name*.
737    If *quiet* is False (default) and it's not possible to create or change
738    the CWD, an error is raised.  If it's True, only a warning is raised
739    and the original CWD is used.
740    """
741    if (have_unicode and isinstance(name, unicode) and
742        not os.path.supports_unicode_filenames):
743        try:
744            name = name.encode(sys.getfilesystemencoding() or 'ascii')
745        except UnicodeEncodeError:
746            if not quiet:
747                raise unittest.SkipTest('unable to encode the cwd name with '
748                                        'the filesystem encoding.')
749    saved_dir = os.getcwd()
750    is_temporary = False
751    try:
752        os.mkdir(name)
753        os.chdir(name)
754        is_temporary = True
755    except OSError:
756        if not quiet:
757            raise
758        warnings.warn('tests may fail, unable to change the CWD to ' + name,
759                      RuntimeWarning, stacklevel=3)
760    try:
761        yield os.getcwd()
762    finally:
763        os.chdir(saved_dir)
764        if is_temporary:
765            rmtree(name)
766
767
768def findfile(file, here=__file__, subdir=None):
769    """Try to find a file on sys.path and the working directory.  If it is not
770    found the argument passed to the function is returned (this does not
771    necessarily signal failure; could still be the legitimate path)."""
772    if os.path.isabs(file):
773        return file
774    if subdir is not None:
775        file = os.path.join(subdir, file)
776    path = sys.path
777    path = [os.path.dirname(here)] + path
778    for dn in path:
779        fn = os.path.join(dn, file)
780        if os.path.exists(fn): return fn
781    return file
782
783def sortdict(dict):
784    "Like repr(dict), but in sorted order."
785    items = dict.items()
786    items.sort()
787    reprpairs = ["%r: %r" % pair for pair in items]
788    withcommas = ", ".join(reprpairs)
789    return "{%s}" % withcommas
790
791def make_bad_fd():
792    """
793    Create an invalid file descriptor by opening and closing a file and return
794    its fd.
795    """
796    file = open(TESTFN, "wb")
797    try:
798        return file.fileno()
799    finally:
800        file.close()
801        unlink(TESTFN)
802
803def check_syntax_error(testcase, statement):
804    testcase.assertRaises(SyntaxError, compile, statement,
805                          '<test string>', 'exec')
806
807def open_urlresource(url, check=None):
808    import urlparse, urllib2
809
810    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
811
812    fn = os.path.join(os.path.dirname(__file__), "data", filename)
813
814    def check_valid_file(fn):
815        f = open(fn)
816        if check is None:
817            return f
818        elif check(f):
819            f.seek(0)
820            return f
821        f.close()
822
823    if os.path.exists(fn):
824        f = check_valid_file(fn)
825        if f is not None:
826            return f
827        unlink(fn)
828
829    # Verify the requirement before downloading the file
830    requires('urlfetch')
831
832    print >> get_original_stdout(), '\tfetching %s ...' % url
833    f = urllib2.urlopen(url, timeout=15)
834    try:
835        with open(fn, "wb") as out:
836            s = f.read()
837            while s:
838                out.write(s)
839                s = f.read()
840    finally:
841        f.close()
842
843    f = check_valid_file(fn)
844    if f is not None:
845        return f
846    raise TestFailed('invalid resource "%s"' % fn)
847
848
849class WarningsRecorder(object):
850    """Convenience wrapper for the warnings list returned on
851       entry to the warnings.catch_warnings() context manager.
852    """
853    def __init__(self, warnings_list):
854        self._warnings = warnings_list
855        self._last = 0
856
857    def __getattr__(self, attr):
858        if len(self._warnings) > self._last:
859            return getattr(self._warnings[-1], attr)
860        elif attr in warnings.WarningMessage._WARNING_DETAILS:
861            return None
862        raise AttributeError("%r has no attribute %r" % (self, attr))
863
864    @property
865    def warnings(self):
866        return self._warnings[self._last:]
867
868    def reset(self):
869        self._last = len(self._warnings)
870
871
872def _filterwarnings(filters, quiet=False):
873    """Catch the warnings, then check if all the expected
874    warnings have been raised and re-raise unexpected warnings.
875    If 'quiet' is True, only re-raise the unexpected warnings.
876    """
877    # Clear the warning registry of the calling module
878    # in order to re-raise the warnings.
879    frame = sys._getframe(2)
880    registry = frame.f_globals.get('__warningregistry__')
881    if registry:
882        registry.clear()
883    with warnings.catch_warnings(record=True) as w:
884        # Set filter "always" to record all warnings.  Because
885        # test_warnings swap the module, we need to look up in
886        # the sys.modules dictionary.
887        sys.modules['warnings'].simplefilter("always")
888        yield WarningsRecorder(w)
889    # Filter the recorded warnings
890    reraise = [warning.message for warning in w]
891    missing = []
892    for msg, cat in filters:
893        seen = False
894        for exc in reraise[:]:
895            message = str(exc)
896            # Filter out the matching messages
897            if (re.match(msg, message, re.I) and
898                issubclass(exc.__class__, cat)):
899                seen = True
900                reraise.remove(exc)
901        if not seen and not quiet:
902            # This filter caught nothing
903            missing.append((msg, cat.__name__))
904    if reraise:
905        raise AssertionError("unhandled warning %r" % reraise[0])
906    if missing:
907        raise AssertionError("filter (%r, %s) did not catch any warning" %
908                             missing[0])
909
910
911@contextlib.contextmanager
912def check_warnings(*filters, **kwargs):
913    """Context manager to silence warnings.
914
915    Accept 2-tuples as positional arguments:
916        ("message regexp", WarningCategory)
917
918    Optional argument:
919     - if 'quiet' is True, it does not fail if a filter catches nothing
920        (default True without argument,
921         default False if some filters are defined)
922
923    Without argument, it defaults to:
924        check_warnings(("", Warning), quiet=True)
925    """
926    quiet = kwargs.get('quiet')
927    if not filters:
928        filters = (("", Warning),)
929        # Preserve backward compatibility
930        if quiet is None:
931            quiet = True
932    return _filterwarnings(filters, quiet)
933
934
935@contextlib.contextmanager
936def check_py3k_warnings(*filters, **kwargs):
937    """Context manager to silence py3k warnings.
938
939    Accept 2-tuples as positional arguments:
940        ("message regexp", WarningCategory)
941
942    Optional argument:
943     - if 'quiet' is True, it does not fail if a filter catches nothing
944        (default False)
945
946    Without argument, it defaults to:
947        check_py3k_warnings(("", DeprecationWarning), quiet=False)
948    """
949    if sys.py3kwarning:
950        if not filters:
951            filters = (("", DeprecationWarning),)
952    else:
953        # It should not raise any py3k warning
954        filters = ()
955    return _filterwarnings(filters, kwargs.get('quiet'))
956
957
958class CleanImport(object):
959    """Context manager to force import to return a new module reference.
960
961    This is useful for testing module-level behaviours, such as
962    the emission of a DeprecationWarning on import.
963
964    Use like this:
965
966        with CleanImport("foo"):
967            importlib.import_module("foo") # new reference
968    """
969
970    def __init__(self, *module_names):
971        self.original_modules = sys.modules.copy()
972        for module_name in module_names:
973            if module_name in sys.modules:
974                module = sys.modules[module_name]
975                # It is possible that module_name is just an alias for
976                # another module (e.g. stub for modules renamed in 3.x).
977                # In that case, we also need delete the real module to clear
978                # the import cache.
979                if module.__name__ != module_name:
980                    del sys.modules[module.__name__]
981                del sys.modules[module_name]
982
983    def __enter__(self):
984        return self
985
986    def __exit__(self, *ignore_exc):
987        sys.modules.update(self.original_modules)
988
989
990class EnvironmentVarGuard(UserDict.DictMixin):
991
992    """Class to help protect the environment variable properly.  Can be used as
993    a context manager."""
994
995    def __init__(self):
996        self._environ = os.environ
997        self._changed = {}
998
999    def __getitem__(self, envvar):
1000        return self._environ[envvar]
1001
1002    def __setitem__(self, envvar, value):
1003        # Remember the initial value on the first access
1004        if envvar not in self._changed:
1005            self._changed[envvar] = self._environ.get(envvar)
1006        self._environ[envvar] = value
1007
1008    def __delitem__(self, envvar):
1009        # Remember the initial value on the first access
1010        if envvar not in self._changed:
1011            self._changed[envvar] = self._environ.get(envvar)
1012        if envvar in self._environ:
1013            del self._environ[envvar]
1014
1015    def keys(self):
1016        return self._environ.keys()
1017
1018    def set(self, envvar, value):
1019        self[envvar] = value
1020
1021    def unset(self, envvar):
1022        del self[envvar]
1023
1024    def __enter__(self):
1025        return self
1026
1027    def __exit__(self, *ignore_exc):
1028        for (k, v) in self._changed.items():
1029            if v is None:
1030                if k in self._environ:
1031                    del self._environ[k]
1032            else:
1033                self._environ[k] = v
1034        os.environ = self._environ
1035
1036
1037class DirsOnSysPath(object):
1038    """Context manager to temporarily add directories to sys.path.
1039
1040    This makes a copy of sys.path, appends any directories given
1041    as positional arguments, then reverts sys.path to the copied
1042    settings when the context ends.
1043
1044    Note that *all* sys.path modifications in the body of the
1045    context manager, including replacement of the object,
1046    will be reverted at the end of the block.
1047    """
1048
1049    def __init__(self, *paths):
1050        self.original_value = sys.path[:]
1051        self.original_object = sys.path
1052        sys.path.extend(paths)
1053
1054    def __enter__(self):
1055        return self
1056
1057    def __exit__(self, *ignore_exc):
1058        sys.path = self.original_object
1059        sys.path[:] = self.original_value
1060
1061
1062class TransientResource(object):
1063
1064    """Raise ResourceDenied if an exception is raised while the context manager
1065    is in effect that matches the specified exception and attributes."""
1066
1067    def __init__(self, exc, **kwargs):
1068        self.exc = exc
1069        self.attrs = kwargs
1070
1071    def __enter__(self):
1072        return self
1073
1074    def __exit__(self, type_=None, value=None, traceback=None):
1075        """If type_ is a subclass of self.exc and value has attributes matching
1076        self.attrs, raise ResourceDenied.  Otherwise let the exception
1077        propagate (if any)."""
1078        if type_ is not None and issubclass(self.exc, type_):
1079            for attr, attr_value in self.attrs.iteritems():
1080                if not hasattr(value, attr):
1081                    break
1082                if getattr(value, attr) != attr_value:
1083                    break
1084            else:
1085                raise ResourceDenied("an optional resource is not available")
1086
1087
1088@contextlib.contextmanager
1089def transient_internet(resource_name, timeout=30.0, errnos=()):
1090    """Return a context manager that raises ResourceDenied when various issues
1091    with the Internet connection manifest themselves as exceptions."""
1092    default_errnos = [
1093        ('ECONNREFUSED', 111),
1094        ('ECONNRESET', 104),
1095        ('EHOSTUNREACH', 113),
1096        ('ENETUNREACH', 101),
1097        ('ETIMEDOUT', 110),
1098    ]
1099    default_gai_errnos = [
1100        ('EAI_AGAIN', -3),
1101        ('EAI_FAIL', -4),
1102        ('EAI_NONAME', -2),
1103        ('EAI_NODATA', -5),
1104        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
1105        # implementation actually returns WSANO_DATA i.e. 11004.
1106        ('WSANO_DATA', 11004),
1107    ]
1108
1109    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
1110    captured_errnos = errnos
1111    gai_errnos = []
1112    if not captured_errnos:
1113        captured_errnos = [getattr(errno, name, num)
1114                           for (name, num) in default_errnos]
1115        gai_errnos = [getattr(socket, name, num)
1116                      for (name, num) in default_gai_errnos]
1117
1118    def filter_error(err):
1119        n = getattr(err, 'errno', None)
1120        if (isinstance(err, socket.timeout) or
1121            (isinstance(err, socket.gaierror) and n in gai_errnos) or
1122            n in captured_errnos):
1123            if not verbose:
1124                sys.stderr.write(denied.args[0] + "\n")
1125            raise denied
1126
1127    old_timeout = socket.getdefaulttimeout()
1128    try:
1129        if timeout is not None:
1130            socket.setdefaulttimeout(timeout)
1131        yield
1132    except IOError as err:
1133        # urllib can wrap original socket errors multiple times (!), we must
1134        # unwrap to get at the original error.
1135        while True:
1136            a = err.args
1137            if len(a) >= 1 and isinstance(a[0], IOError):
1138                err = a[0]
1139            # The error can also be wrapped as args[1]:
1140            #    except socket.error as msg:
1141            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
1142            elif len(a) >= 2 and isinstance(a[1], IOError):
1143                err = a[1]
1144            else:
1145                break
1146        filter_error(err)
1147        raise
1148    # XXX should we catch generic exceptions and look for their
1149    # __cause__ or __context__?
1150    finally:
1151        socket.setdefaulttimeout(old_timeout)
1152
1153
1154@contextlib.contextmanager
1155def captured_output(stream_name):
1156    """Return a context manager used by captured_stdout and captured_stdin
1157    that temporarily replaces the sys stream *stream_name* with a StringIO."""
1158    import StringIO
1159    orig_stdout = getattr(sys, stream_name)
1160    setattr(sys, stream_name, StringIO.StringIO())
1161    try:
1162        yield getattr(sys, stream_name)
1163    finally:
1164        setattr(sys, stream_name, orig_stdout)
1165
1166def captured_stdout():
1167    """Capture the output of sys.stdout:
1168
1169       with captured_stdout() as s:
1170           print "hello"
1171       self.assertEqual(s.getvalue(), "hello")
1172    """
1173    return captured_output("stdout")
1174
1175def captured_stderr():
1176    return captured_output("stderr")
1177
1178def captured_stdin():
1179    return captured_output("stdin")
1180
1181def gc_collect():
1182    """Force as many objects as possible to be collected.
1183
1184    In non-CPython implementations of Python, this is needed because timely
1185    deallocation is not guaranteed by the garbage collector.  (Even in CPython
1186    this can be the case in case of reference cycles.)  This means that __del__
1187    methods may be called later than expected and weakrefs may remain alive for
1188    longer than expected.  This function tries its best to force all garbage
1189    objects to disappear.
1190    """
1191    gc.collect()
1192    if is_jython:
1193        time.sleep(0.1)
1194    gc.collect()
1195    gc.collect()
1196
1197
1198_header = '2P'
1199if hasattr(sys, "gettotalrefcount"):
1200    _header = '2P' + _header
1201_vheader = _header + 'P'
1202
1203def calcobjsize(fmt):
1204    return struct.calcsize(_header + fmt + '0P')
1205
1206def calcvobjsize(fmt):
1207    return struct.calcsize(_vheader + fmt + '0P')
1208
1209
1210_TPFLAGS_HAVE_GC = 1<<14
1211_TPFLAGS_HEAPTYPE = 1<<9
1212
1213def check_sizeof(test, o, size):
1214    import _testcapi
1215    result = sys.getsizeof(o)
1216    # add GC header size
1217    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
1218        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
1219        size += _testcapi.SIZEOF_PYGC_HEAD
1220    msg = 'wrong size for %s: got %d, expected %d' \
1221            % (type(o), result, size)
1222    test.assertEqual(result, size, msg)
1223
1224
1225#=======================================================================
1226# Decorator for running a function in a different locale, correctly resetting
1227# it afterwards.
1228
1229def run_with_locale(catstr, *locales):
1230    def decorator(func):
1231        def inner(*args, **kwds):
1232            try:
1233                import locale
1234                category = getattr(locale, catstr)
1235                orig_locale = locale.setlocale(category)
1236            except AttributeError:
1237                # if the test author gives us an invalid category string
1238                raise
1239            except:
1240                # cannot retrieve original locale, so do nothing
1241                locale = orig_locale = None
1242            else:
1243                for loc in locales:
1244                    try:
1245                        locale.setlocale(category, loc)
1246                        break
1247                    except:
1248                        pass
1249
1250            # now run the function, resetting the locale on exceptions
1251            try:
1252                return func(*args, **kwds)
1253            finally:
1254                if locale and orig_locale:
1255                    locale.setlocale(category, orig_locale)
1256        inner.func_name = func.func_name
1257        inner.__doc__ = func.__doc__
1258        return inner
1259    return decorator
1260
1261#=======================================================================
1262# Decorator for running a function in a specific timezone, correctly
1263# resetting it afterwards.
1264
1265def run_with_tz(tz):
1266    def decorator(func):
1267        def inner(*args, **kwds):
1268            try:
1269                tzset = time.tzset
1270            except AttributeError:
1271                raise unittest.SkipTest("tzset required")
1272            if 'TZ' in os.environ:
1273                orig_tz = os.environ['TZ']
1274            else:
1275                orig_tz = None
1276            os.environ['TZ'] = tz
1277            tzset()
1278
1279            # now run the function, resetting the tz on exceptions
1280            try:
1281                return func(*args, **kwds)
1282            finally:
1283                if orig_tz is None:
1284                    del os.environ['TZ']
1285                else:
1286                    os.environ['TZ'] = orig_tz
1287                time.tzset()
1288
1289        inner.__name__ = func.__name__
1290        inner.__doc__ = func.__doc__
1291        return inner
1292    return decorator
1293
1294#=======================================================================
1295# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
1296
1297# Some handy shorthands. Note that these are used for byte-limits as well
1298# as size-limits, in the various bigmem tests
1299_1M = 1024*1024
1300_1G = 1024 * _1M
1301_2G = 2 * _1G
1302_4G = 4 * _1G
1303
1304MAX_Py_ssize_t = sys.maxsize
1305
1306def set_memlimit(limit):
1307    global max_memuse
1308    global real_max_memuse
1309    sizes = {
1310        'k': 1024,
1311        'm': _1M,
1312        'g': _1G,
1313        't': 1024*_1G,
1314    }
1315    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
1316                 re.IGNORECASE | re.VERBOSE)
1317    if m is None:
1318        raise ValueError('Invalid memory limit %r' % (limit,))
1319    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
1320    real_max_memuse = memlimit
1321    if memlimit > MAX_Py_ssize_t:
1322        memlimit = MAX_Py_ssize_t
1323    if memlimit < _2G - 1:
1324        raise ValueError('Memory limit %r too low to be useful' % (limit,))
1325    max_memuse = memlimit
1326
1327def bigmemtest(minsize, memuse, overhead=5*_1M):
1328    """Decorator for bigmem tests.
1329
1330    'minsize' is the minimum useful size for the test (in arbitrary,
1331    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
1332    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
1333    independent of the testsize, and defaults to 5Mb.
1334
1335    The decorator tries to guess a good value for 'size' and passes it to
1336    the decorated test function. If minsize * memuse is more than the
1337    allowed memory use (as defined by max_memuse), the test is skipped.
1338    Otherwise, minsize is adjusted upward to use up to max_memuse.
1339    """
1340    def decorator(f):
1341        def wrapper(self):
1342            if not max_memuse:
1343                # If max_memuse is 0 (the default),
1344                # we still want to run the tests with size set to a few kb,
1345                # to make sure they work. We still want to avoid using
1346                # too much memory, though, but we do that noisily.
1347                maxsize = 5147
1348                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
1349            else:
1350                maxsize = int((max_memuse - overhead) / memuse)
1351                if maxsize < minsize:
1352                    # Really ought to print 'test skipped' or something
1353                    if verbose:
1354                        sys.stderr.write("Skipping %s because of memory "
1355                                         "constraint\n" % (f.__name__,))
1356                    return
1357                # Try to keep some breathing room in memory use
1358                maxsize = max(maxsize - 50 * _1M, minsize)
1359            return f(self, maxsize)
1360        wrapper.minsize = minsize
1361        wrapper.memuse = memuse
1362        wrapper.overhead = overhead
1363        return wrapper
1364    return decorator
1365
1366def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
1367    def decorator(f):
1368        def wrapper(self):
1369            if not real_max_memuse:
1370                maxsize = 5147
1371            else:
1372                maxsize = size
1373
1374            if ((real_max_memuse or not dry_run)
1375                and real_max_memuse < maxsize * memuse):
1376                if verbose:
1377                    sys.stderr.write("Skipping %s because of memory "
1378                                     "constraint\n" % (f.__name__,))
1379                return
1380
1381            return f(self, maxsize)
1382        wrapper.size = size
1383        wrapper.memuse = memuse
1384        wrapper.overhead = overhead
1385        return wrapper
1386    return decorator
1387
1388def bigaddrspacetest(f):
1389    """Decorator for tests that fill the address space."""
1390    def wrapper(self):
1391        if max_memuse < MAX_Py_ssize_t:
1392            if verbose:
1393                sys.stderr.write("Skipping %s because of memory "
1394                                 "constraint\n" % (f.__name__,))
1395        else:
1396            return f(self)
1397    return wrapper
1398
1399#=======================================================================
1400# unittest integration.
1401
1402class BasicTestRunner:
1403    def run(self, test):
1404        result = unittest.TestResult()
1405        test(result)
1406        return result
1407
1408def _id(obj):
1409    return obj
1410
1411def requires_resource(resource):
1412    if resource == 'gui' and not _is_gui_available():
1413        return unittest.skip(_is_gui_available.reason)
1414    if is_resource_enabled(resource):
1415        return _id
1416    else:
1417        return unittest.skip("resource {0!r} is not enabled".format(resource))
1418
1419def cpython_only(test):
1420    """
1421    Decorator for tests only applicable on CPython.
1422    """
1423    return impl_detail(cpython=True)(test)
1424
1425def impl_detail(msg=None, **guards):
1426    if check_impl_detail(**guards):
1427        return _id
1428    if msg is None:
1429        guardnames, default = _parse_guards(guards)
1430        if default:
1431            msg = "implementation detail not available on {0}"
1432        else:
1433            msg = "implementation detail specific to {0}"
1434        guardnames = sorted(guardnames.keys())
1435        msg = msg.format(' or '.join(guardnames))
1436    return unittest.skip(msg)
1437
1438def _parse_guards(guards):
1439    # Returns a tuple ({platform_name: run_me}, default_value)
1440    if not guards:
1441        return ({'cpython': True}, False)
1442    is_true = guards.values()[0]
1443    assert guards.values() == [is_true] * len(guards)   # all True or all False
1444    return (guards, not is_true)
1445
1446# Use the following check to guard CPython's implementation-specific tests --
1447# or to run them only on the implementation(s) guarded by the arguments.
1448def check_impl_detail(**guards):
1449    """This function returns True or False depending on the host platform.
1450       Examples:
1451          if check_impl_detail():               # only on CPython (default)
1452          if check_impl_detail(jython=True):    # only on Jython
1453          if check_impl_detail(cpython=False):  # everywhere except on CPython
1454    """
1455    guards, default = _parse_guards(guards)
1456    return guards.get(platform.python_implementation().lower(), default)
1457
1458
1459
1460def _run_suite(suite):
1461    """Run tests from a unittest.TestSuite-derived class."""
1462    if verbose:
1463        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
1464    else:
1465        runner = BasicTestRunner()
1466
1467    result = runner.run(suite)
1468    if not result.wasSuccessful():
1469        if len(result.errors) == 1 and not result.failures:
1470            err = result.errors[0][1]
1471        elif len(result.failures) == 1 and not result.errors:
1472            err = result.failures[0][1]
1473        else:
1474            err = "multiple errors occurred"
1475            if not verbose:
1476                err += "; run in verbose mode for details"
1477        raise TestFailed(err)
1478
1479
1480def run_unittest(*classes):
1481    """Run tests from unittest.TestCase-derived classes."""
1482    valid_types = (unittest.TestSuite, unittest.TestCase)
1483    suite = unittest.TestSuite()
1484    for cls in classes:
1485        if isinstance(cls, str):
1486            if cls in sys.modules:
1487                suite.addTest(unittest.findTestCases(sys.modules[cls]))
1488            else:
1489                raise ValueError("str arguments must be keys in sys.modules")
1490        elif isinstance(cls, valid_types):
1491            suite.addTest(cls)
1492        else:
1493            suite.addTest(unittest.makeSuite(cls))
1494    _run_suite(suite)
1495
1496#=======================================================================
1497# Check for the presence of docstrings.
1498
1499HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
1500                   sys.platform == 'win32' or
1501                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
1502
1503requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
1504                                          "test requires docstrings")
1505
1506
1507#=======================================================================
1508# doctest driver.
1509
1510def run_doctest(module, verbosity=None):
1511    """Run doctest on the given module.  Return (#failures, #tests).
1512
1513    If optional argument verbosity is not specified (or is None), pass
1514    test_support's belief about verbosity on to doctest.  Else doctest's
1515    usual behavior is used (it searches sys.argv for -v).
1516    """
1517
1518    import doctest
1519
1520    if verbosity is None:
1521        verbosity = verbose
1522    else:
1523        verbosity = None
1524
1525    # Direct doctest output (normally just errors) to real stdout; doctest
1526    # output shouldn't be compared by regrtest.
1527    save_stdout = sys.stdout
1528    sys.stdout = get_original_stdout()
1529    try:
1530        f, t = doctest.testmod(module, verbose=verbosity)
1531        if f:
1532            raise TestFailed("%d of %d doctests failed" % (f, t))
1533    finally:
1534        sys.stdout = save_stdout
1535    if verbose:
1536        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
1537    return f, t
1538
1539#=======================================================================
1540# Threading support to prevent reporting refleaks when running regrtest.py -R
1541
1542# NOTE: we use thread._count() rather than threading.enumerate() (or the
1543# moral equivalent thereof) because a threading.Thread object is still alive
1544# until its __bootstrap() method has returned, even after it has been
1545# unregistered from the threading module.
1546# thread._count(), on the other hand, only gets decremented *after* the
1547# __bootstrap() method has returned, which gives us reliable reference counts
1548# at the end of a test run.
1549
1550def threading_setup():
1551    if thread:
1552        return thread._count(),
1553    else:
1554        return 1,
1555
1556def threading_cleanup(nb_threads):
1557    if not thread:
1558        return
1559
1560    _MAX_COUNT = 10
1561    for count in range(_MAX_COUNT):
1562        n = thread._count()
1563        if n == nb_threads:
1564            break
1565        time.sleep(0.1)
1566    # XXX print a warning in case of failure?
1567
1568def reap_threads(func):
1569    """Use this function when threads are being used.  This will
1570    ensure that the threads are cleaned up even when the test fails.
1571    If threading is unavailable this function does nothing.
1572    """
1573    if not thread:
1574        return func
1575
1576    @functools.wraps(func)
1577    def decorator(*args):
1578        key = threading_setup()
1579        try:
1580            return func(*args)
1581        finally:
1582            threading_cleanup(*key)
1583    return decorator
1584
1585def reap_children():
1586    """Use this function at the end of test_main() whenever sub-processes
1587    are started.  This will help ensure that no extra children (zombies)
1588    stick around to hog resources and create problems when looking
1589    for refleaks.
1590    """
1591
1592    # Reap all our dead child processes so we don't leave zombies around.
1593    # These hog resources and might be causing some of the buildbots to die.
1594    if hasattr(os, 'waitpid'):
1595        any_process = -1
1596        while True:
1597            try:
1598                # This will raise an exception on Windows.  That's ok.
1599                pid, status = os.waitpid(any_process, os.WNOHANG)
1600                if pid == 0:
1601                    break
1602            except:
1603                break
1604
1605@contextlib.contextmanager
1606def start_threads(threads, unlock=None):
1607    threads = list(threads)
1608    started = []
1609    try:
1610        try:
1611            for t in threads:
1612                t.start()
1613                started.append(t)
1614        except:
1615            if verbose:
1616                print("Can't start %d threads, only %d threads started" %
1617                      (len(threads), len(started)))
1618            raise
1619        yield
1620    finally:
1621        if unlock:
1622            unlock()
1623        endtime = starttime = time.time()
1624        for timeout in range(1, 16):
1625            endtime += 60
1626            for t in started:
1627                t.join(max(endtime - time.time(), 0.01))
1628            started = [t for t in started if t.isAlive()]
1629            if not started:
1630                break
1631            if verbose:
1632                print('Unable to join %d threads during a period of '
1633                      '%d minutes' % (len(started), timeout))
1634    started = [t for t in started if t.isAlive()]
1635    if started:
1636        raise AssertionError('Unable to join %d threads' % len(started))
1637
1638@contextlib.contextmanager
1639def swap_attr(obj, attr, new_val):
1640    """Temporary swap out an attribute with a new object.
1641
1642    Usage:
1643        with swap_attr(obj, "attr", 5):
1644            ...
1645
1646        This will set obj.attr to 5 for the duration of the with: block,
1647        restoring the old value at the end of the block. If `attr` doesn't
1648        exist on `obj`, it will be created and then deleted at the end of the
1649        block.
1650    """
1651    if hasattr(obj, attr):
1652        real_val = getattr(obj, attr)
1653        setattr(obj, attr, new_val)
1654        try:
1655            yield
1656        finally:
1657            setattr(obj, attr, real_val)
1658    else:
1659        setattr(obj, attr, new_val)
1660        try:
1661            yield
1662        finally:
1663            delattr(obj, attr)
1664
1665def py3k_bytes(b):
1666    """Emulate the py3k bytes() constructor.
1667
1668    NOTE: This is only a best effort function.
1669    """
1670    try:
1671        # memoryview?
1672        return b.tobytes()
1673    except AttributeError:
1674        try:
1675            # iterable of ints?
1676            return b"".join(chr(x) for x in b)
1677        except TypeError:
1678            return bytes(b)
1679
1680def args_from_interpreter_flags():
1681    """Return a list of command-line arguments reproducing the current
1682    settings in sys.flags."""
1683    import subprocess
1684    return subprocess._args_from_interpreter_flags()
1685
1686def strip_python_stderr(stderr):
1687    """Strip the stderr of a Python process from potential debug output
1688    emitted by the interpreter.
1689
1690    This will typically be run on the result of the communicate() method
1691    of a subprocess.Popen object.
1692    """
1693    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
1694    return stderr
1695
1696
1697def check_free_after_iterating(test, iter, cls, args=()):
1698    class A(cls):
1699        def __del__(self):
1700            done[0] = True
1701            try:
1702                next(it)
1703            except StopIteration:
1704                pass
1705
1706    done = [False]
1707    it = iter(A(*args))
1708    # Issue 26494: Shouldn't crash
1709    test.assertRaises(StopIteration, next, it)
1710    # The sequence should be deallocated just after the end of iterating
1711    gc_collect()
1712    test.assertTrue(done[0])
1713