• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3#
4# Copyright 2019 The ChromiumOS Authors
5# Use of this source code is governed by a BSD-style license that can be
6# found in the LICENSE file.
7
8"""Runs presubmit checks against a bundle of files."""
9
10import argparse
11import datetime
12import multiprocessing
13import multiprocessing.pool
14import os
15from pathlib import Path
16import re
17import shlex
18import shutil
19import subprocess
20import sys
21import threading
22import traceback
23import typing as t
24
25
def run_command_unchecked(
    command: t.List[str],
    cwd: str,
    env: t.Optional[t.Dict[str, str]] = None,
) -> t.Tuple[int, str]:
    """Runs a command in the given dir, returning its exit code and stdio.

    Args:
        command: argv of the program to run; no shell is involved.
        cwd: Directory to run the command in.
        env: Environment for the child. None makes the child inherit ours.

    Returns:
        (exit_code, output). `output` is the child's stdout and stderr merged
        into a single string, decoded as UTF-8 with undecodable bytes
        replaced rather than raising.

    Raises:
        OSError: if the program cannot be started at all (e.g., it does not
            exist). A nonzero exit status is returned, never raised.
    """
    completed = subprocess.run(
        command,
        check=False,
        cwd=cwd,
        # The child gets no stdin, so it can't block waiting on a terminal.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        # Fold stderr into stdout so callers get one combined stream.
        stderr=subprocess.STDOUT,
        env=env,
        encoding="utf-8",
        errors="replace",
    )
    return completed.returncode, completed.stdout
42
43
def has_executable_on_path(exe: str) -> bool:
    """Reports whether `exe` can be located through a $PATH lookup."""
    located = shutil.which(exe)
    return located is not None
47
48
def escape_command(command: t.Iterable[str]) -> str:
    """Renders `command` as one shell-quoted, copy-pastable string.

    This exists purely for showing commands to users; running anything with
    shell=True is strongly discouraged.
    """
    quoted_words = map(shlex.quote, command)
    return " ".join(quoted_words)
55
56
def remove_deleted_files(files: t.Iterable[str]) -> t.List[str]:
    """Returns the entries of `files` that exist on disk, in input order."""
    return list(filter(os.path.exists, files))
59
60
def is_file_executable(file_path: str) -> bool:
    """Reports whether the current process may execute `file_path`."""
    may_execute = os.access(file_path, os.X_OK)
    return may_execute
63
64
65# As noted in our docs, some of our Python code depends on modules that sit in
66# toolchain-utils/. Add that to PYTHONPATH to ensure that things like `cros
67# lint` are kept happy.
def env_with_pythonpath(toolchain_utils_root: str) -> t.Dict[str, str]:
    """Returns a copy of our environment with PYTHONPATH extended.

    `toolchain_utils_root` is appended after any existing PYTHONPATH entries;
    os.environ itself is left untouched.
    """
    new_env = os.environ.copy()
    inherited = new_env.get("PYTHONPATH")
    if inherited is None:
        new_env["PYTHONPATH"] = toolchain_utils_root
    else:
        new_env["PYTHONPATH"] = inherited + ":" + toolchain_utils_root
    return new_env
75
76
77# Each checker represents an independent check that's done on our sources.
78#
79# They should:
80#  - never write to stdout/stderr or read from stdin directly
81#  - return either a CheckResult, or a list of [(subcheck_name, CheckResult)]
82#  - ideally use thread_pool to check things concurrently
83#    - though it's important to note that these *also* live on the threadpool
84#      we've provided. It's the caller's responsibility to guarantee that at
85#      least ${number_of_concurrently_running_checkers}+1 threads are present
#      in the pool. In other words, blocking on results from the provided
87#      threadpool is OK.
class CheckResult(t.NamedTuple):
    """The outcome of a single check or subcheck."""

    # True if the check passed.
    ok: bool
    # Human-readable text to show the user; may be empty.
    output: str
    # Commands, each an argv list, suggested to fix what the check found.
    autofix_commands: t.List[t.List[str]]
