• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import subprocess
10import py_compile
11
12from importlib.util import source_from_cache
13from test import support
14from test.support.import_helper import make_legacy_pyc
15
16
17# Cached result of the expensive test performed in the function below.
18__cached_interp_requires_environment = None
19
20
def interpreter_requires_environment():
    """
    Tell whether sys.executable can only start with environment variables set.

    Intended for use with @unittest.skipIf() on tests that launch a
    sub-interpreter in isolated mode (-I) or no-environment mode (-E)
    through one of the assert_python*() helpers.

    Regular builds never hit this case.  It shows up when the test suite is
    run from an interpreter whose home cannot be located by Python's usual
    home finding logic; PYTHONHOME (or PYTHONPATH / PYTHONUSERSITE) is then
    typically what lets the interpreter start at all.

    The answer is computed once and stored in the module-level
    __cached_interp_requires_environment variable; later calls return the
    stored value.
    """
    global __cached_interp_requires_environment
    if __cached_interp_requires_environment is not None:
        return __cached_interp_requires_environment

    if 'PYTHONHOME' in os.environ:
        # PYTHONHOME is set: assume the interpreter depends on it.
        needs_env = True
    elif not support.has_subprocess_support:
        # No way to probe with a child process: assume it is not needed.
        needs_env = False
    else:
        # Probe: does a trivial program run when the environment is ignored?
        probe_cmd = [sys.executable, '-E', '-c', 'import sys; sys.exit(0)']
        try:
            subprocess.check_call(probe_cmd)
        except subprocess.CalledProcessError:
            needs_env = True
        else:
            needs_env = False

    __cached_interp_requires_environment = needs_env
    return needs_env
59
60
class _PythonRunResult(collections.namedtuple("_PythonRunResult",
                                              ("rc", "out", "err"))):
    """Outcome of a Python subprocess run: return code, stdout and stderr."""

    def fail(self, cmd_line):
        """Raise AssertionError describing this run.

        The message contains the return code, the repr of *cmd_line* and
        the captured stdout/stderr.  Each stream is cut down to its last
        300 * 100 bytes (with a marker prepended), decoded as ASCII with
        replacement of undecodable bytes, and stripped of trailing
        whitespace.
        """
        # Roughly 300 lines of 100 ASCII characters each.
        limit = 300 * 100

        def keep_tail(data, marker):
            # Only the end of an oversized stream is kept.
            if len(data) > limit:
                return marker + data[-limit:]
            return data

        raw_out = keep_tail(self.out, b'(... truncated stdout ...)')
        raw_err = keep_tail(self.err, b'(... truncated stderr ...)')
        text_out = raw_out.decode('ascii', 'replace').rstrip()
        text_err = raw_err.decode('ascii', 'replace').rstrip()

        template = ("Process return code is %d\n"
                    "command line: %r\n"
                    "\n"
                    "stdout:\n"
                    "---\n"
                    "%s\n"
                    "---\n"
                    "\n"
                    "stderr:\n"
                    "---\n"
                    "%s\n"
                    "---")
        raise AssertionError(template % (self.rc, cmd_line,
                                         text_out, text_err))
