#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Runs tests for the given input files.

Tries its best to autodetect all tests based on path name without being *too*
aggressive.

In short, there's a small set of directories in which, if you make any change,
all of the tests in those directories get run. Additionally, if you change a
python file named foo.py, it'll run foo_test.py, foo_unittest.py, test_foo.py
or unittest_foo.py if any of those exist.

All tests are run in parallel.
"""

# NOTE: An alternative mentioned on the initial CL for this
# https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/1516414
# is pytest. It looks like that brings some complexity (and makes use outside
# of the chroot a bit more obnoxious?), but might be worth exploring if this
# starts to grow quite complex on its own.


import argparse
import collections
import multiprocessing.pool
import os
import shlex
import signal
import subprocess
import sys
from typing import Iterable, List, Optional, Tuple


# A test to run: |command| (an argv list) is executed with |directory| as its
# working directory.
TestSpec = collections.namedtuple("TestSpec", ["directory", "command"])

# List of python scripts that are not test with relative path to
# toolchain-utils.
non_test_py_files = {
    "debug_info_test/debug_info_test.py",
}

# File name patterns that mark a python file as a test.
_TEST_PREFIXES = ("test_", "unittest_")
_TEST_SUFFIXES = ("_test.py", "_unittest.py")


def _make_relative_to_toolchain_utils(toolchain_utils, path):
    """Cleans & makes a path relative to toolchain_utils.

    Args:
        toolchain_utils: the toolchain-utils directory.
        path: the path to clean up; relative paths are resolved against the
            current working directory.

    Returns:
        |path| expressed relative to |toolchain_utils|.

    Raises:
        ValueError: if that path isn't under toolchain_utils.
    """
    # abspath has the nice property that it removes any markers like './'.
    as_abs = os.path.abspath(path)
    result = os.path.relpath(as_abs, start=toolchain_utils)

    # relpath yields exactly '..' when |path| is the parent of toolchain_utils,
    # so a bare startswith('../') check is not sufficient on its own.
    if result == ".." or result.startswith("../"):
        raise ValueError(f"Non toolchain-utils directory found: {result}")
    return result


def _filter_python_tests(test_files, toolchain_utils):
    """Returns TestSpecs for all files that are real python tests.

    Files listed in |non_test_py_files| are reported on stdout and skipped.
    """
    python_tests = []
    for test_file in test_files:
        rel_path = _make_relative_to_toolchain_utils(toolchain_utils, test_file)
        if rel_path in non_test_py_files:
            print(f"## {rel_path} ... NON_TEST_PY_FILE")
        else:
            python_tests.append(_python_test_to_spec(test_file))
    return python_tests


def _gather_python_tests_in(rel_subdir, toolchain_utils):
    """Returns all files that appear to be Python tests in a given directory.

    Only direct children of the directory whose names end in `_test.py` or
    `_unittest.py` are considered.
    """
    subdir = os.path.join(toolchain_utils, rel_subdir)
    test_files = (
        os.path.join(subdir, file_name)
        for file_name in os.listdir(subdir)
        if file_name.endswith(_TEST_SUFFIXES)
    )
    return _filter_python_tests(test_files, toolchain_utils)


def _signal_process_group(pgid: int, sig: int) -> None:
    """Sends |sig| to process group |pgid|, tolerating an already-gone group."""
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        # Nothing is left to signal; failing here would only mask whatever
        # error triggered the cleanup in the first place.
        pass


def _run_test(test_spec: TestSpec, timeout: int) -> Tuple[Optional[int], str]:
    """Runs a test.

    Args:
        test_spec: the test to run.
        timeout: seconds to wait for the test before killing it.

    Returns:
        A tuple indicating the process' exit code, and the combined
        stdout+stderr of the process. If the exit code is None, the process
        timed out.
    """
    # Each subprocess gets its own session (and hence its own process group,
    # with pgid == pid), since many of these tests spawn subprocesses for a
    # variety of reasons. If these tests time out, we want to be able to clean
    # up all of the children swiftly.
    #
    # start_new_session is used rather than a preexec_fn calling os.setpgid,
    # because preexec_fn is documented as unsafe in the presence of threads,
    # and this function is run from a thread pool.
    #
    # errors="replace" keeps a test that prints non-UTF-8 bytes from crashing
    # the runner while its output is being decoded.
    p = subprocess.Popen(
        test_spec.command,
        cwd=test_spec.directory,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf-8",
        errors="replace",
        start_new_session=True,
    )

    child_pgid = p.pid
    try:
        out, _ = p.communicate(timeout=timeout)
        return p.returncode, out
    except BaseException as e:
        # Try to shut the processes down gracefully.
        _signal_process_group(child_pgid, signal.SIGINT)
        try:
            # 2 seconds is arbitrary, but given that these are unittests,
            # should be plenty of time for them to shut down.
            p.wait(timeout=2)
        except subprocess.TimeoutExpired:
            _signal_process_group(child_pgid, signal.SIGKILL)
        except BaseException:
            _signal_process_group(child_pgid, signal.SIGKILL)
            raise

        if isinstance(e, subprocess.TimeoutExpired):
            # We just signalled the entire process group. This should complete
            # ~immediately. If it doesn't, something is very wrong.
            out, _ = p.communicate(timeout=5)
            return (None, out)
        raise


def _python_test_to_spec(test_file):
    """Given a .py file, convert it to a TestSpec."""
    # Run tests in the directory they exist in, since some of them are sensitive
    # to that.
    test_directory = os.path.dirname(os.path.abspath(test_file))
    file_name = os.path.basename(test_file)

    if os.access(test_file, os.X_OK):
        command = ["./" + file_name]
    else:
        # Assume the user wanted py3.
        command = ["python3", file_name]

    return TestSpec(directory=test_directory, command=command)


def _autodetect_python_tests_for(test_file, toolchain_utils):
    """Given a test file, detect if there may be related tests.

    If |test_file| is itself named like a test, it is the only result.
    Otherwise, for foo.py, any existing foo_test.py, foo_unittest.py,
    test_foo.py or unittest_foo.py next to it is returned.
    """
    if not test_file.endswith(".py"):
        return []

    test_file_name = os.path.basename(test_file)
    test_file_is_a_test = test_file_name.startswith(
        _TEST_PREFIXES
    ) or test_file_name.endswith(_TEST_SUFFIXES)

    if test_file_is_a_test:
        test_files = [test_file]
    else:
        # Strip the trailing ".py".
        test_file_no_suffix = test_file[:-3]
        dir_name = os.path.dirname(test_file)
        candidates = [test_file_no_suffix + x for x in _TEST_SUFFIXES]
        candidates += [
            os.path.join(dir_name, x + test_file_name) for x in _TEST_PREFIXES
        ]
        test_files = [x for x in candidates if os.path.exists(x)]
    return _filter_python_tests(test_files, toolchain_utils)


def _run_test_scripts(pool, all_tests, timeout, show_successful_output=False):
    """Runs a list of TestSpecs. Returns whether all of them succeeded.

    All tests are submitted to |pool| up front; results are then reported in
    the order of |all_tests|. Output of failing tests is always printed; output
    of passing tests only if |show_successful_output| is set.
    """
    results = [
        pool.apply_async(_run_test, (test, timeout)) for test in all_tests
    ]

    failures = []
    for i, (test, future) in enumerate(zip(all_tests, results)):
        # Add a bit more spacing between outputs.
        if show_successful_output and i:
            print("\n")

        pretty_test = " ".join(shlex.quote(arg) for arg in test.command)
        pretty_directory = os.path.relpath(test.directory)
        if pretty_directory == ".":
            test_message = pretty_test
        else:
            test_message = f"{pretty_test} in {pretty_directory}/"

        # Flush to be sure that the user sees which test is running.
        print(f"## {test_message} ... ", end="", flush=True)

        exit_code, stdout = future.get()
        is_failure = exit_code != 0
        if is_failure:
            print("TIMEOUT" if exit_code is None else "FAIL")
            failures.append(test_message)
        else:
            print("PASS")

        if show_successful_output or is_failure:
            if stdout:
                print("-- Stdout:\n", stdout)
            else:
                print("-- No stdout was produced.")

    if failures:
        word = "tests" if len(failures) > 1 else "test"
        print(f"{len(failures)} {word} failed:")
        for failure in failures:
            print(f"\t{failure}")

    return not failures


def _compress_list(l):
    """Removes consecutive duplicate elements from |l|.

    >>> _compress_list([])
    []
    >>> _compress_list([1, 1])
    [1]
    >>> _compress_list([1, 2, 1])
    [1, 2, 1]
    """
    result = []
    for e in l:
        if result and result[-1] == e:
            continue
        result.append(e)
    return result


def _fix_python_path(toolchain_utils):
    """Prepends |toolchain_utils| to PYTHONPATH in this process' environment."""
    pypath = os.environ.get("PYTHONPATH", "")
    if pypath:
        pypath = os.pathsep + pypath
    os.environ["PYTHONPATH"] = toolchain_utils + pypath


