• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Common utility functions used by various script execution tests
2#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
3
4import collections
5import importlib
6import sys
7import os
8import os.path
9import subprocess
10import py_compile
11import zipfile
12
13from importlib.util import source_from_cache
14from test.support import make_legacy_pyc
15
16
# Memoized answer of interpreter_requires_environment(); None means the
# (potentially slow) probe has not been run yet.
__cached_interp_requires_environment = None


def interpreter_requires_environment():
    """
    Tell whether sys.executable depends on environment variables to start.

    Intended for use with @unittest.skipIf() on tests that launch a
    sub-interpreter through an assert_python*() helper in isolated mode (-I)
    or no-environment mode (-E).

    Ordinary builds never hit this case; it shows up when the test suite is
    run from an interpreter whose home cannot be located by Python's usual
    home finding logic.  PYTHONHOME is one way to make most of the test suite
    work there; PYTHONPATH and PYTHONUSERSITE are other variables that may
    decide whether the interpreter can start.

    The answer is computed once and then served from a module-level cache.
    """
    global __cached_interp_requires_environment

    # Fast path: the probe already ran during an earlier call.
    if __cached_interp_requires_environment is not None:
        return __cached_interp_requires_environment

    # A PYTHONHOME setting is taken as proof that the environment is needed.
    if 'PYTHONHOME' in os.environ:
        __cached_interp_requires_environment = True
        return True

    # Otherwise start a trivial child with -E and see whether it survives.
    probe = [sys.executable, '-E', '-c', 'import sys; sys.exit(0)']
    try:
        subprocess.check_call(probe)
    except subprocess.CalledProcessError:
        needs_environment = True
    else:
        needs_environment = False

    __cached_interp_requires_environment = needs_environment
    return needs_environment
54
55
56class _PythonRunResult(collections.namedtuple("_PythonRunResult",
57                                          ("rc", "out", "err"))):
58    """Helper for reporting Python subprocess run results"""
59    def fail(self, cmd_line):
60        """Provide helpful details about failed subcommand runs"""
61        # Limit to 80 lines to ASCII characters
62        maxlen = 80 * 100
63        out, err = self.out, self.err
64        if len(out) > maxlen:
65            out = b'(... truncated stdout ...)' + out[-maxlen:]
66        if len(err) > maxlen:
67            err = b'(... truncated stderr ...)' + err[-maxlen:]
68        out = out.decode('ascii', 'replace').rstrip()
69        err = err.decode('ascii', 'replace').rstrip()
70        raise AssertionError("Process return code is %d\n"
71                             "command line: %r\n"
72                             "\n"
73                             "stdout:\n"
74                             "---\n"
75                             "%s\n"
76                             "---\n"
77                             "\n"
78                             "stderr:\n"
79                             "---\n"
80                             "%s\n"
81                             "---"
82                             % (self.rc, cmd_line,
83                                out,
84                                err))
85
86
87# Executing the interpreter in a subprocess
def run_python_until_end(*args, **env_vars):
    """Run sys.executable with *args*, wait for it and collect its output.

    Keyword arguments are environment variables for the child, except for
    three control keys that are removed before the environment is built:

    __cwd       working directory of the child (default: None)
    __isolated  explicitly enable/disable the -I command line option
    __cleanenv  if true, start from an (almost) empty environment instead of
                a copy of os.environ

    Without an explicit __isolated, -I is used only when no keyword argument
    is left after removing __cwd and the interpreter does not need its
    environment.  When -I is not used, -E is added under that same condition.
    __cleanenv is still among the keywords when these checks run, so passing
    it disables both defaults.

    Returns a (_PythonRunResult, cmd_line) pair where cmd_line is the full
    argument list that was executed.
    """
    needs_env = interpreter_requires_environment()
    cwd = env_vars.pop('__cwd', None)
    try:
        isolated = env_vars.pop('__isolated')
    except KeyError:
        isolated = not env_vars and not needs_env

    cmd_line = [sys.executable, '-X', 'faulthandler']
    if isolated:
        # -I: ignore Python environment variables and the user
        # site-packages, and keep the current directory out of sys.path
        cmd_line.append('-I')
    elif not (env_vars or needs_env):
        # -E: ignore Python environment variables
        cmd_line.append('-E')

    # __cleanenv overrides the default of inheriting the environment; the
    # caller is then responsible for passing everything the child needs.
    if env_vars.pop('__cleanenv', None):
        env = {}
        if sys.platform == 'win32':
            # Python cannot start on Windows without SYSTEMROOT.
            env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
        # Not copied at the moment, although potentially interesting:
        # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
    else:
        # In-place testing of shared library builds relies on the original
        # environment being preserved.
        env = os.environ.copy()

    # Blank out TERM unless the caller supplies one (issues #11390, #18300).
    if 'TERM' not in env_vars:
        env['TERM'] = ''
    env.update(env_vars)

    cmd_line += args
    with subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                          env=env, cwd=cwd) as proc:
        try:
            out, err = proc.communicate()
        finally:
            # Runs whether or not communicate() raised.
            proc.kill()
            subprocess._cleanup()
    return _PythonRunResult(proc.returncode, out, err), cmd_line