90
91
92# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
    """Run sys.executable to completion; backend of assert_python_*.

    *args are appended to the interpreter command line.
    **env_vars are environment variables for the child process, except for
    these special keywords, which are consumed here:

    __run_using_command: list of command line arguments placed in front of
        the interpreter command line.  Handy for running another command
        that launches the interpreter through its own arguments, e.g.
        ["/bin/echo", "--"] would print the unquoted python command line
        instead of running it.
    __cwd: working directory handed to subprocess.Popen.
    __isolated: whether to pass -I.  When absent, -I is used only if no
        other keyword remains and the interpreter does not require its
        environment.
    __cleanenv: when true, start from an empty environment (plus
        SYSTEMROOT on Windows) instead of a copy of os.environ.

    Returns a (_PythonRunResult, cmd_line) pair.
    """
    env_required = interpreter_requires_environment()
    launcher_prefix = env_vars.pop('__run_using_command', None)
    cwd = env_vars.pop('__cwd', None)
    # '__cleanenv' is still in env_vars at this point, so passing it counts
    # as "environment variables were supplied" for the two checks below.
    try:
        isolated = env_vars.pop('__isolated')
    except KeyError:
        isolated = not env_vars and not env_required
    may_ignore_env = not env_vars and not env_required

    cmd_line = [sys.executable, '-X', 'faulthandler']
    if launcher_prefix:
        cmd_line = launcher_prefix + cmd_line
    if isolated:
        # -I: ignore Python environment variables and the user
        # site-packages, and keep the current directory out of sys.path.
        cmd_line.append('-I')
    elif may_ignore_env:
        # -E: ignore Python environment variables only.
        cmd_line.append('-E')

    if env_vars.pop('__cleanenv', None):
        # The caller takes responsibility for providing the full
        # environment.  Not copied currently: COMSPEC, HOME, PATH, TEMP,
        # TMPDIR, TMP.
        env = {}
        if sys.platform == 'win32':
            # Python cannot start on Windows without SYSTEMROOT.
            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
    else:
        # In-place testing of shared library builds needs the original
        # environment.
        env = os.environ.copy()

    # TERM is blanked unless given explicitly (issues #11390 and #18300).
    if 'TERM' not in env_vars:
        env['TERM'] = ''
    env.update(env_vars)

    cmd_line.extend(args)
    with subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          env=env, cwd=cwd) as proc:
        try:
            out, err = proc.communicate()
        finally:
            proc.kill()
            subprocess._cleanup()
    return _PythonRunResult(proc.returncode, out, err), cmd_line
159
160
@support.requires_subprocess()
def _assert_python(expected_success, /, *args, **env_vars):
    """Run the interpreter and check its exit status against expectation.

    A zero return code counts as success.  When the outcome does not match
    *expected_success*, the result's fail() method is called, which raises
    AssertionError.  Otherwise the run result is returned.
    """
    res, cmd_line = run_python_until_end(*args, **env_vars)
    exited_nonzero = bool(res.rc)
    # A mismatch is "non-zero exit while success was expected" or
    # "zero exit while failure was expected".
    if exited_nonzero == bool(expected_success):
        res.fail(cmd_line)
    return res
167
168
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, assert that it succeeds (rc == 0), and return a
    (return code, stdout, stderr) tuple.

    With the __cleanenv keyword set, env_vars becomes a fresh environment.

    Python is started in isolated mode (command line option -I),
    except if the __isolated keyword is set to False.
    """
    outcome = _assert_python(True, *args, **env_vars)
    return outcome
181
182
def assert_python_failure(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, assert that it fails (rc != 0), and return a
    (return code, stdout, stderr) tuple.

    The extra keywords accepted are those of assert_python_ok().
    """
    outcome = _assert_python(False, *args, **env_vars)
    return outcome
192
193
@support.requires_subprocess()
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
    """Run a Python subprocess with the given arguments.

    *args are appended to the interpreter command line; -E is inserted
    first unless the interpreter requires its environment.  stdin is always
    a pipe; stdout and stderr default to a pipe and to stdout respectively.

    kw is extra keyword args to pass to subprocess.Popen.  If it contains
    'env', a copy of that mapping is used (the caller's mapping is left
    untouched); if 'env' is missing or None, a copy of os.environ is used.
    In every case TERM is set to 'vt100' in the child's environment.

    Returns a Popen object.
    """
    cmd_line = [sys.executable]
    if not interpreter_requires_environment():
        cmd_line.append('-E')
    cmd_line.extend(args)
    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
    # that.  References:
    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
    base_env = kw.get('env')
    if base_env is None:
        # Popen treats env=None as "inherit", so start from os.environ.
        base_env = os.environ
    # Work on a private copy: overriding TERM must not leak into the
    # caller's mapping (which could be os.environ itself).
    child_env = dict(base_env)
    child_env['TERM'] = 'vt100'
    kw['env'] = child_env
    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                            stdout=stdout, stderr=stderr,
                            **kw)
216
217
def kill_python(p):
    """Let the Popen process *p* finish and return everything on its stdout.

    stdin is closed first, then stdout is read to the end and closed, and
    finally the process is waited for.
    """
    child_stdin, child_stdout = p.stdin, p.stdout
    child_stdin.close()
    output = child_stdout.read()
    child_stdout.close()
    # Reap the child so that regrtest -R does not report it as a leak.
    p.wait()
    subprocess._cleanup()
    return output