96
97
def get_check_result_or_catch(
    task: multiprocessing.pool.ApplyResult,
) -> CheckResult:
    """Waits on `task`, turning any exception it raised into a CheckResult.

    `task.get()` is expected to produce a CheckResult. If it raises instead,
    a failing CheckResult carrying the formatted traceback is returned.
    """
    try:
        outcome = task.get()
    except Exception:
        trace = traceback.format_exc()
        return CheckResult(
            ok=False,
            output="Check exited with an unexpected exception:\n%s" % trace,
            autofix_commands=[],
        )
    else:
        return outcome
114
115
def check_isort(
    toolchain_utils_root: str, python_files: t.Iterable[str]
) -> CheckResult:
    """Subchecker of check_py_format. Checks python file formats with isort.

    Args:
        toolchain_utils_root: Directory to run isort from.
        python_files: The Python files to check.

    Returns:
        A passing CheckResult if isort (or its config) is not present under
        /mnt/host/source/chromite, or if isort exits with status 0. Otherwise
        a failing CheckResult; when isort's output names the offending
        files, it carries an autofix command that re-runs isort on them.
    """
    chromite = Path("/mnt/host/source/chromite")
    isort = chromite / "scripts" / "isort"
    config_file = chromite / ".isort.cfg"

    if not (isort.exists() and config_file.exists()):
        return CheckResult(
            ok=True,
            output="isort not found; skipping",
            autofix_commands=[],
        )

    config_file_flag = f"--settings-file={config_file}"
    # Keep the command a plain list of strings: escape_command() below uses
    # shlex.quote, which rejects Path objects, and `python_files` is only
    # promised to be an iterable.
    command = [str(isort), "-c", config_file_flag] + list(python_files)
    exit_code, stdout_and_stderr = run_command_unchecked(
        command, cwd=toolchain_utils_root
    )

    # isort fails when files have broken formatting.
    if not exit_code:
        return CheckResult(
            ok=True,
            output="",
            autofix_commands=[],
        )

    bad_files = []
    bad_file_re = re.compile(
        r"^ERROR: (.*) Imports are incorrectly sorted and/or formatted\.$"
    )
    for line in stdout_and_stderr.splitlines():
        m = bad_file_re.match(line)
        if m:
            (file_name,) = m.groups()
            bad_files.append(file_name.strip())

    # A failure with no recognizable per-file complaints can't be autofixed;
    # surface the raw output instead.
    if not bad_files:
        return CheckResult(
            ok=False,
            output="`%s` failed; stdout/stderr:\n%s"
            % (escape_command(command), stdout_and_stderr),
            autofix_commands=[],
        )

    autofix = [str(isort), config_file_flag] + bad_files
    return CheckResult(
        ok=False,
        output="The following file(s) have formatting errors: %s" % bad_files,
        autofix_commands=[autofix],
    )
169
170
def check_black(
    toolchain_utils_root: str, black: Path, python_files: t.Iterable[str]
) -> CheckResult:
    """Subchecker of check_py_format. Checks python file formats with black.

    The version of black in use is included in the result's output, since
    different versions of black may format the same code differently.
    """

    def failure(message: str) -> CheckResult:
        return CheckResult(ok=False, output=message, autofix_commands=[])

    version_status, version_output = run_command_unchecked(
        [black, "--version"], cwd=toolchain_utils_root
    )
    if version_status:
        return failure(
            f"Failed getting black version; stdstreams: {version_output}"
        )
    black_version = version_output.strip()

    base_invocation: t.List[str] = [str(black), "--line-length=80"]
    check_status, check_output = run_command_unchecked(
        base_invocation + ["--check"] + list(python_files),
        cwd=toolchain_utils_root,
    )
    # A zero exit status means black had nothing to complain about.
    if check_status == 0:
        return CheckResult(
            ok=True,
            output=f"Using {black_version!r}, no issues were found.",
            autofix_commands=[],
        )

    unparseable = f"Unparseable `black` output:\n{check_output}"

    # Expected shape: f'{complaints}\nOh no!{emojis}\n{summary}'. Only the
    # complaints in front of the marker are of interest.
    complaints, marker, _summary = check_output.partition("\nOh no!")
    if not marker:
        return failure(unparseable)

    reformat_marker = "would reformat "
    needs_reformat = []
    internal_errors = []
    for raw_line in complaints.strip().splitlines():
        stripped = raw_line.strip()
        if stripped.startswith("error:"):
            internal_errors.append(stripped)
        elif stripped.startswith(reformat_marker):
            needs_reformat.append(stripped[len(reformat_marker) :].strip())
        else:
            return failure(unparseable)

    # Errors that black itself reported are shown as-is, with no autofix.
    if internal_errors:
        joined_errors = "\n".join(internal_errors)
        return failure(
            f"Using {black_version!r} had the following errors:\n"
            f"{joined_errors}"
        )

    return CheckResult(
        ok=False,
        output=f"Using {black_version!r}, these file(s) have formatting errors: "
        f"{needs_reformat}",
        autofix_commands=[base_invocation + needs_reformat],
    )
