• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import subprocess
10import py_compile
11import zipfile
12
13from importlib.util import source_from_cache
14from test.support import make_legacy_pyc, strip_python_stderr
15
16
17# Cached result of the expensive test performed in the function below.
18__cached_interp_requires_environment = None
19
20def interpreter_requires_environment():
21    """
22    Returns True if our sys.executable interpreter requires environment
23    variables in order to be able to run at all.
24
25    This is designed to be used with @unittest.skipIf() to annotate tests
26    that need to use an assert_python*() function to launch an isolated
27    mode (-I) or no environment mode (-E) sub-interpreter process.
28
29    A normal build & test does not run into this situation but it can happen
30    when trying to run the standard library test suite from an interpreter that
31    doesn't have an obvious home with Python's current home finding logic.
32
33    Setting PYTHONHOME is one way to get most of the testsuite to run in that
34    situation.  PYTHONPATH or PYTHONUSERSITE are other common environment
35    variables that might impact whether or not the interpreter can start.
36    """
37    global __cached_interp_requires_environment
38    if __cached_interp_requires_environment is None:
39        # If PYTHONHOME is set, assume that we need it
40        if 'PYTHONHOME' in os.environ:
41            __cached_interp_requires_environment = True
42            return True
43
44        # Try running an interpreter with -E to see if it works or not.
45        try:
46            subprocess.check_call([sys.executable, '-E',
47                                   '-c', 'import sys; sys.exit(0)'])
48        except subprocess.CalledProcessError:
49            __cached_interp_requires_environment = True
50        else:
51            __cached_interp_requires_environment = False
52
53    return __cached_interp_requires_environment
54
55
56class _PythonRunResult(collections.namedtuple("_PythonRunResult",
57                                          ("rc", "out", "err"))):
58    """Helper for reporting Python subprocess run results"""
59    def fail(self, cmd_line):
60        """Provide helpful details about failed subcommand runs"""
61        # Limit to 80 lines to ASCII characters
62        maxlen = 80 * 100
63        out, err = self.out, self.err
64        if len(out) > maxlen:
65            out = b'(... truncated stdout ...)' + out[-maxlen:]
66        if len(err) > maxlen:
67            err = b'(... truncated stderr ...)' + err[-maxlen:]
68        out = out.decode('ascii', 'replace').rstrip()
69        err = err.decode('ascii', 'replace').rstrip()
70        raise AssertionError("Process return code is %d\n"
71                             "command line: %r\n"
72                             "\n"
73                             "stdout:\n"
74                             "---\n"
75                             "%s\n"
76                             "---\n"
77                             "\n"
78                             "stderr:\n"
79                             "---\n"
80                             "%s\n"
81                             "---"
82                             % (self.rc, cmd_line,
83                                out,
84                                err))
85
86
87# Executing the interpreter in a subprocess
88def run_python_until_end(*args, **env_vars):
89    env_required = interpreter_requires_environment()
90    cwd = env_vars.pop('__cwd', None)
91    if '__isolated' in env_vars:
92        isolated = env_vars.pop('__isolated')
93    else:
94        isolated = not env_vars and not env_required
95    cmd_line = [sys.executable, '-X', 'faulthandler']
96    if isolated:
97        # isolated mode: ignore Python environment variables, ignore user
98        # site-packages, and don't add the current directory to sys.path
99        cmd_line.append('-I')
100    elif not env_vars and not env_required:
101        # ignore Python environment variables
102        cmd_line.append('-E')
103
104    # But a special flag that can be set to override -- in this case, the
105    # caller is responsible to pass the full environment.
106    if env_vars.pop('__cleanenv', None):
107        env = {}
108        if sys.platform == 'win32':
109            # Windows requires at least the SYSTEMROOT environment variable to
110            # start Python.
111            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
112
113        # Other interesting environment variables, not copied currently:
114        # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
115    else:
116        # Need to preserve the original environment, for in-place testing of
117        # shared library builds.
118        env = os.environ.copy()
119
120    # set TERM='' unless the TERM environment variable is passed explicitly
121    # see issues #11390 and #18300
122    if 'TERM' not in env_vars:
123        env['TERM'] = ''
124
125    env.update(env_vars)
126    cmd_line.extend(args)
127    proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
128                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
129                         env=env, cwd=cwd)
130    with proc:
131        try:
132            out, err = proc.communicate()
133        finally:
134            proc.kill()
135            subprocess._cleanup()
136    rc = proc.returncode
137    err = strip_python_stderr(err)
138    return _PythonRunResult(rc, out, err), cmd_line
139
140def _assert_python(expected_success, *args, **env_vars):
141    res, cmd_line = run_python_until_end(*args, **env_vars)
142    if (res.rc and expected_success) or (not res.rc and not expected_success):
143        res.fail(cmd_line)
144    return res
145
146def assert_python_ok(*args, **env_vars):
147    """
148    Assert that running the interpreter with `args` and optional environment
149    variables `env_vars` succeeds (rc == 0) and return a (return code, stdout,
150    stderr) tuple.
151
152    If the __cleanenv keyword is set, env_vars is used as a fresh environment.
153
154    Python is started in isolated mode (command line option -I),
155    except if the __isolated keyword is set to False.
156    """
157    return _assert_python(True, *args, **env_vars)
158
159def assert_python_failure(*args, **env_vars):
160    """
161    Assert that running the interpreter with `args` and optional environment
162    variables `env_vars` fails (rc != 0) and return a (return code, stdout,
163    stderr) tuple.
164
165    See assert_python_ok() for more options.
166    """
167    return _assert_python(False, *args, **env_vars)
168
169def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
170    """Run a Python subprocess with the given arguments.
171
172    kw is extra keyword args to pass to subprocess.Popen. Returns a Popen
173    object.
174    """
175    cmd_line = [sys.executable]
176    if not interpreter_requires_environment():
177        cmd_line.append('-E')
178    cmd_line.extend(args)
179    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
180    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
181    # that.  References:
182    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
183    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
184    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
185    env = kw.setdefault('env', dict(os.environ))
186    env['TERM'] = 'vt100'
187    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
188                            stdout=stdout, stderr=stderr,
189                            **kw)
190
191def kill_python(p):
192    """Run the given Popen process until completion and return stdout."""
193    p.stdin.close()
194    data = p.stdout.read()
195    p.stdout.close()
196    # try to cleanup the child so we don't appear to leak when running
197    # with regrtest -R.
198    p.wait()
199    subprocess._cleanup()
200    return data
201
202def make_script(script_dir, script_basename, source, omit_suffix=False):
203    script_filename = script_basename
204    if not omit_suffix:
205        script_filename += os.extsep + 'py'
206    script_name = os.path.join(script_dir, script_filename)
207    # The script should be encoded to UTF-8, the default string encoding
208    script_file = open(script_name, 'w', encoding='utf-8')
209    script_file.write(source)
210    script_file.close()
211    importlib.invalidate_caches()
212    return script_name
213
214def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
215    zip_filename = zip_basename+os.extsep+'zip'
216    zip_name = os.path.join(zip_dir, zip_filename)
217    zip_file = zipfile.ZipFile(zip_name, 'w')
218    if name_in_zip is None:
219        parts = script_name.split(os.sep)
220        if len(parts) >= 2 and parts[-2] == '__pycache__':
221            legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
222            name_in_zip = os.path.basename(legacy_pyc)
223            script_name = legacy_pyc
224        else:
225            name_in_zip = os.path.basename(script_name)
226    zip_file.write(script_name, name_in_zip)
227    zip_file.close()
228    #if test.support.verbose:
229    #    zip_file = zipfile.ZipFile(zip_name, 'r')
230    #    print 'Contents of %r:' % zip_name
231    #    zip_file.printdir()
232    #    zip_file.close()
233    return zip_name, os.path.join(zip_name, name_in_zip)
234
235def make_pkg(pkg_dir, init_source=''):
236    os.mkdir(pkg_dir)
237    make_script(pkg_dir, '__init__', init_source)
238
239def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
240                 source, depth=1, compiled=False):
241    unlink = []
242    init_name = make_script(zip_dir, '__init__', '')
243    unlink.append(init_name)
244    init_basename = os.path.basename(init_name)
245    script_name = make_script(zip_dir, script_basename, source)
246    unlink.append(script_name)
247    if compiled:
248        init_name = py_compile.compile(init_name, doraise=True)
249        script_name = py_compile.compile(script_name, doraise=True)
250        unlink.extend((init_name, script_name))
251    pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
252    script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
253    zip_filename = zip_basename+os.extsep+'zip'
254    zip_name = os.path.join(zip_dir, zip_filename)
255    zip_file = zipfile.ZipFile(zip_name, 'w')
256    for name in pkg_names:
257        init_name_in_zip = os.path.join(name, init_basename)
258        zip_file.write(init_name, init_name_in_zip)
259    zip_file.write(script_name, script_name_in_zip)
260    zip_file.close()
261    for name in unlink:
262        os.unlink(name)
263    #if test.support.verbose:
264    #    zip_file = zipfile.ZipFile(zip_name, 'r')
265    #    print 'Contents of %r:' % zip_name
266    #    zip_file.printdir()
267    #    zip_file.close()
268    return zip_name, os.path.join(zip_name, script_name_in_zip)
269