228
229
def make_script(script_dir, script_basename, source, omit_suffix=False):
    """Write *source* to a script file in *script_dir* and return its path.

    The file is named *script_basename* with a 'py' extension appended,
    unless *omit_suffix* is true.  A str *source* is written as UTF-8 text;
    anything else is written in binary mode.  Import caches are invalidated
    afterwards.
    """
    suffix = '' if omit_suffix else os.extsep + 'py'
    script_name = os.path.join(script_dir, script_basename + suffix)
    if isinstance(source, str):
        # The script should be encoded to UTF-8, the default string encoding
        open_options = {'mode': 'w', 'encoding': 'utf-8'}
    else:
        open_options = {'mode': 'wb'}
    with open(script_name, **open_options) as script_file:
        script_file.write(source)
    importlib.invalidate_caches()
    return script_name
244
245
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
    """Create a zip archive in *zip_dir* holding the file *script_name*.

    The archive is named *zip_basename* plus a 'zip' extension.  When
    *name_in_zip* is None the member is named after the basename of the
    script; if the script lives directly in a '__pycache__' directory, a
    legacy pyc is created from it (via make_legacy_pyc) and stored instead.

    Returns (path of the archive, path of the member inside the archive).
    """
    import zipfile
    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        if name_in_zip is None:
            path_parts = script_name.split(os.sep)
            in_pycache = (len(path_parts) >= 2
                          and path_parts[-2] == '__pycache__')
            if in_pycache:
                # Store the legacy pyc rather than the __pycache__ file.
                script_name = make_legacy_pyc(source_from_cache(script_name))
            name_in_zip = os.path.basename(script_name)
        archive.write(script_name, name_in_zip)
    return zip_name, os.path.join(zip_name, name_in_zip)
265
266
def make_pkg(pkg_dir, init_source=''):
    """Create directory *pkg_dir* and write *init_source* to its __init__ script."""
    os.mkdir(pkg_dir)
    make_script(script_dir=pkg_dir, script_basename='__init__',
                source=init_source)
270
271
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                 source, depth=1, compiled=False):
    """Create a zip archive in *zip_dir* containing a nested package.

    The package *pkg_name* is nested inside itself *depth* times; every
    level receives an empty __init__ file and the innermost level also
    receives a script named *script_basename* containing *source*.  With
    *compiled* true, the files stored are the py_compile outputs instead of
    the sources.  The temporary files written to *zip_dir* are removed
    before returning.

    Returns (path of the archive, path of the script inside the archive).
    """
    import zipfile
    init_name = make_script(zip_dir, '__init__', '')
    # NOTE(review): the member name for __init__ is captured before any
    # compilation, so with compiled=True the compiled file is stored under
    # the source file's name -- confirm this is intended.
    init_basename = os.path.basename(init_name)
    script_name = make_script(zip_dir, script_basename, source)
    temporary_files = [init_name, script_name]
    if compiled:
        init_name = py_compile.compile(init_name, doraise=True)
        script_name = py_compile.compile(script_name, doraise=True)
        temporary_files += [init_name, script_name]

    # 'pkg', 'pkg/pkg', ... one entry per nesting level.
    pkg_names = []
    for level in range(1, depth + 1):
        pkg_names.append(os.sep.join([pkg_name] * level))
    innermost = pkg_names[-1]
    script_name_in_zip = os.path.join(innermost,
                                      os.path.basename(script_name))

    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for pkg_path in pkg_names:
            archive.write(init_name, os.path.join(pkg_path, init_basename))
        archive.write(script_name, script_name_in_zip)

    for path in temporary_files:
        os.unlink(path)
    return zip_name, os.path.join(zip_name, script_name_in_zip)
301
302
@support.requires_subprocess()
def run_test_script(script):
    """Run *script* with "-v" in a child interpreter and fail if it fails.

    When support.verbose is false the script is run through
    assert_python_ok().  When it is true, the child is started with
    subprocess.run() without redirection, banners are printed around the
    run, and AssertionError is raised on a non-zero exit code.
    """
    # -u is used to try to get the full output if the test hangs or crashes.
    if not support.verbose:
        assert_python_ok("-u", script, "-v")
        return

    label = f"script {os.path.basename(script)}"

    def banner(text):
        return f"===== {text} ======"

    print()
    print(banner(label), flush=True)
    # In verbose mode the child inherits stdout and stderr, so its output
    # is visible in real time and is less likely to be lost.
    command = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
    proc = subprocess.run(command)
    print(banner(f"{label} completed: exit code {proc.returncode}"),
          flush=True)
    if proc.returncode:
        raise AssertionError(f"{label} failed")
323