249
250
def check_python_file_headers(python_files: t.Iterable[str]) -> CheckResult:
    """Subchecker of check_py_format. Checks python #!s.

    A file is expected to start with `#!` exactly when it is executable.
    Mismatches are reported along with `sed` commands that add or drop the
    first line.
    """
    missing_hashbang = []
    unwanted_hashbang = []

    for path in python_files:
        wants_hashbang = is_file_executable(path)
        with open(path, encoding="utf-8") as handle:
            starts_with_hashbang = handle.read(2) == "#!"

        if wants_hashbang and not starts_with_hashbang:
            missing_hashbang.append(path)
        elif starts_with_hashbang and not wants_hashbang:
            unwanted_hashbang.append(path)

    messages = []
    fixes = []
    if missing_hashbang:
        messages.append(
            "The following files have no #!, but need one: %s"
            % missing_hashbang
        )
        fixes.append(
            ["sed", "-i", "1i#!/usr/bin/env python3"] + missing_hashbang
        )

    if unwanted_hashbang:
        messages.append(
            "The following files have a #!, but shouldn't: %s"
            % unwanted_hashbang
        )
        fixes.append(["sed", "-i", "1d"] + unwanted_hashbang)

    # With no complaints, `messages` and `fixes` are both empty, so this is
    # a passing result with empty output.
    return CheckResult(
        ok=not messages,
        output="\n".join(messages),
        autofix_commands=fixes,
    )
293
294
def check_py_format(
    toolchain_utils_root: str,
    thread_pool: multiprocessing.pool.ThreadPool,
    files: t.Iterable[str],
) -> t.Union[CheckResult, t.List[t.Tuple[str, CheckResult]]]:
    """Runs black and isort on files to check for style bugs. Also checks #!s.

    Args:
        toolchain_utils_root: Directory the formatters are run from.
        thread_pool: Pool used to run the subchecks concurrently.
        files: Candidate files; only existing `.py` files are checked.

    Returns:
        A single CheckResult if there is nothing to check or `black` is
        unavailable; otherwise a list of (subcheck_name, CheckResult) pairs.
    """
    # Decide whether there's any work before demanding tools: a change that
    # touches no Python shouldn't fail just because black isn't installed.
    python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")]
    if not python_files:
        return CheckResult(
            ok=True,
            output="no python files to check",
            autofix_commands=[],
        )

    black = "black"
    if not has_executable_on_path(black):
        return CheckResult(
            ok=False,
            output="black isn't available on your $PATH. Please either "
            "enter a chroot, or place depot_tools on your $PATH.",
            autofix_commands=[],
        )

    tasks = [
        (
            "check_black",
            thread_pool.apply_async(
                check_black, (toolchain_utils_root, black, python_files)
            ),
        ),
        (
            "check_isort",
            thread_pool.apply_async(
                check_isort, (toolchain_utils_root, python_files)
            ),
        ),
        (
            "check_file_headers",
            thread_pool.apply_async(check_python_file_headers, (python_files,)),
        ),
    ]
    # Blocking here is fine: the pool is sized so that checkers waiting on
    # their own subtasks cannot starve those subtasks of threads.
    return [(name, get_check_result_or_catch(task)) for name, task in tasks]
337
338
def find_chromeos_root_directory() -> t.Optional[str]:
    """Returns the value of $CHROMEOS_ROOT_DIRECTORY, or None if unset."""
    return os.environ.get("CHROMEOS_ROOT_DIRECTORY")
