#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2019 The ChromiumOS Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Runs tests for the given input files.

Tries its best to autodetect all tests based on path name without being *too*
aggressive.

In short, there's a small set of directories in which, if you make any change,
all of the tests in those directories get run. Additionally, if you change a
Python file named foo, it'll run foo_test.py or foo_unittest.py if either of
those exists.

All tests are run in parallel.
"""

# NOTE: An alternative mentioned on the initial CL for this
# https://chromium-review.googlesource.com/c/chromiumos/third_party/toolchain-utils/+/1516414
# is pytest. It looks like that brings some complexity (and makes use outside
# of the chroot a bit more obnoxious?), but might be worth exploring if this
# starts to grow quite complex on its own.


import argparse
import collections
import multiprocessing.pool
import os
import pipes
import signal
import subprocess
import sys
from typing import Optional, Tuple


TestSpec = collections.namedtuple("TestSpec", ["directory", "command"])

# List of Python scripts that are not tests, with paths relative to
# toolchain-utils.
non_test_py_files = {
    "debug_info_test/debug_info_test.py",
}


def _make_relative_to_toolchain_utils(toolchain_utils, path):
49    """Cleans & makes a path relative to toolchain_utils.
50
51    Raises if that path isn't under toolchain_utils.
52    """
53    # abspath has the nice property that it removes any markers like './'.
54    as_abs = os.path.abspath(path)
55    result = os.path.relpath(as_abs, start=toolchain_utils)
56
57    if result.startswith("../"):
58        raise ValueError("Non toolchain-utils directory found: %s" % result)
59    return result
60
61
62def _filter_python_tests(test_files, toolchain_utils):
63    """Returns all files that are real python tests."""
64    python_tests = []
65    for test_file in test_files:
66        rel_path = _make_relative_to_toolchain_utils(toolchain_utils, test_file)
67        if rel_path not in non_test_py_files:
68            python_tests.append(_python_test_to_spec(test_file))
69        else:
70            print("## %s ... NON_TEST_PY_FILE" % rel_path)
71    return python_tests
72
73
74def _gather_python_tests_in(rel_subdir, toolchain_utils):
75    """Returns all files that appear to be Python tests in a given directory."""
76    subdir = os.path.join(toolchain_utils, rel_subdir)
77    test_files = (
78        os.path.join(subdir, file_name)
79        for file_name in os.listdir(subdir)
80        if file_name.endswith("_test.py") or file_name.endswith("_unittest.py")
81    )
82    return _filter_python_tests(test_files, toolchain_utils)
83
84
85def _run_test(test_spec: TestSpec, timeout: int) -> Tuple[Optional[int], str]:
86    """Runs a test.
87
88    Returns a tuple indicating the process' exit code, and the combined
89    stdout+stderr of the process. If the exit code is None, the process timed
90    out.
91    """
92    # Each subprocess gets its own process group, since many of these tests
93    # spawn subprocesses for a variety of reasons. If these tests time out, we
94    # want to be able to clean up all of the children swiftly.
95    p = subprocess.Popen(
96        test_spec.command,
97        cwd=test_spec.directory,
98        stdin=subprocess.DEVNULL,
99        stdout=subprocess.PIPE,
100        stderr=subprocess.STDOUT,
101        encoding="utf-8",
102        preexec_fn=lambda: os.setpgid(0, 0),
103    )
104
105    child_pgid = p.pid
106    try:
107        out, _ = p.communicate(timeout=timeout)
108        return p.returncode, out
109    except BaseException as e:
110        # Try to shut the processes down gracefully.
111        os.killpg(child_pgid, signal.SIGINT)
112        try:
113            # 2 seconds is arbitrary, but given that these are unittests,
114            # should be plenty of time for them to shut down.
115            p.wait(timeout=2)
116        except subprocess.TimeoutExpired:
117            os.killpg(child_pgid, signal.SIGKILL)
118        except:
119            os.killpg(child_pgid, signal.SIGKILL)
120            raise
121
122        if isinstance(e, subprocess.TimeoutExpired):
123            # We just killed the entire process group. This should complete
124            # ~immediately. If it doesn't, something is very wrong.
125            out, _ = p.communicate(timeout=5)
126            return (None, out)
127        raise
128
129
130def _python_test_to_spec(test_file):
131    """Given a .py file, convert it to a TestSpec."""
132    # Run tests in the directory they exist in, since some of them are sensitive
133    # to that.
134    test_directory = os.path.dirname(os.path.abspath(test_file))
135    file_name = os.path.basename(test_file)
136
137    if os.access(test_file, os.X_OK):
138        command = ["./" + file_name]
139    else:
140        # Assume the user wanted py3.
141        command = ["python3", file_name]
142
143    return TestSpec(directory=test_directory, command=command)
144
145
146def _autodetect_python_tests_for(test_file, toolchain_utils):
147    """Given a test file, detect if there may be related tests."""
148    if not test_file.endswith(".py"):
149        return []
150
151    test_prefixes = ("test_", "unittest_")
152    test_suffixes = ("_test.py", "_unittest.py")
153
154    test_file_name = os.path.basename(test_file)
155    test_file_is_a_test = any(
156        test_file_name.startswith(x) for x in test_prefixes
157    ) or any(test_file_name.endswith(x) for x in test_suffixes)
158
159    if test_file_is_a_test:
160        test_files = [test_file]
161    else:
162        test_file_no_suffix = test_file[:-3]
163        candidates = [test_file_no_suffix + x for x in test_suffixes]
164
165        dir_name = os.path.dirname(test_file)
166        candidates += (
167            os.path.join(dir_name, x + test_file_name) for x in test_prefixes
168        )
169        test_files = (x for x in candidates if os.path.exists(x))
170    return _filter_python_tests(test_files, toolchain_utils)
171
172
173def _run_test_scripts(pool, all_tests, timeout, show_successful_output=False):
174    """Runs a list of TestSpecs. Returns whether all of them succeeded."""
175    results = [
176        pool.apply_async(_run_test, (test, timeout)) for test in all_tests
177    ]
178
179    failures = []
180    for i, (test, future) in enumerate(zip(all_tests, results)):
181        # Add a bit more spacing between outputs.
182        if show_successful_output and i:
183            print("\n")
184
185        pretty_test = " ".join(
186            pipes.quote(test_arg) for test_arg in test.command
187        )
188        pretty_directory = os.path.relpath(test.directory)
189        if pretty_directory == ".":
190            test_message = pretty_test
191        else:
192            test_message = "%s in %s/" % (pretty_test, pretty_directory)
193
194        print("## %s ... " % test_message, end="")
        # Be sure that the user sees which test is running.
        sys.stdout.flush()

        exit_code, stdout = future.get()
        if exit_code == 0:
            print("PASS")
            is_failure = False
        else:
            print("TIMEOUT" if exit_code is None else "FAIL")
            failures.append(test_message)
            is_failure = True

        if show_successful_output or is_failure:
            if stdout:
                print("-- Stdout:\n", stdout)
            else:
                print("-- No stdout was produced.")

    if failures:
        word = "tests" if len(failures) > 1 else "test"
        print(f"{len(failures)} {word} failed:")
        for failure in failures:
            print(f"\t{failure}")

    return not failures


def _compress_list(l):
    """Removes consecutive duplicate elements from |l|.

    >>> _compress_list([])
    []
    >>> _compress_list([1, 1])
    [1]
    >>> _compress_list([1, 2, 1])
    [1, 2, 1]
    """
    result = []
    for e in l:
        if result and result[-1] == e:
            continue
        result.append(e)
    return result


def _fix_python_path(toolchain_utils):
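    """Prepends |toolchain_utils| to the PYTHONPATH environment variable."""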
    pypath = os.environ.get("PYTHONPATH", "")
    if pypath:
        pypath = ":" + pypath
    os.environ["PYTHONPATH"] = toolchain_utils + pypath


def _find_forced_subdir_python_tests(test_paths, toolchain_utils):
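    """Returns TestSpecs for all tests in directories that force full reruns.

    If any of |test_paths| falls under one of the directories in |all_dirs|
    below, every Python test in that directory is gathered.
    """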
    assert all(os.path.isabs(path) for path in test_paths)

    # Directories under toolchain_utils for which any change will cause all tests
    # in that directory to be rerun. Includes changes in subdirectories.
    all_dirs = {
        "crosperf",
        "cros_utils",
    }

    relative_paths = [
        _make_relative_to_toolchain_utils(toolchain_utils, path)
        for path in test_paths
    ]

    gather_test_dirs = set()

    for path in relative_paths:
        top_level_dir = path.split("/")[0]
        if top_level_dir in all_dirs:
            gather_test_dirs.add(top_level_dir)

    results = []
    for d in sorted(gather_test_dirs):
        results += _gather_python_tests_in(d, toolchain_utils)
    return results


def _find_go_tests(test_paths):
276    """Returns TestSpecs for the go folders of the given files"""
    assert all(os.path.isabs(path) for path in test_paths)

    dirs_with_gofiles = set(
        os.path.dirname(p) for p in test_paths if p.endswith(".go")
    )
    command = ["go", "test", "-vet=all"]
    # Note: We sort the directories to be deterministic.
    return [
        TestSpec(directory=d, command=command)
        for d in sorted(dirs_with_gofiles)
    ]


def main(argv):
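    """Parses |argv|, discovers tests for the given files, and runs them."""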
    default_toolchain_utils = os.path.abspath(os.path.dirname(__file__))

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--show_all_output",
        action="store_true",
        help="show stdout of successful tests",
    )
    parser.add_argument(
        "--toolchain_utils",
        default=default_toolchain_utils,
        help="directory of toolchain-utils. Often auto-detected",
    )
    parser.add_argument(
        "file", nargs="*", help="a file that we should run tests for"
    )
    parser.add_argument(
        "--timeout",
        default=120,
        type=int,
        help="Time to allow a test to execute before timing it out, in "
        "seconds.",
    )
    args = parser.parse_args(argv)

    modified_files = [os.path.abspath(f) for f in args.file]
    show_all_output = args.show_all_output
    toolchain_utils = args.toolchain_utils

    if not modified_files:
        print("No files given. Exit.")
        return 0

    _fix_python_path(toolchain_utils)

    tests_to_run = _find_forced_subdir_python_tests(
        modified_files, toolchain_utils
    )
    for f in modified_files:
        tests_to_run += _autodetect_python_tests_for(f, toolchain_utils)
    tests_to_run += _find_go_tests(modified_files)

    # TestSpecs have lists, so we can't use a set. We'd likely want to keep them
    # sorted for determinism anyway.
    tests_to_run.sort()
    tests_to_run = _compress_list(tests_to_run)

    with multiprocessing.pool.ThreadPool() as pool:
        success = _run_test_scripts(
            pool, tests_to_run, args.timeout, show_all_output
        )
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))