138
def _assert_python(expected_success, /, *args, **env_vars):
    """Run Python via run_python_until_end() and check the outcome.

    A zero return code counts as success.  When the outcome disagrees with
    *expected_success*, the result's fail() method is called with the command
    line, which raises AssertionError.  Otherwise the result is returned.
    """
    res, cmd_line = run_python_until_end(*args, **env_vars)
    succeeded = not res.rc
    if succeeded != bool(expected_success):
        res.fail(cmd_line)
    return res
144
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, and assert that it succeeds (rc == 0).

    Returns a (return code, stdout, stderr) tuple; AssertionError is raised
    when the return code is non-zero.

    With the __cleanenv keyword set, env_vars is used as a fresh environment
    rather than being added to the current one.

    Python is started in isolated mode (command line option -I) when no
    environment variables are given and the interpreter can run without its
    environment; the __isolated keyword overrides that choice.
    """
    expect_success = True
    return _assert_python(expect_success, *args, **env_vars)
157
def assert_python_failure(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, and assert that it fails (rc != 0).

    Returns a (return code, stdout, stderr) tuple; AssertionError is raised
    when the return code is zero.

    The special keywords are the same as for assert_python_ok().
    """
    expect_success = False
    return _assert_python(expect_success, *args, **env_vars)
167
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
    """Run a Python subprocess with the given arguments.

    args are appended to the interpreter command line; -E is inserted before
    them unless the interpreter needs its environment to run.  stdout and
    stderr are passed to subprocess.Popen (by default stderr is merged into a
    piped stdout); stdin is always a pipe.

    kw is extra keyword args to pass to subprocess.Popen.  If it contains
    'env', that mapping is copied rather than modified; with no 'env' (or
    env=None) a copy of os.environ is used.  In every case the child sees
    TERM=vt100.

    Returns a Popen object.
    """
    cmd_line = [sys.executable]
    if not interpreter_requires_environment():
        cmd_line.append('-E')
    cmd_line.extend(args)
    # Under Fedora (?), GNU readline can output junk on stderr when initialized,
    # depending on the TERM setting.  Setting TERM=vt100 is supposed to disable
    # that.  References:
    # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
    # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
    # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
    base_env = kw.get('env')
    if base_env is None:
        base_env = os.environ
    # Work on a private copy so that neither the caller's mapping nor
    # os.environ itself is changed when TERM is forced.
    env = dict(base_env)
    env['TERM'] = 'vt100'
    kw['env'] = env
    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                            stdout=stdout, stderr=stderr,
                            **kw)