341
342
def check_cros_lint(
    toolchain_utils_root: str,
    thread_pool: multiprocessing.pool.ThreadPool,
    files: t.Iterable[str],
) -> t.Union[t.List[t.Tuple[str, CheckResult]], CheckResult]:
    """Runs `cros lint`, falling back to pylint and golint if it's unusable.

    Args:
        toolchain_utils_root: Directory the linters are run from; also added
            to the linters' PYTHONPATH.
        thread_pool: Pool used to run the fallback linters concurrently.
        files: The files to lint.

    Returns:
        The CheckResult of `cros lint` if it could be run. Otherwise, a list
        of (linter_name, CheckResult) pairs from the fallback linters, or a
        single passing CheckResult holding only a warning if there was
        nothing for the fallback linters to look at.
    """
    # `files` is walked several times below, so pin it down as a list.
    files = list(files)

    fixed_env = env_with_pythonpath(toolchain_utils_root)

    # We have to support users who don't have a chroot. So we either run `cros
    # lint` (if it's been made available to us), or we try a mix of
    # pylint+golint.
    def try_run_cros_lint(cros_binary: str) -> t.Optional[CheckResult]:
        try:
            exit_code, output = run_command_unchecked(
                [cros_binary, "lint", "--"] + files,
                toolchain_utils_root,
                env=fixed_env,
            )
        except FileNotFoundError:
            # There is no such binary to run. Treat that like any other
            # "cros is unusable" outcome so the fallbacks get their chance.
            return None

        # This is returned specifically if cros couldn't find the ChromeOS tree
        # root.
        if exit_code == 127:
            return None

        return CheckResult(
            ok=exit_code == 0,
            output=output,
            autofix_commands=[],
        )

    cros_lint = try_run_cros_lint("cros")
    if cros_lint is not None:
        return cros_lint

    cros_root = find_chromeos_root_directory()
    if cros_root:
        cros_lint = try_run_cros_lint(
            os.path.join(cros_root, "chromite/bin/cros")
        )
        if cros_lint is not None:
            return cros_lint

    tasks = []

    def check_result_from_command(command: t.List[str]) -> CheckResult:
        exit_code, output = run_command_unchecked(
            command, toolchain_utils_root, env=fixed_env
        )
        return CheckResult(
            ok=exit_code == 0,
            output=output,
            autofix_commands=[],
        )

    python_files = [f for f in remove_deleted_files(files) if f.endswith(".py")]
    if python_files:

        def run_pylint() -> CheckResult:
            # pylint is required. Fail hard if it DNE.
            return check_result_from_command(["pylint"] + python_files)

        tasks.append(("pylint", thread_pool.apply_async(run_pylint)))

    go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")]
    if go_files:

        def run_golint() -> CheckResult:
            if has_executable_on_path("golint"):
                return check_result_from_command(
                    ["golint", "-set_exit_status"] + go_files
                )

            complaint = "\n".join(
                (
                    "WARNING: go linting disabled. golint is not on your $PATH.",
                    "Please either enter a chroot, or install go locally. Continuing.",
                )
            )
            return CheckResult(
                ok=True,
                output=complaint,
                autofix_commands=[],
            )

        tasks.append(("golint", thread_pool.apply_async(run_golint)))

    complaint = "\n".join(
        (
            "WARNING: No ChromeOS checkout detected, and no viable CrOS tree",
            "found; falling back to linting only python and go. If you have a",
            "ChromeOS checkout, please either develop from inside of the source",
            "tree, or set $CHROMEOS_ROOT_DIRECTORY to the root of it.",
        )
    )

    results = [(name, get_check_result_or_catch(task)) for name, task in tasks]
    if not results:
        return CheckResult(
            ok=True,
            output=complaint,
            autofix_commands=[],
        )

    # We need to complain _somewhere_.
    name, angry_result = results[0]
    angry_complaint = (complaint + "\n\n" + angry_result.output).strip()
    results[0] = (name, angry_result._replace(output=angry_complaint))
    return results
