• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import builtins
2import locale
3import logging
4import os
5import shutil
6import sys
7import sysconfig
8import warnings
9from test import support
10try:
11    import threading
12except ImportError:
13    threading = None
14try:
15    import _multiprocessing, multiprocessing.process
16except ImportError:
17    multiprocessing = None
18
19
# Unit tests are supposed to leave the execution environment unchanged
# once they complete.  But sometimes tests have bugs, especially when
# tests fail, and the changes to the environment go on to mess up other
# tests.  This can cause issues with buildbot stability, since tests
# are run in random order and so problems may appear to come and go.
# There are a few things we can save and restore to mitigate this, and
# the following context manager handles this task.
27
class saved_test_environment:
    """Save bits of the test environment and restore them at block exit.

        with saved_test_environment(testname, verbose, quiet):
            #stuff

    Unless quiet is True, a warning is printed to stderr if any of
    the saved items was changed by the test.  The attribute 'changed'
    is initially False, but is set to True if a change is detected.

    If verbose is more than 1, the before and after state of changed
    items is also printed.

    If pgo is true, the warning output is suppressed exactly as with
    quiet; changes are still detected, recorded in 'changed' and restored.
    """

    # Class-level default; __exit__ shadows it with an instance attribute
    # set to True as soon as one resource is found to differ.
    changed = False

    def __init__(self, testname, verbose=0, quiet=False, *, pgo=False):
        """Remember the test name (used in warnings) and reporting flags."""
        self.testname = testname
        self.verbose = verbose
        self.quiet = quiet
        self.pgo = pgo

    # To add things to save and restore, add a name XXX to the resources list
    # and add corresponding get_XXX/restore_XXX functions.  get_XXX should
    # return the value to be saved and compared against a second call to the
    # get function when test execution completes.  restore_XXX should accept
    # the saved value and restore the resource using it.  It will be called if
    # and only if a change in the value is detected.
    #
    # Note: XXX will have any '.' replaced with '_' characters when determining
    # the corresponding method names.
    #
    # Many getters below return an (id(obj), obj, copy) triple for a mutable
    # container: the id makes rebinding the name to a different object show
    # up in the comparison, the copy makes in-place mutation show up, and
    # the object reference lets the restore function put the original object
    # back and then reset its contents from the copy.

    resources = ('sys.argv', 'cwd', 'sys.stdin', 'sys.stdout', 'sys.stderr',
                 'os.environ', 'sys.path', 'sys.path_hooks', '__import__',
                 'warnings.filters', 'asyncore.socket_map',
                 'logging._handlers', 'logging._handlerList', 'sys.gettrace',
                 'sys.warnoptions',
                 # multiprocessing.process._cleanup() may release ref
                 # to a thread, so check processes first.
                 'multiprocessing.process._dangling', 'threading._dangling',
                 'sysconfig._CONFIG_VARS', 'sysconfig._INSTALL_SCHEMES',
                 'files', 'locale', 'warnings.showwarning',
                 'shutil_archive_formats', 'shutil_unpack_formats',
                )

    def get_sys_argv(self):
        return id(sys.argv), sys.argv, sys.argv[:]
    def restore_sys_argv(self, saved_argv):
        sys.argv = saved_argv[1]
        sys.argv[:] = saved_argv[2]

    def get_cwd(self):
        return os.getcwd()
    def restore_cwd(self, saved_cwd):
        os.chdir(saved_cwd)

    def get_sys_stdout(self):
        return sys.stdout
    def restore_sys_stdout(self, saved_stdout):
        sys.stdout = saved_stdout

    def get_sys_stderr(self):
        return sys.stderr
    def restore_sys_stderr(self, saved_stderr):
        sys.stderr = saved_stderr

    def get_sys_stdin(self):
        return sys.stdin
    def restore_sys_stdin(self, saved_stdin):
        sys.stdin = saved_stdin

    def get_os_environ(self):
        return id(os.environ), os.environ, dict(os.environ)
    def restore_os_environ(self, saved_environ):
        os.environ = saved_environ[1]
        os.environ.clear()
        os.environ.update(saved_environ[2])

    def get_sys_path(self):
        return id(sys.path), sys.path, sys.path[:]
    def restore_sys_path(self, saved_path):
        sys.path = saved_path[1]
        sys.path[:] = saved_path[2]

    def get_sys_path_hooks(self):
        return id(sys.path_hooks), sys.path_hooks, sys.path_hooks[:]
    def restore_sys_path_hooks(self, saved_hooks):
        sys.path_hooks = saved_hooks[1]
        sys.path_hooks[:] = saved_hooks[2]

    def get_sys_gettrace(self):
        return sys.gettrace()
    def restore_sys_gettrace(self, trace_fxn):
        sys.settrace(trace_fxn)

    def get___import__(self):
        return builtins.__import__
    def restore___import__(self, import_):
        builtins.__import__ = import_

    def get_warnings_filters(self):
        return id(warnings.filters), warnings.filters, warnings.filters[:]
    def restore_warnings_filters(self, saved_filters):
        warnings.filters = saved_filters[1]
        warnings.filters[:] = saved_filters[2]

    def get_asyncore_socket_map(self):
        # Look the module up in sys.modules instead of importing it, so
        # taking the snapshot never loads asyncore by itself.
        asyncore = sys.modules.get('asyncore')
        if asyncore is None:
            return {}
        # XXX Making a copy keeps objects alive until __exit__ gets called.
        return asyncore.socket_map.copy()
    def restore_asyncore_socket_map(self, saved_map):
        asyncore = sys.modules.get('asyncore')
        if asyncore is not None:
            asyncore.close_all(ignore_all=True)
            asyncore.socket_map.update(saved_map)

    def get_shutil_archive_formats(self):
        # we could call get_archives_formats() but that only returns the
        # registry keys; we want to check the values too (the functions that
        # are registered)
        #
        # NOTE: unlike most getters no id() is recorded here, so rebinding
        # shutil._ARCHIVE_FORMATS to a different but equal dict compares
        # equal and is not reported as a change.
        return shutil._ARCHIVE_FORMATS, shutil._ARCHIVE_FORMATS.copy()
    def restore_shutil_archive_formats(self, saved):
        shutil._ARCHIVE_FORMATS = saved[0]
        shutil._ARCHIVE_FORMATS.clear()
        shutil._ARCHIVE_FORMATS.update(saved[1])

    def get_shutil_unpack_formats(self):
        # Same (object, copy) snapshot shape as get_shutil_archive_formats.
        return shutil._UNPACK_FORMATS, shutil._UNPACK_FORMATS.copy()
    def restore_shutil_unpack_formats(self, saved):
        shutil._UNPACK_FORMATS = saved[0]
        shutil._UNPACK_FORMATS.clear()
        shutil._UNPACK_FORMATS.update(saved[1])

    def get_logging__handlers(self):
        # _handlers is a WeakValueDictionary
        return id(logging._handlers), logging._handlers, logging._handlers.copy()
    def restore_logging__handlers(self, saved_handlers):
        # Can't easily revert the logging state, so a change is only
        # detected (and reported by __exit__), never undone.
        pass

    def get_logging__handlerList(self):
        # _handlerList is a list of weakrefs to handlers
        return id(logging._handlerList), logging._handlerList, logging._handlerList[:]
    def restore_logging__handlerList(self, saved_handlerList):
        # Can't easily revert the logging state, so a change is only
        # detected (and reported by __exit__), never undone.
        pass

    def get_sys_warnoptions(self):
        return id(sys.warnoptions), sys.warnoptions, sys.warnoptions[:]
    def restore_sys_warnoptions(self, saved_options):
        sys.warnoptions = saved_options[1]
        sys.warnoptions[:] = saved_options[2]

    # Controlling dangling references to Thread objects can make it easier
    # to track reference leaks.
    def get_threading__dangling(self):
        # The module-level name is None when importing threading failed.
        if not threading:
            return None
        # This copies the weakrefs without making any strong reference
        return threading._dangling.copy()
    def restore_threading__dangling(self, saved):
        if not threading:
            return
        threading._dangling.clear()
        threading._dangling.update(saved)

    # Same for Process objects
    def get_multiprocessing_process__dangling(self):
        # The module-level name is None when the import failed.
        if not multiprocessing:
            return None
        # Unjoined process objects can survive after process exits
        multiprocessing.process._cleanup()
        # This copies the weakrefs without making any strong reference
        return multiprocessing.process._dangling.copy()
    def restore_multiprocessing_process__dangling(self, saved):
        if not multiprocessing:
            return
        multiprocessing.process._dangling.clear()
        multiprocessing.process._dangling.update(saved)

    def get_sysconfig__CONFIG_VARS(self):
        # make sure the dict is initialized
        sysconfig.get_config_var('prefix')
        return (id(sysconfig._CONFIG_VARS), sysconfig._CONFIG_VARS,
                dict(sysconfig._CONFIG_VARS))
    def restore_sysconfig__CONFIG_VARS(self, saved):
        sysconfig._CONFIG_VARS = saved[1]
        sysconfig._CONFIG_VARS.clear()
        sysconfig._CONFIG_VARS.update(saved[2])

    def get_sysconfig__INSTALL_SCHEMES(self):
        return (id(sysconfig._INSTALL_SCHEMES), sysconfig._INSTALL_SCHEMES,
                sysconfig._INSTALL_SCHEMES.copy())
    def restore_sysconfig__INSTALL_SCHEMES(self, saved):
        sysconfig._INSTALL_SCHEMES = saved[1]
        sysconfig._INSTALL_SCHEMES.clear()
        sysconfig._INSTALL_SCHEMES.update(saved[2])

    def get_files(self):
        # Sorted listing of the current directory; directories get a
        # trailing '/' so a file replaced by a directory (or vice versa)
        # shows up as a change.
        return sorted(fn + ('/' if os.path.isdir(fn) else '')
                      for fn in os.listdir())
    def restore_files(self, saved_value):
        # Only the scratch name support.TESTFN is cleaned up, and only when
        # it was not already present (as a file or directory) beforehand.
        fn = support.TESTFN
        if fn not in saved_value and (fn + '/') not in saved_value:
            if os.path.isfile(fn):
                support.unlink(fn)
            elif os.path.isdir(fn):
                support.rmtree(fn)

    # Every LC_* constant exposed by the locale module, computed once at
    # class creation time.
    _lc = [getattr(locale, lc) for lc in dir(locale)
           if lc.startswith('LC_')]
    def get_locale(self):
        pairings = []
        for lc in self._lc:
            try:
                # setlocale(category, None) queries without changing anything
                pairings.append((lc, locale.setlocale(lc, None)))
            except (TypeError, ValueError):
                # Categories that cannot be queried are left out of the
                # snapshot.
                continue
        return pairings
    def restore_locale(self, saved):
        for lc, setting in saved:
            locale.setlocale(lc, setting)

    def get_warnings_showwarning(self):
        return warnings.showwarning
    def restore_warnings_showwarning(self, fxn):
        warnings.showwarning = fxn

    def resource_info(self):
        """Yield (name, getter, restorer) for each entry of 'resources'.

        The bound get_XXX/restore_XXX methods are looked up by name, with
        every '.' in the resource name replaced by '_'.
        """
        for name in self.resources:
            method_suffix = name.replace('.', '_')
            get_name = 'get_' + method_suffix
            restore_name = 'restore_' + method_suffix
            yield name, getattr(self, get_name), getattr(self, restore_name)

    def __enter__(self):
        """Snapshot every resource into self.saved_values and return self."""
        self.saved_values = {name: get()
                             for name, get, _restore in self.resource_info()}
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Compare each resource with its snapshot and restore what changed.

        Sets 'changed' to True when any resource differs and, unless quiet
        or pgo is set, prints a warning to stderr for each such resource.
        Returns False, so an exception raised in the with block propagates.
        """
        saved_values = self.saved_values
        # Drop the instance's reference to the snapshot right away.
        del self.saved_values
        support.gc_collect()  # Some resources use weak references
        for name, get, restore in self.resource_info():
            current = get()
            original = saved_values.pop(name)
            # Check for changes to the resource's value
            if current != original:
                self.changed = True
                restore(original)
                if not self.quiet and not self.pgo:
                    print(f"Warning -- {name} was modified by {self.testname}",
                          file=sys.stderr, flush=True)
                    if self.verbose > 1:
                        print(f"  Before: {original}\n  After:  {current} ",
                              file=sys.stderr, flush=True)
        return False
286