189
def kill_python(p):
    """Let the Popen process *p* finish and return everything it wrote to stdout.

    Closes the child's stdin, reads its stdout to the end, closes stdout
    and waits for the process to exit.
    """
    p.stdin.close()
    stdout_pipe = p.stdout
    captured = stdout_pipe.read()
    stdout_pipe.close()
    # Reap the child so that no leak is reported when running under
    # regrtest -R.
    p.wait()
    subprocess._cleanup()
    return captured
200
def make_script(script_dir, script_basename, source, omit_suffix=False):
    """Write *source* to a script file inside *script_dir*.

    The file is named *script_basename* plus a 'py' extension, or just
    *script_basename* when *omit_suffix* is true.  Returns the path of the
    file that was written.
    """
    if omit_suffix:
        script_filename = script_basename
    else:
        script_filename = script_basename + os.extsep + 'py'
    script_name = os.path.join(script_dir, script_filename)
    # The source text is stored as UTF-8.
    with open(script_name, 'w', encoding='utf-8') as stream:
        stream.write(source)
    # Drop the import system's cached directory state after adding a file.
    importlib.invalidate_caches()
    return script_name
211
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
    """Create <zip_dir>/<zip_basename>.zip containing the file *script_name*.

    *name_in_zip* is the archive member name.  When it is None the member is
    named after the basename of the script; if the script lives directly in
    a '__pycache__' directory, a legacy .pyc is created from it first (via
    make_legacy_pyc) and that file is archived instead.

    Returns (zip path, path of the member joined onto the zip path).
    """
    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        if name_in_zip is None:
            path_parts = script_name.split(os.sep)
            # True only when there is a parent component and it is the
            # bytecode cache directory.
            if path_parts[-2:-1] == ['__pycache__']:
                script_name = make_legacy_pyc(source_from_cache(script_name))
            name_in_zip = os.path.basename(script_name)
        archive.write(script_name, name_in_zip)
    return zip_name, os.path.join(zip_name, name_in_zip)
230
def make_pkg(pkg_dir, init_source=''):
    """Create the directory *pkg_dir* and an __init__ script holding *init_source* in it."""
    os.mkdir(pkg_dir)
    make_script(pkg_dir, '__init__', source=init_source)
234
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
                 source, depth=1, compiled=False):
    """Create <zip_dir>/<zip_basename>.zip holding a nested package and a script.

    The archive contains *depth* levels of packages, all named *pkg_name*
    (pkg, pkg/pkg, ...), each with an empty __init__ file, plus a script
    built from *script_basename* and *source* in the innermost package.
    With *compiled* true, the files are byte-compiled with py_compile and
    the compiled files are archived instead of the sources.

    The intermediate files written to *zip_dir* are deleted before
    returning.  Returns (zip path, path of the script member joined onto
    the zip path).
    """
    init_name = make_script(zip_dir, '__init__', '')
    # Taken before any compilation, so the __init__ members keep the source
    # file's name even when compiled content is archived.
    init_basename = os.path.basename(init_name)
    script_name = make_script(zip_dir, script_basename, source)
    to_remove = [init_name, script_name]
    if compiled:
        init_name = py_compile.compile(init_name, doraise=True)
        script_name = py_compile.compile(script_name, doraise=True)
        to_remove += [init_name, script_name]

    # pkg, pkg/pkg, ... down to the requested depth
    pkg_names = []
    for level in range(1, depth + 1):
        pkg_names.append(os.sep.join([pkg_name] * level))
    script_name_in_zip = os.path.join(pkg_names[-1],
                                      os.path.basename(script_name))

    zip_name = os.path.join(zip_dir, zip_basename + os.extsep + 'zip')
    with zipfile.ZipFile(zip_name, 'w') as archive:
        for pkg_path in pkg_names:
            archive.write(init_name, os.path.join(pkg_path, init_basename))
        archive.write(script_name, script_name_in_zip)

    for leftover in to_remove:
        os.unlink(leftover)
    return zip_name, os.path.join(zip_name, script_name_in_zip)
263