451
452
def check_go_format(
    toolchain_utils_root: str,
    _thread_pool: multiprocessing.pool.ThreadPool,
    files: t.Iterable[str],
) -> CheckResult:
    """Runs gofmt on files to check for style bugs.

    Args:
        toolchain_utils_root: Directory gofmt is run from.
        _thread_pool: Unused; accepted so all checkers share one signature.
        files: Candidate files; only existing `.go` files are checked.

    Returns:
        A passing CheckResult if there are no Go files or gofmt lists none
        of them. Otherwise a failing CheckResult; when gofmt listed
        misformatted files, it carries a `gofmt -w` autofix command.
    """
    # Decide whether there's any work before demanding tools: a change that
    # touches no Go shouldn't fail just because gofmt isn't installed.
    go_files = [f for f in remove_deleted_files(files) if f.endswith(".go")]
    if not go_files:
        return CheckResult(
            ok=True,
            output="no go files to check",
            autofix_commands=[],
        )

    gofmt = "gofmt"
    if not has_executable_on_path(gofmt):
        return CheckResult(
            ok=False,
            output="gofmt isn't available on your $PATH. Please either "
            "enter a chroot, or place your go bin/ directory on your $PATH.",
            autofix_commands=[],
        )

    command = [gofmt, "-l"] + go_files
    exit_code, output = run_command_unchecked(command, cwd=toolchain_utils_root)

    if exit_code:
        return CheckResult(
            ok=False,
            output="%s failed; stdout/stderr:\n%s"
            % (escape_command(command), output),
            autofix_commands=[],
        )

    # `gofmt -l` prints one misformatted file per line; no output means all
    # files are fine.
    output = output.strip()
    if not output:
        return CheckResult(
            ok=True,
            output="",
            autofix_commands=[],
        )

    broken_files = [x.strip() for x in output.splitlines()]
    autofix = [gofmt, "-w"] + broken_files
    return CheckResult(
        ok=False,
        output="The following Go files have incorrect "
        "formatting: %s" % broken_files,
        autofix_commands=[autofix],
    )
499
500
def check_tests(
    toolchain_utils_root: str,
    _thread_pool: multiprocessing.pool.ThreadPool,
    files: t.List[str],
) -> CheckResult:
    """Runs run_tests_for.py on `files`; passes iff it exits with status 0."""
    test_runner = os.path.join(toolchain_utils_root, "run_tests_for.py")
    test_command = [test_runner, "--"]
    test_command += files
    status, combined_output = run_command_unchecked(
        test_command, toolchain_utils_root
    )
    passed = status == 0
    return CheckResult(
        ok=passed,
        output=combined_output,
        autofix_commands=[],
    )
516
517
def detect_toolchain_utils_root() -> str:
    """Returns the absolute path of the directory two levels above this file.

    That is, the parent of the directory that contains this script.
    """
    this_file = os.path.abspath(__file__)
    containing_dir = os.path.dirname(this_file)
    return os.path.dirname(containing_dir)
520
521
def process_check_result(
    check_name: str,
    check_results: t.Union[t.List[CheckResult], CheckResult],
    start_time: datetime.datetime,
) -> t.Tuple[bool, t.List[t.List[str]]]:
    """Prints human-readable output for the given check_results.

    Args:
        check_name: Name of the check, used in the printed headers.
        check_results: Either one CheckResult, or an iterable of
            (subcheck_name, CheckResult) pairs.
        start_time: Reference point for the elapsed time that gets printed.

    Returns:
        (ok, autofix_commands): whether the check (and all of its subchecks,
        if any) passed, and every autofix command the results carried.
    """
    indent = "  "

    def indent_block(text: str) -> str:
        return indent + ("\n" + indent).join(text.split("\n"))

    def recommendation_for(commands: t.List[t.List[str]]) -> str:
        return "Recommended command(s) to fix this: %s" % [
            escape_command(c) for c in commands
        ]

    if isinstance(check_results, CheckResult):
        ok = check_results.ok
        output = check_results.output
        autofix_commands = check_results.autofix_commands
        if autofix_commands and not ok:
            advice = recommendation_for(autofix_commands)
            output = output + "\n" + advice if output else advice
    else:
        autofix_commands = []
        sections = []
        for subname, sub_result in check_results:
            status = "succeeded" if sub_result.ok else "failed"
            lines = ["*** %s.%s %s" % (check_name, subname, status)]
            if sub_result.output:
                lines.append(indent_block(sub_result.output))
            if sub_result.autofix_commands and not sub_result.ok:
                lines.append(
                    indent_block(
                        recommendation_for(sub_result.autofix_commands)
                    )
                )

            sections.append("\n".join(lines))
            # Autofix commands are collected even from passing subchecks.
            autofix_commands.extend(sub_result.autofix_commands)

        ok = all(sub_result.ok for _, sub_result in check_results)
        output = "\n\n".join(sections)

    time_taken = datetime.datetime.now() - start_time
    banner = "*** %s succeeded after %s" if ok else "*** %s failed after %s"
    print(banner % (check_name, time_taken))

    if output:
        print(indent_block(output))

    print()
    return ok, autofix_commands