def _find_forced_subdir_python_tests(test_paths, toolchain_utils):
    """Returns TestSpecs for directories whose tests must all be rerun.

    Args:
        test_paths: absolute paths of the modified files.
        toolchain_utils: the toolchain-utils directory.

    Raises:
        ValueError: if any of |test_paths| is not under |toolchain_utils|.
    """
    assert all(os.path.isabs(path) for path in test_paths)

    # Directories under toolchain_utils for which any change will cause all tests
    # in that directory to be rerun. Includes changes in subdirectories.
    all_dirs = {
        "crosperf",
        "cros_utils",
    }

    relative_paths = [
        _make_relative_to_toolchain_utils(toolchain_utils, path)
        for path in test_paths
    ]

    gather_test_dirs = set()

    for path in relative_paths:
        top_level_dir = path.split("/")[0]
        if top_level_dir in all_dirs:
            gather_test_dirs.add(top_level_dir)

    results = []
    for d in sorted(gather_test_dirs):
        results += _gather_python_tests_in(d, toolchain_utils)
    return results


def _find_go_tests(test_paths):
    """Returns TestSpecs for the go folders of the given files"""
    assert all(os.path.isabs(path) for path in test_paths)

    dirs_with_gofiles = {
        os.path.dirname(p) for p in test_paths if p.endswith(".go")
    }
    command = ["go", "test", "-vet=all"]
    # Note: We sort the directories to be deterministic.
    return [
        TestSpec(directory=d, command=command)
        for d in sorted(dirs_with_gofiles)
    ]


