• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Tools for running presubmit checks in a Git repository.
15
16Presubmit checks are defined as a function or other callable. The function may
17take either no arguments or a list of the paths on which to run. Presubmit
18checks communicate failure by raising any exception.
19
20For example, either of these functions may be used as presubmit checks:
21
22  @pw_presubmit.filter_paths(endswith='.py')
23  def file_contains_ni(ctx: PresubmitContext):
24      for path in ctx.paths:
25          with open(path) as file:
26              contents = file.read()
27              if 'ni' not in contents and 'nee' not in contents:
28                  raise PresumitFailure('Files must say "ni"!', path=path)
29
30  def run_the_build():
31      subprocess.run(['make', 'release'], check=True)
32
33Presubmit checks that accept a list of paths may use the filter_paths decorator
34to automatically filter the paths list for file types they care about. See the
35pragma_once function for an example.
36
37See pigweed_presbumit.py for an example of how to define presubmit checks.
38"""
39
40from __future__ import annotations
41
42import collections
43import contextlib
44import copy
45import dataclasses
46import enum
47from inspect import Parameter, signature
48import itertools
49import json
50import logging
51import os
52from pathlib import Path
53import re
54import signal
55import subprocess
56import sys
57import tempfile
58import time
59import types
60from typing import (
61    Any,
62    Callable,
63    Collection,
64    Iterable,
65    Iterator,
66    Pattern,
67    Sequence,
68    Set,
69)
70
71import pw_cli.color
72import pw_cli.env
73from pw_cli.plural import plural
74from pw_cli.file_filter import FileFilter
75from pw_package import package_manager
76from pw_presubmit import git_repo, tools
77from pw_presubmit.presubmit_context import (
78    FormatContext,
79    FormatOptions,
80    LuciContext,
81    PRESUBMIT_CONTEXT,
82    PresubmitContext,
83    PresubmitFailure,
84    log_check_traces,
85)
86
87_LOG: logging.Logger = logging.getLogger(__name__)
88
89_COLOR = pw_cli.color.colors()
90
91_SUMMARY_BOX = '══╦╗ ║║══╩╝'
92_CHECK_UPPER = '━━━┓       '
93_CHECK_LOWER = '       ━━━┛'
94
95WIDTH = 80
96
97_LEFT = 7
98_RIGHT = 11
99
100
101def _title(msg, style=_SUMMARY_BOX) -> str:
102    msg = f' {msg} '.center(WIDTH - 2)
103    return tools.make_box('^').format(*style, section1=msg, width1=len(msg))
104
105
106def _format_time(time_s: float) -> str:
107    minutes, seconds = divmod(time_s, 60)
108    if minutes < 60:
109        return f' {int(minutes)}:{seconds:04.1f}'
110    hours, minutes = divmod(minutes, 60)
111    return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}'
112
113
114def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
115    return box.format(
116        *style,
117        section1=left + ('' if left.endswith(' ') else ' '),
118        width1=_LEFT,
119        section2=' ' + middle,
120        width2=WIDTH - _LEFT - _RIGHT - 4,
121        section3=right + ' ',
122        width3=_RIGHT,
123    )
124
125
126class PresubmitResult(enum.Enum):
127    PASS = 'PASSED'  # Check completed successfully.
128    FAIL = 'FAILED'  # Check failed.
129    CANCEL = 'CANCEL'  # Check didn't complete.
130
131    def colorized(self, width: int, invert: bool = False) -> str:
132        if self is PresubmitResult.PASS:
133            color = _COLOR.black_on_green if invert else _COLOR.green
134        elif self is PresubmitResult.FAIL:
135            color = _COLOR.black_on_red if invert else _COLOR.red
136        elif self is PresubmitResult.CANCEL:
137            color = _COLOR.yellow
138        else:
139            color = lambda value: value
140
141        padding = (width - len(self.value)) // 2 * ' '
142        return padding + color(self.value) + padding
143
144
145class Program(collections.abc.Sequence):
146    """A sequence of presubmit checks; basically a tuple with a name."""
147
148    def __init__(self, name: str, steps: Iterable[Callable]):
149        self.name = name
150
151        def ensure_check(step):
152            if isinstance(step, Check):
153                return step
154            return Check(step)
155
156        self._steps: tuple[Check, ...] = tuple(
157            {ensure_check(s): None for s in tools.flatten(steps)}
158        )
159
160    def __getitem__(self, i):
161        return self._steps[i]
162
163    def __len__(self):
164        return len(self._steps)
165
166    def __str__(self):
167        return self.name
168
169    def title(self):
170        return f'{self.name if self.name else ""} presubmit checks'.strip()
171
172
173class Programs(collections.abc.Mapping):
174    """A mapping of presubmit check programs.
175
176    Use is optional. Helpful when managing multiple presubmit check programs.
177    """
178
179    def __init__(self, **programs: Sequence):
180        """Initializes a name: program mapping from the provided keyword args.
181
182        A program is a sequence of presubmit check functions. The sequence may
183        contain nested sequences, which are flattened.
184        """
185        self._programs: dict[str, Program] = {
186            name: Program(name, checks) for name, checks in programs.items()
187        }
188
189    def all_steps(self) -> dict[str, Check]:
190        return {c.name: c for c in itertools.chain(*self.values())}
191
192    def __getitem__(self, item: str) -> Program:
193        return self._programs[item]
194
195    def __iter__(self) -> Iterator[str]:
196        return iter(self._programs)
197
198    def __len__(self) -> int:
199        return len(self._programs)
200
201
202def download_cas_artifact(
203    ctx: PresubmitContext, digest: str, output_dir: str
204) -> None:
205    """Downloads the given digest to the given outputdirectory
206
207    Args:
208        ctx: the presubmit context
209        digest:
210        a string digest in the form "<digest hash>/<size bytes>"
211        i.e 693a04e41374150d9d4b645fccb49d6f96e10b527c7a24b1e17b331f508aa73b/86
212        output_dir: the directory we want to download the artifacts to
213    """
214    if ctx.luci is None:
215        raise PresubmitFailure('Lucicontext is None')
216    cmd = [
217        'cas',
218        'download',
219        '-cas-instance',
220        ctx.luci.cas_instance,
221        '-digest',
222        digest,
223        '-dir',
224        output_dir,
225    ]
226    try:
227        subprocess.check_call(cmd)
228    except subprocess.CalledProcessError as failure:
229        raise PresubmitFailure('cas download failed') from failure
230
231
232def archive_cas_artifact(
233    ctx: PresubmitContext, root: str, upload_paths: list[str]
234) -> str:
235    """Uploads the given artifacts into cas
236
237    Args:
238        ctx: the presubmit context
239        root: root directory of archived tree, should be absolutepath.
240        paths: path to archived files/dirs, should be absolute path.
241            If empty, [root] will be used.
242
243    Returns:
244        A string digest in the form "<digest hash>/<size bytes>"
245        i.e 693a04e41374150d9d4b645fccb49d6f96e10b527c7a24b1e17b331f508aa73b/86
246    """
247    if ctx.luci is None:
248        raise PresubmitFailure('Lucicontext is None')
249    assert os.path.abspath(root)
250    if not upload_paths:
251        upload_paths = [root]
252    for path in upload_paths:
253        assert os.path.abspath(path)
254
255    with tempfile.NamedTemporaryFile(mode='w+t') as tmp_digest_file:
256        with tempfile.NamedTemporaryFile(mode='w+t') as tmp_paths_file:
257            json_paths = json.dumps(
258                [
259                    [str(root), str(os.path.relpath(path, root))]
260                    for path in upload_paths
261                ]
262            )
263            tmp_paths_file.write(json_paths)
264            tmp_paths_file.seek(0)
265            cmd = [
266                'cas',
267                'archive',
268                '-cas-instance',
269                ctx.luci.cas_instance,
270                '-paths-json',
271                tmp_paths_file.name,
272                '-dump-digest',
273                tmp_digest_file.name,
274            ]
275            try:
276                subprocess.check_call(cmd)
277            except subprocess.CalledProcessError as failure:
278                raise PresubmitFailure('cas archive failed') from failure
279
280            tmp_digest_file.seek(0)
281            uploaded_digest = tmp_digest_file.read()
282            return uploaded_digest
283
284
285def _print_ui(*args) -> None:
286    """Prints to stdout and flushes to stay in sync with logs on stderr."""
287    print(*args, flush=True)
288
289
290@dataclasses.dataclass
291class FilteredCheck:
292    check: Check
293    paths: Sequence[Path]
294    substep: str | None = None
295
296    @property
297    def name(self) -> str:
298        return self.check.name
299
300    def run(self, ctx: PresubmitContext, count: int, total: int):
301        return self.check.run(ctx, count, total, self.substep)
302
303
304class Presubmit:
305    """Runs a series of presubmit checks on a list of files."""
306
307    def __init__(  # pylint: disable=too-many-arguments
308        self,
309        root: Path,
310        repos: Sequence[Path],
311        output_directory: Path,
312        paths: Sequence[Path],
313        all_paths: Sequence[Path],
314        package_root: Path,
315        override_gn_args: dict[str, str],
316        continue_after_build_error: bool,
317        rng_seed: int,
318        full: bool,
319    ):
320        self._root = root.resolve()
321        self._repos = tuple(repos)
322        self._output_directory = output_directory.resolve()
323        self._paths = tuple(paths)
324        self._all_paths = tuple(all_paths)
325        self._relative_paths = tuple(
326            tools.relative_paths(self._paths, self._root)
327        )
328        self._package_root = package_root.resolve()
329        self._override_gn_args = override_gn_args
330        self._continue_after_build_error = continue_after_build_error
331        self._rng_seed = rng_seed
332        self._full = full
333
334    def run(
335        self,
336        program: Program,
337        keep_going: bool = False,
338        substep: str | None = None,
339        dry_run: bool = False,
340    ) -> bool:
341        """Executes a series of presubmit checks on the paths."""
342        checks = self.apply_filters(program)
343        if substep:
344            assert (
345                len(checks) == 1
346            ), 'substeps not supported with multiple steps'
347            checks[0].substep = substep
348
349        _LOG.debug('Running %s for %s', program.title(), self._root.name)
350        _print_ui(_title(f'{self._root.name}: {program.title()}'))
351
352        _LOG.info(
353            '%d of %d checks apply to %s in %s',
354            len(checks),
355            len(program),
356            plural(self._paths, 'file'),
357            self._root,
358        )
359
360        _print_ui()
361        for line in tools.file_summary(self._relative_paths):
362            _print_ui(line)
363        _print_ui()
364
365        if not self._paths:
366            _print_ui(_COLOR.yellow('No files are being checked!'))
367
368        _LOG.debug('Checks:\n%s', '\n'.join(c.name for c in checks))
369
370        start_time: float = time.time()
371        passed, failed, skipped = self._execute_checks(
372            checks, keep_going, dry_run
373        )
374        self._log_summary(time.time() - start_time, passed, failed, skipped)
375
376        return not failed and not skipped
377
378    def apply_filters(self, program: Sequence[Callable]) -> list[FilteredCheck]:
379        """Returns list of FilteredCheck for checks that should run."""
380        checks = [c if isinstance(c, Check) else Check(c) for c in program]
381        filter_to_checks: dict[
382            FileFilter, list[Check]
383        ] = collections.defaultdict(list)
384
385        for chk in checks:
386            filter_to_checks[chk.filter].append(chk)
387
388        check_to_paths = self._map_checks_to_paths(filter_to_checks)
389        return [
390            FilteredCheck(c, check_to_paths[c])
391            for c in checks
392            if c in check_to_paths
393        ]
394
395    def _map_checks_to_paths(
396        self, filter_to_checks: dict[FileFilter, list[Check]]
397    ) -> dict[Check, Sequence[Path]]:
398        checks_to_paths: dict[Check, Sequence[Path]] = {}
399
400        posix_paths = tuple(p.as_posix() for p in self._relative_paths)
401
402        for filt, checks in filter_to_checks.items():
403            filtered_paths = tuple(
404                path
405                for path, filter_path in zip(self._paths, posix_paths)
406                if filt.matches(filter_path)
407            )
408
409            for chk in checks:
410                if filtered_paths or chk.always_run:
411                    checks_to_paths[chk] = filtered_paths
412                else:
413                    _LOG.debug('Skipping "%s": no relevant files', chk.name)
414
415        return checks_to_paths
416
417    def _log_summary(
418        self, time_s: float, passed: int, failed: int, skipped: int
419    ) -> None:
420        summary_items = []
421        if passed:
422            summary_items.append(f'{passed} passed')
423        if failed:
424            summary_items.append(f'{failed} failed')
425        if skipped:
426            summary_items.append(f'{skipped} not run')
427        summary = ', '.join(summary_items) or 'nothing was done'
428
429        if failed or skipped:
430            result = PresubmitResult.FAIL
431        else:
432            result = PresubmitResult.PASS
433        total = passed + failed + skipped
434
435        _LOG.debug(
436            'Finished running %d checks on %s in %.1f s',
437            total,
438            plural(self._paths, 'file'),
439            time_s,
440        )
441        _LOG.debug('Presubmit checks %s: %s', result.value, summary)
442
443        _print_ui(
444            _box(
445                _SUMMARY_BOX,
446                result.colorized(_LEFT, invert=True),
447                f'{total} checks on {plural(self._paths, "file")}: {summary}',
448                _format_time(time_s),
449            )
450        )
451
452    def _create_presubmit_context(  # pylint: disable=no-self-use
453        self, **kwargs
454    ):
455        """Create a PresubmitContext. Override if needed in subclasses."""
456        return PresubmitContext(**kwargs)
457
458    @contextlib.contextmanager
459    def _context(self, filtered_check: FilteredCheck, dry_run: bool = False):
460        # There are many characters banned from filenames on Windows. To
461        # simplify things, just strip everything that's not a letter, digit,
462        # or underscore.
463        sanitized_name = re.sub(r'[\W_]+', '_', filtered_check.name).lower()
464        output_directory = self._output_directory.joinpath(sanitized_name)
465        os.makedirs(output_directory, exist_ok=True)
466
467        failure_summary_log = output_directory / 'failure-summary.log'
468        failure_summary_log.unlink(missing_ok=True)
469
470        handler = logging.FileHandler(
471            output_directory.joinpath('step.log'), mode='w'
472        )
473        handler.setLevel(logging.DEBUG)
474
475        try:
476            _LOG.addHandler(handler)
477
478            yield self._create_presubmit_context(
479                root=self._root,
480                repos=self._repos,
481                output_dir=output_directory,
482                failure_summary_log=failure_summary_log,
483                paths=filtered_check.paths,
484                all_paths=self._all_paths,
485                package_root=self._package_root,
486                override_gn_args=self._override_gn_args,
487                continue_after_build_error=self._continue_after_build_error,
488                rng_seed=self._rng_seed,
489                full=self._full,
490                luci=LuciContext.create_from_environment(),
491                format_options=FormatOptions.load(),
492                dry_run=dry_run,
493            )
494
495        finally:
496            _LOG.removeHandler(handler)
497
498    def _execute_checks(
499        self,
500        program: list[FilteredCheck],
501        keep_going: bool,
502        dry_run: bool = False,
503    ) -> tuple[int, int, int]:
504        """Runs presubmit checks; returns (passed, failed, skipped) lists."""
505        passed = failed = 0
506
507        for i, filtered_check in enumerate(program, 1):
508            with self._context(filtered_check, dry_run) as ctx:
509                result = filtered_check.run(ctx, i, len(program))
510
511            if result is PresubmitResult.PASS:
512                passed += 1
513            elif result is PresubmitResult.CANCEL:
514                break
515            else:
516                failed += 1
517                if not keep_going:
518                    break
519
520        return passed, failed, len(program) - passed - failed
521
522
523def _process_pathspecs(
524    repos: Iterable[Path], pathspecs: Iterable[str]
525) -> dict[Path, list[str]]:
526    pathspecs_by_repo: dict[Path, list[str]] = {repo: [] for repo in repos}
527    repos_with_paths: Set[Path] = set()
528
529    for pathspec in pathspecs:
530        # If the pathspec is a path to an existing file, only use it for the
531        # repo it is in.
532        if os.path.exists(pathspec):
533            # Raise an exception if the path exists but is not in a known repo.
534            repo = git_repo.within_repo(pathspec)
535            if repo not in pathspecs_by_repo:
536                raise ValueError(
537                    f'{pathspec} is not in a Git repository in this presubmit'
538                )
539
540            # Make the path relative to the repo's root.
541            pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo))
542            repos_with_paths.add(repo)
543        else:
544            # Pathspecs that are not paths (e.g. '*.h') are used for all repos.
545            for patterns in pathspecs_by_repo.values():
546                patterns.append(pathspec)
547
548    # If any paths were specified, only search for paths in those repos.
549    if repos_with_paths:
550        for repo in set(pathspecs_by_repo) - repos_with_paths:
551            del pathspecs_by_repo[repo]
552
553    return pathspecs_by_repo
554
555
556def fetch_file_lists(
557    root: Path,
558    repo: Path,
559    pathspecs: list[str],
560    exclude: Sequence[Pattern] = (),
561    base: str | None = None,
562) -> tuple[list[Path], list[Path]]:
563    """Returns lists of all files and modified files for the given repo.
564
565    Args:
566        root: root path of the project
567        repo: path to the roots of Git repository to check
568        base: optional base Git commit to list files against
569        pathspecs: optional list of Git pathspecs to run the checks against
570        exclude: regular expressions for Posix-style paths to exclude
571    """
572
573    all_files: list[Path] = []
574    modified_files: list[Path] = []
575
576    all_files_repo = tuple(
577        tools.exclude_paths(
578            exclude, git_repo.list_files(None, pathspecs, repo), root
579        )
580    )
581    all_files += all_files_repo
582
583    if base is None:
584        modified_files += all_files_repo
585    else:
586        modified_files += tools.exclude_paths(
587            exclude, git_repo.list_files(base, pathspecs, repo), root
588        )
589
590    _LOG.info(
591        'Checking %s',
592        git_repo.describe_files(repo, repo, base, pathspecs, exclude, root),
593    )
594
595    return all_files, modified_files
596
597
598def run(  # pylint: disable=too-many-arguments,too-many-locals
599    program: Sequence[Check],
600    root: Path,
601    repos: Collection[Path] = (),
602    base: str | None = None,
603    paths: Sequence[str] = (),
604    exclude: Sequence[Pattern] = (),
605    output_directory: Path | None = None,
606    package_root: Path | None = None,
607    only_list_steps: bool = False,
608    override_gn_args: Sequence[tuple[str, str]] = (),
609    keep_going: bool = False,
610    continue_after_build_error: bool = False,
611    rng_seed: int = 1,
612    presubmit_class: type = Presubmit,
613    list_steps_file: Path | None = None,
614    substep: str | None = None,
615    dry_run: bool = False,
616) -> bool:
617    """Lists files in the current Git repo and runs a Presubmit with them.
618
619    This changes the directory to the root of the Git repository after listing
620    paths, so all presubmit checks can assume they run from there.
621
622    The paths argument contains Git pathspecs. If no pathspecs are provided, all
623    paths in all repos are included. If paths to files or directories are
624    provided, only files within those repositories are searched. Patterns are
625    searched across all repositories. For example, if the pathspecs "my_module/"
626    and "*.h", paths under "my_module/" in the containing repo and paths in all
627    repos matching "*.h" will be included in the presubmit.
628
629    Args:
630        program: list of presubmit check functions to run
631        root: root path of the project
632        repos: paths to the roots of Git repositories to check
633        name: name to use to refer to this presubmit check run
634        base: optional base Git commit to list files against
635        paths: optional list of Git pathspecs to run the checks against
636        exclude: regular expressions for Posix-style paths to exclude
637        output_directory: where to place output files
638        package_root: where to place package files
639        only_list_steps: print step names instead of running them
640        override_gn_args: additional GN args to set on steps
641        keep_going: continue running presubmit steps after a step fails
642        continue_after_build_error: continue building if a build step fails
643        rng_seed: seed for a random number generator, for the few steps that
644            need one
645        presubmit_class: class to use to run Presubmits, should inherit from
646            Presubmit class above
647        list_steps_file: File created by --only-list-steps, used to keep from
648            recalculating affected files.
649        substep: run only part of a single check
650
651    Returns:
652        True if all presubmit checks succeeded
653    """
654    repos = [repo.resolve() for repo in repos]
655
656    non_empty_repos = []
657    for repo in repos:
658        if list(repo.iterdir()):
659            non_empty_repos.append(repo)
660            if git_repo.root(repo) != repo:
661                raise ValueError(
662                    f'{repo} is not the root of a Git repo; '
663                    'presubmit checks must be run from a Git repo'
664                )
665    repos = non_empty_repos
666
667    pathspecs_by_repo = _process_pathspecs(repos, paths)
668
669    all_files: list[Path] = []
670    modified_files: list[Path] = []
671    list_steps_data: dict[str, Any] = {}
672
673    if list_steps_file:
674        with list_steps_file.open() as ins:
675            list_steps_data = json.load(ins)
676        all_files.extend(list_steps_data['all_files'])
677        for step in list_steps_data['steps']:
678            modified_files.extend(Path(x) for x in step.get("paths", ()))
679        modified_files = sorted(set(modified_files))
680        _LOG.info(
681            'Loaded %d paths from file %s',
682            len(modified_files),
683            list_steps_file,
684        )
685
686    else:
687        for repo, pathspecs in pathspecs_by_repo.items():
688            new_all_files_items, new_modified_file_items = fetch_file_lists(
689                root, repo, pathspecs, exclude, base
690            )
691            all_files.extend(new_all_files_items)
692            modified_files.extend(new_modified_file_items)
693
694    if output_directory is None:
695        output_directory = root / '.presubmit'
696
697    if package_root is None:
698        package_root = output_directory / 'packages'
699
700    presubmit = presubmit_class(
701        root=root,
702        repos=repos,
703        output_directory=output_directory,
704        paths=modified_files,
705        all_paths=all_files,
706        package_root=package_root,
707        override_gn_args=dict(override_gn_args or {}),
708        continue_after_build_error=continue_after_build_error,
709        rng_seed=rng_seed,
710        full=bool(base is None),
711    )
712
713    if only_list_steps:
714        steps: list[dict] = []
715        for filtered_check in presubmit.apply_filters(program):
716            step = {
717                'name': filtered_check.name,
718                'paths': [str(x) for x in filtered_check.paths],
719            }
720            substeps = filtered_check.check.substeps()
721            if len(substeps) > 1:
722                step['substeps'] = [x.name for x in substeps]
723            steps.append(step)
724
725        list_steps_data = {
726            'steps': steps,
727            'all_files': [str(x) for x in all_files],
728        }
729        json.dump(list_steps_data, sys.stdout, indent=2)
730        sys.stdout.write('\n')
731        return True
732
733    if not isinstance(program, Program):
734        program = Program('', program)
735
736    return presubmit.run(program, keep_going, substep=substep, dry_run=dry_run)
737
738
739def _make_str_tuple(value: Iterable[str] | str) -> tuple[str, ...]:
740    return tuple([value] if isinstance(value, str) else value)
741
742
743def check(*args, **kwargs):
744    """Turn a function into a presubmit check.
745
746    Args:
747        *args: Passed through to function.
748        *kwargs: Passed through to function.
749
750    If only one argument is provided and it's a function, this function acts
751    as a decorator and creates a Check from the function. Example of this kind
752    of usage:
753
754    @check
755    def pragma_once(ctx: PresubmitContext):
756        pass
757
758    Otherwise, save the arguments, and return a decorator that turns a function
759    into a Check, but with the arguments added onto the Check constructor.
760    Example of this kind of usage:
761
762    @check(name='pragma_twice')
763    def pragma_once(ctx: PresubmitContext):
764        pass
765    """
766    if (
767        len(args) == 1
768        and isinstance(args[0], types.FunctionType)
769        and not kwargs
770    ):
771        # Called as a regular decorator.
772        return Check(args[0])
773
774    def decorator(check_function):
775        return Check(check_function, *args, **kwargs)
776
777    return decorator
778
779
780@dataclasses.dataclass
781class SubStep:
782    name: str | None
783    _func: Callable[..., PresubmitResult]
784    args: Sequence[Any] = ()
785    kwargs: dict[str, Any] = dataclasses.field(default_factory=lambda: {})
786
787    def __call__(self, ctx: PresubmitContext) -> PresubmitResult:
788        if self.name:
789            _LOG.info('%s', self.name)
790        return self._func(ctx, *self.args, **self.kwargs)
791
792
793class Check:
794    """Wraps a presubmit check function.
795
796    This class consolidates the logic for running and logging a presubmit check.
797    It also supports filtering the paths passed to the presubmit check.
798    """
799
800    def __init__(
801        self,
802        check: (  # pylint: disable=redefined-outer-name
803            Callable | Iterable[SubStep]
804        ),
805        path_filter: FileFilter = FileFilter(),
806        always_run: bool = True,
807        name: str | None = None,
808        doc: str | None = None,
809    ) -> None:
810        # Since Check wraps a presubmit function, adopt that function's name.
811        self.name: str = ''
812        self.doc: str = ''
813        if isinstance(check, Check):
814            self.name = check.name
815            self.doc = check.doc
816        elif callable(check):
817            self.name = check.__name__
818            self.doc = check.__doc__ or ''
819
820        if name:
821            self.name = name
822        if doc:
823            self.doc = doc
824
825        if not self.name:
826            raise ValueError('no name for step')
827
828        self._substeps_raw: Iterable[SubStep]
829        if isinstance(check, collections.abc.Iterator):
830            self._substeps_raw = check
831        else:
832            assert callable(check)
833            _ensure_is_valid_presubmit_check_function(check)
834            self._substeps_raw = iter((SubStep(None, check),))
835        self._substeps_saved: Sequence[SubStep] = ()
836
837        self.filter = path_filter
838        self.always_run: bool = always_run
839
840        self._is_presubmit_check_object = True
841
842    def substeps(self) -> Sequence[SubStep]:
843        """Return the SubSteps of the current step.
844
845        This is where the list of SubSteps is actually evaluated. It can't be
846        evaluated in the constructor because the Iterable passed into the
847        constructor might not be ready yet.
848        """
849        if not self._substeps_saved:
850            self._substeps_saved = tuple(self._substeps_raw)
851        return self._substeps_saved
852
853    def __repr__(self):
854        # This returns just the name so it's easy to show the entire list of
855        # steps with '--help'.
856        return self.name
857
858    def unfiltered(self) -> Check:
859        """Create a new check identical to this one, but without the filter."""
860        clone = copy.copy(self)
861        clone.filter = FileFilter()
862        return clone
863
864    def with_filter(
865        self,
866        *,
867        endswith: Iterable[str] = (),
868        exclude: Iterable[Pattern[str] | str] = (),
869    ) -> Check:
870        """Create a new check identical to this one, but with extra filters.
871
872        Add to the existing filter, perhaps to exclude an additional directory.
873
874        Args:
875            endswith: Passed through to FileFilter.
876            exclude: Passed through to FileFilter.
877
878        Returns a new check.
879        """
880        return self.with_file_filter(
881            FileFilter(endswith=_make_str_tuple(endswith), exclude=exclude)
882        )
883
884    def with_file_filter(self, file_filter: FileFilter) -> Check:
885        """Create a new check identical to this one, but with extra filters.
886
887        Add to the existing filter, perhaps to exclude an additional directory.
888
889        Args:
890            file_filter: Additional filter rules.
891
892        Returns a new check.
893        """
894        clone = copy.copy(self)
895        if clone.filter:
896            clone.filter.exclude = clone.filter.exclude + file_filter.exclude
897            clone.filter.endswith = clone.filter.endswith + file_filter.endswith
898            clone.filter.name = file_filter.name or clone.filter.name
899            clone.filter.suffix = clone.filter.suffix + file_filter.suffix
900        else:
901            clone.filter = file_filter
902        return clone
903
904    def run(
905        self,
906        ctx: PresubmitContext,
907        count: int,
908        total: int,
909        substep: str | None = None,
910    ) -> PresubmitResult:
911        """Runs the presubmit check on the provided paths."""
912
913        _print_ui(
914            _box(
915                _CHECK_UPPER,
916                f'{count}/{total}',
917                self.name,
918                plural(ctx.paths, "file"),
919            )
920        )
921
922        substep_part = f'.{substep}' if substep else ''
923        _LOG.debug(
924            '[%d/%d] Running %s%s on %s',
925            count,
926            total,
927            self.name,
928            substep_part,
929            plural(ctx.paths, "file"),
930        )
931
932        start_time_s = time.time()
933        result: PresubmitResult
934        if substep:
935            result = self.run_substep(ctx, substep)
936        else:
937            result = self(ctx)
938        time_str = _format_time(time.time() - start_time_s)
939        _LOG.debug('%s %s', self.name, result.value)
940
941        if ctx.dry_run:
942            log_check_traces(ctx)
943
944        _print_ui(
945            _box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str)
946        )
947        _LOG.debug('%s duration:%s', self.name, time_str)
948
949        return result
950
951    def _try_call(
952        self,
953        func: Callable,
954        ctx,
955        *args,
956        **kwargs,
957    ) -> PresubmitResult:
958        try:
959            result = func(ctx, *args, **kwargs)
960            if ctx.failed:
961                return PresubmitResult.FAIL
962            if isinstance(result, PresubmitResult):
963                return result
964            return PresubmitResult.PASS
965
966        except PresubmitFailure as failure:
967            if str(failure):
968                _LOG.warning('%s', failure)
969            return PresubmitResult.FAIL
970
971        except Exception as _failure:  # pylint: disable=broad-except
972            _LOG.exception('Presubmit check %s failed!', self.name)
973            return PresubmitResult.FAIL
974
975        except KeyboardInterrupt:
976            _print_ui()
977            return PresubmitResult.CANCEL
978
979    def run_substep(
980        self, ctx: PresubmitContext, name: str | None
981    ) -> PresubmitResult:
982        for substep in self.substeps():
983            if substep.name == name:
984                return substep(ctx)
985
986        expected = ', '.join(repr(s.name) for s in self.substeps())
987        raise LookupError(f'bad substep name: {name!r} (expected: {expected})')
988
989    def __call__(self, ctx: PresubmitContext) -> PresubmitResult:
990        """Calling a Check calls its underlying substeps directly.
991
992        This makes it possible to call functions wrapped by @filter_paths. The
993        prior filters are ignored, so new filters may be applied.
994        """
995        result: PresubmitResult
996        for substep in self.substeps():
997            result = self._try_call(substep, ctx)
998            if result and result != PresubmitResult.PASS:
999                return result
1000        return PresubmitResult.PASS
1001
1002
1003def _required_args(function: Callable) -> Iterable[Parameter]:
1004    """Returns the required arguments for a function."""
1005    optional_types = Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD
1006
1007    for param in signature(function).parameters.values():
1008        if param.default is param.empty and param.kind not in optional_types:
1009            yield param
1010
1011
1012def _ensure_is_valid_presubmit_check_function(chk: Callable) -> None:
1013    """Checks if a Callable can be used as a presubmit check."""
1014    try:
1015        required_args = tuple(_required_args(chk))
1016    except (TypeError, ValueError):
1017        raise TypeError(
1018            'Presubmit checks must be callable, but '
1019            f'{chk!r} is a {type(chk).__name__}'
1020        )
1021
1022    if len(required_args) != 1:
1023        raise TypeError(
1024            f'Presubmit check functions must have exactly one required '
1025            f'positional argument (the PresubmitContext), but '
1026            f'{chk.__name__} has {len(required_args)} required arguments'
1027            + (
1028                f' ({", ".join(a.name for a in required_args)})'
1029                if required_args
1030                else ''
1031            )
1032        )
1033
1034
1035def filter_paths(
1036    *,
1037    endswith: Iterable[str] = (),
1038    exclude: Iterable[Pattern[str] | str] = (),
1039    file_filter: FileFilter | None = None,
1040    always_run: bool = False,
1041) -> Callable[[Callable], Check]:
1042    """Decorator for filtering the paths list for a presubmit check function.
1043
1044    Path filters only apply when the function is used as a presubmit check.
1045    Filters are ignored when the functions are called directly. This makes it
1046    possible to reuse functions wrapped in @filter_paths in other presubmit
1047    checks, potentially with different path filtering rules.
1048
1049    Args:
1050        endswith: str or iterable of path endings to include
1051        exclude: regular expressions of paths to exclude
1052        file_filter: FileFilter used to select files
1053        always_run: Run check even when no files match
1054    Returns:
1055        a wrapped version of the presubmit function
1056    """
1057
1058    if file_filter:
1059        real_file_filter = file_filter
1060        if endswith or exclude:
1061            raise ValueError(
1062                'Must specify either file_filter or '
1063                'endswith/exclude args, not both'
1064            )
1065    else:
1066        # TODO: b/238426363 - Remove these arguments and use FileFilter only.
1067        real_file_filter = FileFilter(
1068            endswith=_make_str_tuple(endswith), exclude=exclude
1069        )
1070
1071    def filter_paths_for_function(function: Callable):
1072        return Check(function, real_file_filter, always_run=always_run)
1073
1074    return filter_paths_for_function
1075
1076
1077def call(
1078    *args, call_annotation: dict[Any, Any] | None = None, **kwargs
1079) -> None:
1080    """Optional subprocess wrapper that causes a PresubmitFailure on errors."""
1081    ctx = PRESUBMIT_CONTEXT.get()
1082    if ctx:
1083        call_annotation = call_annotation if call_annotation else {}
1084        ctx.append_check_command(
1085            *args, call_annotation=call_annotation, **kwargs
1086        )
1087        if ctx.dry_run:
1088            return
1089
1090    attributes, command = tools.format_command(args, kwargs)
1091    _LOG.debug('[RUN] %s\n%s', attributes, command)
1092
1093    tee = kwargs.pop('tee', None)
1094    propagate_sigterm = kwargs.pop('propagate_sigterm', False)
1095
1096    env = pw_cli.env.pigweed_environment()
1097    kwargs.setdefault('stdout', subprocess.PIPE)
1098    kwargs.setdefault('stderr', subprocess.STDOUT)
1099
1100    process = subprocess.Popen(args, **kwargs)
1101    assert process.stdout
1102
1103    # Set up signal handler if requested.
1104    signaled = False
1105    if propagate_sigterm:
1106
1107        def signal_handler(_signal_number: int, _stack_frame: Any) -> None:
1108            nonlocal signaled
1109            signaled = True
1110            process.terminate()
1111
1112        previous_signal_handler = signal.signal(signal.SIGTERM, signal_handler)
1113
1114    if env.PW_PRESUBMIT_DISABLE_SUBPROCESS_CAPTURE:
1115        while True:
1116            line = process.stdout.readline().decode(errors='backslashreplace')
1117            if not line:
1118                break
1119            _LOG.info(line.rstrip())
1120            if tee:
1121                tee.write(line)
1122
1123    stdout, _ = process.communicate()
1124    if tee:
1125        tee.write(stdout.decode(errors='backslashreplace'))
1126
1127    logfunc = _LOG.warning if process.returncode else _LOG.debug
1128    logfunc('[FINISHED]\n%s', command)
1129    logfunc(
1130        '[RESULT] %s with return code %d',
1131        'Failed' if process.returncode else 'Passed',
1132        process.returncode,
1133    )
1134    if stdout:
1135        logfunc('[OUTPUT]\n%s', stdout.decode(errors='backslashreplace'))
1136
1137    if propagate_sigterm:
1138        signal.signal(signal.SIGTERM, previous_signal_handler)
1139        if signaled:
1140            _LOG.warning('Exiting due to SIGTERM.')
1141            sys.exit(1)
1142
1143    if process.returncode:
1144        raise PresubmitFailure
1145
1146
1147def install_package(
1148    ctx: FormatContext | PresubmitContext,
1149    name: str,
1150    force: bool = False,
1151) -> None:
1152    """Install package with given name in given path."""
1153    root = ctx.package_root
1154    mgr = package_manager.PackageManager(root)
1155
1156    ctx.append_check_command(
1157        'pw',
1158        'package',
1159        'install',
1160        name,
1161        call_annotation={'pw_package_install': name},
1162    )
1163    if ctx.dry_run:
1164        return
1165
1166    if not mgr.list():
1167        raise PresubmitFailure(
1168            'no packages configured, please import your pw_package '
1169            'configuration module'
1170        )
1171
1172    if not mgr.status(name) or force:
1173        mgr.install(name, force=force)
1174