576
577
def try_autofix(
    all_autofix_commands: t.List[t.List[str]], toolchain_utils_root: str
) -> None:
    """Tries to run all given autofix commands, if appropriate.

    Nothing is run if there are no commands, if `git status` fails, or if
    the git tree at `toolchain_utils_root` has uncommitted changes.
    """
    if not all_autofix_commands:
        return

    git_status_code, porcelain = run_command_unchecked(
        ["git", "status", "--porcelain"], cwd=toolchain_utils_root
    )
    if git_status_code != 0:
        print("Autofix aborted: couldn't get toolchain-utils git status.")
        return

    # A clean repo makes checking/undoing autofix commands trivial. A dirty
    # one... less so. :)
    if porcelain.strip():
        print("Git repo seems dirty; skipping autofix.")
        return

    succeeded_count = 0
    for fix_command in all_autofix_commands:
        fix_code, fix_output = run_command_unchecked(
            fix_command, cwd=toolchain_utils_root
        )
        printable = escape_command(fix_command)

        if fix_code == 0:
            print("*** Autofix `%s` succeeded" % printable)
            succeeded_count += 1
            continue

        print(
            "*** Autofix command `%s` exited with code %d; stdout/stderr:"
            % (printable, fix_code)
        )
        print(fix_output)

    if succeeded_count:
        print(
            "NOTE: Autofixes have been applied. Please check your tree, since "
            "some lints may now be fixed"
        )
619
620
def find_repo_root(base_dir: str) -> t.Optional[str]:
    """Finds the closest directory at or above `base_dir` holding `.repo`.

    Args:
        base_dir: Directory to start searching from.

    Returns:
        The first of `base_dir` and its ancestors that contains a `.repo`
        directory, or None if none does. The filesystem root itself is not
        checked.
    """
    current = base_dir
    while True:
        parent = os.path.dirname(current)
        # dirname() stops making progress at a root ("/", "//") and once a
        # relative path is used up (""). Testing for that, rather than for
        # the literal "/", guarantees termination on any input.
        if not current or parent == current:
            return None
        if os.path.isdir(os.path.join(current, ".repo")):
            return current
        current = parent
628
629
def is_in_chroot() -> bool:
    """Reports whether /etc/cros_chroot_version exists."""
    chroot_marker = "/etc/cros_chroot_version"
    return os.path.exists(chroot_marker)
632
633
def maybe_reexec_inside_chroot(autofix: bool, files: t.List[str]) -> None:
    """Re-executes this script inside the chroot, if that's possible.

    Returns normally only if we're already in the chroot, or if entering it
    isn't possible (in which case the reasons are printed). Otherwise, this
    process is replaced by `cros_sdk` running this script on the same files.
    """
    if is_in_chroot():
        return

    can_enter = True
    # Stays None unless $CHROMEOS_ROOT_DIRECTORY has to stand in for a repo
    # root we couldn't find above this checkout.
    tree_root = None
    toolchain_utils = detect_toolchain_utils_root()
    if find_repo_root(toolchain_utils) is None:
        tree_root = find_chromeos_root_directory()
        if tree_root is None:
            print(
                "Standalone toolchain-utils checkout detected; cannot enter "
                "chroot."
            )
            can_enter = False

    if not has_executable_on_path("cros_sdk"):
        print("No `cros_sdk` detected on $PATH; cannot enter chroot.")
        can_enter = False

    if not can_enter:
        print(
            "Giving up on entering the chroot; be warned that some presubmits "
            "may be broken."
        )
        return

    # We'll be changing ${PWD}, so make everything relative to toolchain-utils,
    # which resides at a well-known place inside of the chroot.
    chroot_toolchain_utils = "/mnt/host/source/src/third_party/toolchain-utils"

    def rebase_path(path: str) -> str:
        relative = os.path.relpath(path, toolchain_utils)
        return os.path.join(chroot_toolchain_utils, relative)

    args = ["cros_sdk", "--enter", "--", rebase_path(__file__)]
    if not autofix:
        args.append("--no_autofix")
    # All paths are rebased before any chdir below, since relpath resolves
    # relative inputs against the current directory.
    args += [rebase_path(f) for f in files]

    if tree_root is None:
        print("Attempting to enter the chroot...")
    else:
        print(f"Attempting to enter the chroot for tree at {tree_root}...")
        os.chdir(tree_root)
    os.execvp(args[0], args)
