1# Common utility functions used by various script execution tests 2# e.g. test_cmd_line, test_cmd_line_script and test_runpy 3 4import collections 5import importlib 6import sys 7import os 8import os.path 9import subprocess 10import py_compile 11import zipfile 12 13from importlib.util import source_from_cache 14from test import support 15from test.support.import_helper import make_legacy_pyc 16 17 18# Cached result of the expensive test performed in the function below. 19__cached_interp_requires_environment = None 20 21 22def interpreter_requires_environment(): 23 """ 24 Returns True if our sys.executable interpreter requires environment 25 variables in order to be able to run at all. 26 27 This is designed to be used with @unittest.skipIf() to annotate tests 28 that need to use an assert_python*() function to launch an isolated 29 mode (-I) or no environment mode (-E) sub-interpreter process. 30 31 A normal build & test does not run into this situation but it can happen 32 when trying to run the standard library test suite from an interpreter that 33 doesn't have an obvious home with Python's current home finding logic. 34 35 Setting PYTHONHOME is one way to get most of the testsuite to run in that 36 situation. PYTHONPATH or PYTHONUSERSITE are other common environment 37 variables that might impact whether or not the interpreter can start. 38 """ 39 global __cached_interp_requires_environment 40 if __cached_interp_requires_environment is None: 41 # If PYTHONHOME is set, assume that we need it 42 if 'PYTHONHOME' in os.environ: 43 __cached_interp_requires_environment = True 44 return True 45 46 # Try running an interpreter with -E to see if it works or not. 47 try: 48 subprocess.check_call([sys.executable, '-E', 49 '-c', 'import sys; sys.exit(0)']) 50 except subprocess.CalledProcessError: 51 __cached_interp_requires_environment = True 52 else: 53 __cached_interp_requires_environment = False 54 55 return __cached_interp_requires_environment 56 57 58class _PythonRunResult(collections.namedtuple("_PythonRunResult", 59 ("rc", "out", "err"))): 60 """Helper for reporting Python subprocess run results""" 61 def fail(self, cmd_line): 62 """Provide helpful details about failed subcommand runs""" 63 # Limit to 80 lines to ASCII characters 64 maxlen = 80 * 100 65 out, err = self.out, self.err 66 if len(out) > maxlen: 67 out = b'(... truncated stdout ...)' + out[-maxlen:] 68 if len(err) > maxlen: 69 err = b'(... truncated stderr ...)' + err[-maxlen:] 70 out = out.decode('ascii', 'replace').rstrip() 71 err = err.decode('ascii', 'replace').rstrip() 72 raise AssertionError("Process return code is %d\n" 73 "command line: %r\n" 74 "\n" 75 "stdout:\n" 76 "---\n" 77 "%s\n" 78 "---\n" 79 "\n" 80 "stderr:\n" 81 "---\n" 82 "%s\n" 83 "---" 84 % (self.rc, cmd_line, 85 out, 86 err)) 87 88 89# Executing the interpreter in a subprocess 90def run_python_until_end(*args, **env_vars): 91 env_required = interpreter_requires_environment() 92 cwd = env_vars.pop('__cwd', None) 93 if '__isolated' in env_vars: 94 isolated = env_vars.pop('__isolated') 95 else: 96 isolated = not env_vars and not env_required 97 cmd_line = [sys.executable, '-X', 'faulthandler'] 98 if isolated: 99 # isolated mode: ignore Python environment variables, ignore user 100 # site-packages, and don't add the current directory to sys.path 101 cmd_line.append('-I') 102 elif not env_vars and not env_required: 103 # ignore Python environment variables 104 cmd_line.append('-E') 105 106 # But a special flag that can be set to override -- in this case, the 107 # caller is responsible to pass the full environment. 108 if env_vars.pop('__cleanenv', None): 109 env = {} 110 if sys.platform == 'win32': 111 # Windows requires at least the SYSTEMROOT environment variable to 112 # start Python. 113 env['SYSTEMROOT'] = os.environ['SYSTEMROOT'] 114 115 # Other interesting environment variables, not copied currently: 116 # COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP. 117 else: 118 # Need to preserve the original environment, for in-place testing of 119 # shared library builds. 120 env = os.environ.copy() 121 122 # set TERM='' unless the TERM environment variable is passed explicitly 123 # see issues #11390 and #18300 124 if 'TERM' not in env_vars: 125 env['TERM'] = '' 126 127 env.update(env_vars) 128 cmd_line.extend(args) 129 proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 130 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 131 env=env, cwd=cwd) 132 with proc: 133 try: 134 out, err = proc.communicate() 135 finally: 136 proc.kill() 137 subprocess._cleanup() 138 rc = proc.returncode 139 return _PythonRunResult(rc, out, err), cmd_line 140 141 142def _assert_python(expected_success, /, *args, **env_vars): 143 res, cmd_line = run_python_until_end(*args, **env_vars) 144 if (res.rc and expected_success) or (not res.rc and not expected_success): 145 res.fail(cmd_line) 146 return res 147 148 149def assert_python_ok(*args, **env_vars): 150 """ 151 Assert that running the interpreter with `args` and optional environment 152 variables `env_vars` succeeds (rc == 0) and return a (return code, stdout, 153 stderr) tuple. 154 155 If the __cleanenv keyword is set, env_vars is used as a fresh environment. 156 157 Python is started in isolated mode (command line option -I), 158 except if the __isolated keyword is set to False. 159 """ 160 return _assert_python(True, *args, **env_vars) 161 162 163def assert_python_failure(*args, **env_vars): 164 """ 165 Assert that running the interpreter with `args` and optional environment 166 variables `env_vars` fails (rc != 0) and return a (return code, stdout, 167 stderr) tuple. 168 169 See assert_python_ok() for more options. 170 """ 171 return _assert_python(False, *args, **env_vars) 172 173 174def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw): 175 """Run a Python subprocess with the given arguments. 176 177 kw is extra keyword args to pass to subprocess.Popen. Returns a Popen 178 object. 179 """ 180 cmd_line = [sys.executable] 181 if not interpreter_requires_environment(): 182 cmd_line.append('-E') 183 cmd_line.extend(args) 184 # Under Fedora (?), GNU readline can output junk on stderr when initialized, 185 # depending on the TERM setting. Setting TERM=vt100 is supposed to disable 186 # that. References: 187 # - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html 188 # - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import 189 # - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html 190 env = kw.setdefault('env', dict(os.environ)) 191 env['TERM'] = 'vt100' 192 return subprocess.Popen(cmd_line, stdin=subprocess.PIPE, 193 stdout=stdout, stderr=stderr, 194 **kw) 195 196 197def kill_python(p): 198 """Run the given Popen process until completion and return stdout.""" 199 p.stdin.close() 200 data = p.stdout.read() 201 p.stdout.close() 202 # try to cleanup the child so we don't appear to leak when running 203 # with regrtest -R. 204 p.wait() 205 subprocess._cleanup() 206 return data 207 208 209def make_script(script_dir, script_basename, source, omit_suffix=False): 210 script_filename = script_basename 211 if not omit_suffix: 212 script_filename += os.extsep + 'py' 213 script_name = os.path.join(script_dir, script_filename) 214 # The script should be encoded to UTF-8, the default string encoding 215 with open(script_name, 'w', encoding='utf-8') as script_file: 216 script_file.write(source) 217 importlib.invalidate_caches() 218 return script_name 219 220 221def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None): 222 zip_filename = zip_basename+os.extsep+'zip' 223 zip_name = os.path.join(zip_dir, zip_filename) 224 with zipfile.ZipFile(zip_name, 'w') as zip_file: 225 if name_in_zip is None: 226 parts = script_name.split(os.sep) 227 if len(parts) >= 2 and parts[-2] == '__pycache__': 228 legacy_pyc = make_legacy_pyc(source_from_cache(script_name)) 229 name_in_zip = os.path.basename(legacy_pyc) 230 script_name = legacy_pyc 231 else: 232 name_in_zip = os.path.basename(script_name) 233 zip_file.write(script_name, name_in_zip) 234 #if test.support.verbose: 235 # with zipfile.ZipFile(zip_name, 'r') as zip_file: 236 # print 'Contents of %r:' % zip_name 237 # zip_file.printdir() 238 return zip_name, os.path.join(zip_name, name_in_zip) 239 240 241def make_pkg(pkg_dir, init_source=''): 242 os.mkdir(pkg_dir) 243 make_script(pkg_dir, '__init__', init_source) 244 245 246def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename, 247 source, depth=1, compiled=False): 248 unlink = [] 249 init_name = make_script(zip_dir, '__init__', '') 250 unlink.append(init_name) 251 init_basename = os.path.basename(init_name) 252 script_name = make_script(zip_dir, script_basename, source) 253 unlink.append(script_name) 254 if compiled: 255 init_name = py_compile.compile(init_name, doraise=True) 256 script_name = py_compile.compile(script_name, doraise=True) 257 unlink.extend((init_name, script_name)) 258 pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)] 259 script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name)) 260 zip_filename = zip_basename+os.extsep+'zip' 261 zip_name = os.path.join(zip_dir, zip_filename) 262 with zipfile.ZipFile(zip_name, 'w') as zip_file: 263 for name in pkg_names: 264 init_name_in_zip = os.path.join(name, init_basename) 265 zip_file.write(init_name, init_name_in_zip) 266 zip_file.write(script_name, script_name_in_zip) 267 for name in unlink: 268 os.unlink(name) 269 #if test.support.verbose: 270 # with zipfile.ZipFile(zip_name, 'r') as zip_file: 271 # print 'Contents of %r:' % zip_name 272 # zip_file.printdir() 273 return zip_name, os.path.join(zip_name, script_name_in_zip) 274 275 276def run_test_script(script): 277 # use -u to try to get the full output if the test hangs or crash 278 if support.verbose: 279 def title(text): 280 return f"===== {text} ======" 281 282 name = f"script {os.path.basename(script)}" 283 print() 284 print(title(name), flush=True) 285 # In verbose mode, the child process inherit stdout and stdout, 286 # to see output in realtime and reduce the risk of losing output. 287 args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"] 288 proc = subprocess.run(args) 289 print(title(f"{name} completed: exit code {proc.returncode}"), 290 flush=True) 291 if proc.returncode: 292 raise AssertionError(f"{name} failed") 293 else: 294 assert_python_ok("-u", script, "-v") 295