def main(argv):
    """Runs the tests related to the files named in |argv|.

    Returns:
        0 if no files were given or every test passed, 1 otherwise.
    """
    default_toolchain_utils = os.path.abspath(os.path.dirname(__file__))

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--show_all_output",
        action="store_true",
        help="show stdout of successful tests",
    )
    parser.add_argument(
        "--toolchain_utils",
        default=default_toolchain_utils,
        help="directory of toolchain-utils. Often auto-detected",
    )
    parser.add_argument(
        "file", nargs="*", help="a file that we should run tests for"
    )
    parser.add_argument(
        "--timeout",
        default=120,
        type=int,
        help="Time to allow a test to execute before timing it out, in "
        "seconds.",
    )
    args = parser.parse_args(argv)

    modified_files = [os.path.abspath(f) for f in args.file]
    show_all_output = args.show_all_output
    # This ends up in PYTHONPATH, and tests are run from their own directories,
    # so a relative path given on the command line must be made absolute.
    toolchain_utils = os.path.abspath(args.toolchain_utils)

    if not modified_files:
        print("No files given. Exit.")
        return 0

    _fix_python_path(toolchain_utils)

    tests_to_run = _find_forced_subdir_python_tests(
        modified_files, toolchain_utils
    )
    for f in modified_files:
        tests_to_run += _autodetect_python_tests_for(f, toolchain_utils)
    tests_to_run += _find_go_tests(modified_files)

    # TestSpecs have lists, so we can't use a set. We'd likely want to keep them
    # sorted for determinism anyway.
    tests_to_run.sort()
    tests_to_run = _compress_list(tests_to_run)

    with multiprocessing.pool.ThreadPool() as pool:
        success = _run_test_scripts(
            pool, tests_to_run, args.timeout, show_all_output
        )
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))