689
690
def ensure_pip_deps_installed() -> None:
    """Installs pip, scipy, and yapf, as needed.

    pip is installed through `sudo emerge` if it isn't on $PATH; each Python
    package is installed with `pip install --user` if importing it fails.
    """
    pip_available = has_executable_on_path("pip")
    if not pip_available:
        print("Autoinstalling `pip`...")
        subprocess.check_call(["sudo", "emerge", "dev-python/pip"])

    for package in ("scipy", "yapf"):
        # Probe in a child interpreter, so this process doesn't have to
        # import the package itself.
        import_status = subprocess.call(
            ["python3", "-c", f"import {package}"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        if import_status == 0:
            continue

        print(f"Autoinstalling `{package}`...")
        subprocess.check_call(["pip", "install", "--user", package])
705
706
def main(argv: t.List[str]) -> int:
    """Parses arguments, runs every check concurrently, and reports results.

    Unless --no_enter_chroot is given, this may replace the current process
    with one running inside the chroot, in which case it never returns.

    Args:
        argv: Command-line arguments, without the program name.

    Returns:
        0 if no files were given or every check passed; 1 otherwise.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--no_autofix",
        dest="autofix",
        action="store_false",
        help="Don't run any autofix commands.",
    )
    parser.add_argument(
        "--no_enter_chroot",
        dest="enter_chroot",
        action="store_false",
        help="Prevent auto-entering the chroot if we're not already in it.",
    )
    parser.add_argument("files", nargs="*")
    opts = parser.parse_args(argv)

    files = opts.files
    if not files:
        return 0

    if opts.enter_chroot:
        maybe_reexec_inside_chroot(opts.autofix, opts.files)

    # If you ask for --no_enter_chroot, you're on your own for installing these
    # things.
    if is_in_chroot():
        ensure_pip_deps_installed()

    files = [os.path.abspath(f) for f in files]

    # Note that we extract .__name__s from these, so please name them in a
    # user-friendly way.
    checks = [
        check_cros_lint,
        check_py_format,
        check_go_format,
        check_tests,
    ]

    toolchain_utils_root = detect_toolchain_utils_root()

    # NOTE: As mentioned above, checks can block on threads they spawn in this
    # pool, so we need at least len(checks)+1 threads to avoid deadlock. Use *2
    # so all checks can make progress at a decent rate.
    num_threads = max(multiprocessing.cpu_count(), len(checks) * 2)
    start_time = datetime.datetime.now()

    # For our single print statement...
    spawn_print_lock = threading.RLock()

    def run_check(check_fn):
        name = check_fn.__name__
        with spawn_print_lock:
            print("*** Spawning %s" % name)
        # A check that raises must not take the whole run down with it:
        # report it as a failure of that check, so the results of the other
        # checks still get printed.
        try:
            result = check_fn(toolchain_utils_root, pool, files)
        except Exception:  # pylint: disable=broad-except
            result = CheckResult(
                ok=False,
                output="Check exited with an unexpected exception:\n%s"
                % traceback.format_exc(),
                autofix_commands=[],
            )
        return name, result

    # ThreadPool is a ContextManager in py3.
    # pylint: disable=not-context-manager
    with multiprocessing.pool.ThreadPool(num_threads) as pool:
        all_checks_ok = True
        all_autofix_commands = []
        for check_name, result in pool.imap_unordered(run_check, checks):
            ok, autofix_commands = process_check_result(
                check_name, result, start_time
            )
            all_checks_ok = ok and all_checks_ok
            all_autofix_commands += autofix_commands

    # Run these after everything settles, so:
    # - we don't collide with checkers that are running concurrently
    # - we clearly print out everything that went wrong ahead of time, in case
    #   any of these fail
    if opts.autofix:
        try_autofix(all_autofix_commands, toolchain_utils_root)

    if not all_checks_ok:
        return 1
    return 0
786
787
if __name__ == "__main__":
    # main()'s return value becomes the process's exit status.
    raise SystemExit(main(sys.argv[1:]))
790