1# Copyright 2020 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Tools for running presubmit checks in a Git repository. 15 16Presubmit checks are defined as a function or other callable. The function may 17take either no arguments or a list of the paths on which to run. Presubmit 18checks communicate failure by raising any exception. 19 20For example, either of these functions may be used as presubmit checks: 21 22 @pw_presubmit.filter_paths(endswith='.py') 23 def file_contains_ni(ctx: PresubmitContext): 24 for path in ctx.paths: 25 with open(path) as file: 26 contents = file.read() 27 if 'ni' not in contents and 'nee' not in contents: 28 raise PresumitFailure('Files must say "ni"!', path=path) 29 30 def run_the_build(): 31 subprocess.run(['make', 'release'], check=True) 32 33Presubmit checks that accept a list of paths may use the filter_paths decorator 34to automatically filter the paths list for file types they care about. See the 35pragma_once function for an example. 36 37See pigweed_presbumit.py for an example of how to define presubmit checks. 38""" 39 40from __future__ import annotations 41 42import collections 43import contextlib 44import dataclasses 45import enum 46from inspect import Parameter, signature 47import itertools 48import logging 49import os 50from pathlib import Path 51import re 52import subprocess 53import time 54from typing import (Callable, Collection, Dict, Iterable, Iterator, List, 55 NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union) 56 57import pw_cli.env 58from pw_presubmit import git_repo, tools 59from pw_presubmit.tools import plural 60 61_LOG: logging.Logger = logging.getLogger(__name__) 62 63color_red = tools.make_color(31) 64color_bold_red = tools.make_color(31, 1) 65color_black_on_red = tools.make_color(30, 41) 66color_yellow = tools.make_color(33, 1) 67color_green = tools.make_color(32) 68color_black_on_green = tools.make_color(30, 42) 69color_aqua = tools.make_color(36) 70color_bold_white = tools.make_color(37, 1) 71 72_SUMMARY_BOX = '══╦╗ ║║══╩╝' 73_CHECK_UPPER = '━━━┓ ' 74_CHECK_LOWER = ' ━━━┛' 75 76WIDTH = 80 77 78_LEFT = 7 79_RIGHT = 11 80 81 82def _title(msg, style=_SUMMARY_BOX) -> str: 83 msg = f' {msg} '.center(WIDTH - 2) 84 return tools.make_box('^').format(*style, section1=msg, width1=len(msg)) 85 86 87def _format_time(time_s: float) -> str: 88 minutes, seconds = divmod(time_s, 60) 89 if minutes < 60: 90 return f' {int(minutes)}:{seconds:04.1f}' 91 hours, minutes = divmod(minutes, 60) 92 return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}' 93 94 95def _box(style, left, middle, right, box=tools.make_box('><>')) -> str: 96 return box.format(*style, 97 section1=left + ('' if left.endswith(' ') else ' '), 98 width1=_LEFT, 99 section2=' ' + middle, 100 width2=WIDTH - _LEFT - _RIGHT - 4, 101 section3=right + ' ', 102 width3=_RIGHT) 103 104 105class PresubmitFailure(Exception): 106 """Optional exception to use for presubmit failures.""" 107 def __init__(self, description: str = '', path=None): 108 super().__init__(f'{path}: {description}' if path else description) 109 110 111class _Result(enum.Enum): 112 113 PASS = 'PASSED' # Check completed successfully. 114 FAIL = 'FAILED' # Check failed. 115 CANCEL = 'CANCEL' # Check didn't complete. 116 117 def colorized(self, width: int, invert: bool = False) -> str: 118 if self is _Result.PASS: 119 color = color_black_on_green if invert else color_green 120 elif self is _Result.FAIL: 121 color = color_black_on_red if invert else color_red 122 elif self is _Result.CANCEL: 123 color = color_yellow 124 else: 125 color = lambda value: value 126 127 padding = (width - len(self.value)) // 2 * ' ' 128 return padding + color(self.value) + padding 129 130 131class Program(collections.abc.Sequence): 132 """A sequence of presubmit checks; basically a tuple with a name.""" 133 def __init__(self, name: str, steps: Iterable[Callable]): 134 self.name = name 135 self._steps = tuple({s: None for s in tools.flatten(steps)}) 136 137 def __getitem__(self, i): 138 return self._steps[i] 139 140 def __len__(self): 141 return len(self._steps) 142 143 def __str__(self): 144 return self.name 145 146 def title(self): 147 return f'{self.name if self.name else ""} presubmit checks'.strip() 148 149 150class Programs(collections.abc.Mapping): 151 """A mapping of presubmit check programs. 152 153 Use is optional. Helpful when managing multiple presubmit check programs. 154 """ 155 def __init__(self, **programs: Sequence): 156 """Initializes a name: program mapping from the provided keyword args. 157 158 A program is a sequence of presubmit check functions. The sequence may 159 contain nested sequences, which are flattened. 160 """ 161 self._programs: Dict[str, Program] = { 162 name: Program(name, checks) 163 for name, checks in programs.items() 164 } 165 166 def all_steps(self) -> Dict[str, Callable]: 167 return {c.__name__: c for c in itertools.chain(*self.values())} 168 169 def __getitem__(self, item: str) -> Program: 170 return self._programs[item] 171 172 def __iter__(self) -> Iterator[str]: 173 return iter(self._programs) 174 175 def __len__(self) -> int: 176 return len(self._programs) 177 178 179@dataclasses.dataclass(frozen=True) 180class PresubmitContext: 181 """Context passed into presubmit checks.""" 182 root: Path 183 repos: Tuple[Path, ...] 184 output_dir: Path 185 paths: Tuple[Path, ...] 186 package_root: Path 187 188 def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]: 189 return tuple( 190 tools.relative_paths(self.paths, start if start else self.root)) 191 192 def paths_by_repo(self) -> Dict[Path, List[Path]]: 193 repos = collections.defaultdict(list) 194 195 for path in self.paths: 196 repos[git_repo.root(path)].append(path) 197 198 return repos 199 200 201class _Filter(NamedTuple): 202 endswith: Tuple[str, ...] = ('', ) 203 exclude: Tuple[Pattern[str], ...] = () 204 205 def matches(self, path: str) -> bool: 206 return (any(path.endswith(end) for end in self.endswith) 207 and not any(exp.search(path) for exp in self.exclude)) 208 209 210def _print_ui(*args) -> None: 211 """Prints to stdout and flushes to stay in sync with logs on stderr.""" 212 print(*args, flush=True) 213 214 215class Presubmit: 216 """Runs a series of presubmit checks on a list of files.""" 217 def __init__(self, root: Path, repos: Sequence[Path], 218 output_directory: Path, paths: Sequence[Path], 219 package_root: Path): 220 self._root = root.resolve() 221 self._repos = tuple(repos) 222 self._output_directory = output_directory.resolve() 223 self._paths = tuple(paths) 224 self._relative_paths = tuple( 225 tools.relative_paths(self._paths, self._root)) 226 self._package_root = package_root.resolve() 227 228 def run(self, program: Program, keep_going: bool = False) -> bool: 229 """Executes a series of presubmit checks on the paths.""" 230 231 checks = self.apply_filters(program) 232 233 _LOG.debug('Running %s for %s', program.title(), self._root.name) 234 _print_ui(_title(f'{self._root.name}: {program.title()}')) 235 236 _LOG.info('%d of %d checks apply to %s in %s', len(checks), 237 len(program), plural(self._paths, 'file'), self._root) 238 239 _print_ui() 240 for line in tools.file_summary(self._relative_paths): 241 _print_ui(line) 242 _print_ui() 243 244 if not self._paths: 245 _print_ui(color_yellow('No files are being checked!')) 246 247 _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks)) 248 249 start_time: float = time.time() 250 passed, failed, skipped = self._execute_checks(checks, keep_going) 251 self._log_summary(time.time() - start_time, passed, failed, skipped) 252 253 return not failed and not skipped 254 255 def apply_filters( 256 self, 257 program: Sequence[Callable]) -> List[Tuple[Check, Sequence[Path]]]: 258 """Returns list of (check, paths) for checks that should run.""" 259 checks = [c if isinstance(c, Check) else Check(c) for c in program] 260 filter_to_checks: Dict[_Filter, 261 List[Check]] = collections.defaultdict(list) 262 263 for check in checks: 264 filter_to_checks[check.filter].append(check) 265 266 check_to_paths = self._map_checks_to_paths(filter_to_checks) 267 return [(c, check_to_paths[c]) for c in checks if c in check_to_paths] 268 269 def _map_checks_to_paths( 270 self, filter_to_checks: Dict[_Filter, List[Check]] 271 ) -> Dict[Check, Sequence[Path]]: 272 checks_to_paths: Dict[Check, Sequence[Path]] = {} 273 274 posix_paths = tuple(p.as_posix() for p in self._relative_paths) 275 276 for filt, checks in filter_to_checks.items(): 277 filtered_paths = tuple( 278 path for path, filter_path in zip(self._paths, posix_paths) 279 if filt.matches(filter_path)) 280 281 for check in checks: 282 if filtered_paths or check.always_run: 283 checks_to_paths[check] = filtered_paths 284 else: 285 _LOG.debug('Skipping "%s": no relevant files', check.name) 286 287 return checks_to_paths 288 289 def _log_summary(self, time_s: float, passed: int, failed: int, 290 skipped: int) -> None: 291 summary_items = [] 292 if passed: 293 summary_items.append(f'{passed} passed') 294 if failed: 295 summary_items.append(f'{failed} failed') 296 if skipped: 297 summary_items.append(f'{skipped} not run') 298 summary = ', '.join(summary_items) or 'nothing was done' 299 300 result = _Result.FAIL if failed or skipped else _Result.PASS 301 total = passed + failed + skipped 302 303 _LOG.debug('Finished running %d checks on %s in %.1f s', total, 304 plural(self._paths, 'file'), time_s) 305 _LOG.debug('Presubmit checks %s: %s', result.value, summary) 306 307 _print_ui( 308 _box( 309 _SUMMARY_BOX, result.colorized(_LEFT, invert=True), 310 f'{total} checks on {plural(self._paths, "file")}: {summary}', 311 _format_time(time_s))) 312 313 @contextlib.contextmanager 314 def _context(self, name: str, paths: Tuple[Path, ...]): 315 # There are many characters banned from filenames on Windows. To 316 # simplify things, just strip everything that's not a letter, digit, 317 # or underscore. 318 sanitized_name = re.sub(r'[\W_]+', '_', name).lower() 319 output_directory = self._output_directory.joinpath(sanitized_name) 320 os.makedirs(output_directory, exist_ok=True) 321 322 handler = logging.FileHandler(output_directory.joinpath('step.log'), 323 mode='w') 324 handler.setLevel(logging.DEBUG) 325 326 try: 327 _LOG.addHandler(handler) 328 329 yield PresubmitContext( 330 root=self._root, 331 repos=self._repos, 332 output_dir=output_directory, 333 paths=paths, 334 package_root=self._package_root, 335 ) 336 337 finally: 338 _LOG.removeHandler(handler) 339 340 def _execute_checks(self, program, 341 keep_going: bool) -> Tuple[int, int, int]: 342 """Runs presubmit checks; returns (passed, failed, skipped) lists.""" 343 passed = failed = 0 344 345 for i, (check, paths) in enumerate(program, 1): 346 with self._context(check.name, paths) as ctx: 347 result = check.run(ctx, i, len(program)) 348 349 if result is _Result.PASS: 350 passed += 1 351 elif result is _Result.CANCEL: 352 break 353 else: 354 failed += 1 355 if not keep_going: 356 break 357 358 return passed, failed, len(program) - passed - failed 359 360 361def _process_pathspecs(repos: Iterable[Path], 362 pathspecs: Iterable[str]) -> Dict[Path, List[str]]: 363 pathspecs_by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos} 364 repos_with_paths: Set[Path] = set() 365 366 for pathspec in pathspecs: 367 # If the pathspec is a path to an existing file, only use it for the 368 # repo it is in. 369 if os.path.exists(pathspec): 370 # Raise an exception if the path exists but is not in a known repo. 371 repo = git_repo.within_repo(pathspec) 372 if repo not in pathspecs_by_repo: 373 raise ValueError( 374 f'{pathspec} is not in a Git repository in this presubmit') 375 376 # Make the path relative to the repo's root. 377 pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo)) 378 repos_with_paths.add(repo) 379 else: 380 # Pathspecs that are not paths (e.g. '*.h') are used for all repos. 381 for patterns in pathspecs_by_repo.values(): 382 patterns.append(pathspec) 383 384 # If any paths were specified, only search for paths in those repos. 385 if repos_with_paths: 386 for repo in set(pathspecs_by_repo) - repos_with_paths: 387 del pathspecs_by_repo[repo] 388 389 return pathspecs_by_repo 390 391 392def run(program: Sequence[Callable], 393 root: Path, 394 repos: Collection[Path] = (), 395 base: Optional[str] = None, 396 paths: Sequence[str] = (), 397 exclude: Sequence[Pattern] = (), 398 output_directory: Optional[Path] = None, 399 package_root: Path = None, 400 only_list_steps: bool = False, 401 keep_going: bool = False) -> bool: 402 """Lists files in the current Git repo and runs a Presubmit with them. 403 404 This changes the directory to the root of the Git repository after listing 405 paths, so all presubmit checks can assume they run from there. 406 407 The paths argument contains Git pathspecs. If no pathspecs are provided, all 408 paths in all repos are included. If paths to files or directories are 409 provided, only files within those repositories are searched. Patterns are 410 searched across all repositories. For example, if the pathspecs "my_module/" 411 and "*.h", paths under "my_module/" in the containing repo and paths in all 412 repos matching "*.h" will be included in the presubmit. 413 414 Args: 415 program: list of presubmit check functions to run 416 root: root path of the project 417 repos: paths to the roots of Git repositories to check 418 name: name to use to refer to this presubmit check run 419 base: optional base Git commit to list files against 420 paths: optional list of Git pathspecs to run the checks against 421 exclude: regular expressions for Posix-style paths to exclude 422 output_directory: where to place output files 423 package_root: where to place package files 424 only_list_steps: print step names instead of running them 425 keep_going: whether to continue running checks if an error occurs 426 427 Returns: 428 True if all presubmit checks succeeded 429 """ 430 repos = [repo.resolve() for repo in repos] 431 432 for repo in repos: 433 if git_repo.root(repo) != repo: 434 raise ValueError(f'{repo} is not the root of a Git repo; ' 435 'presubmit checks must be run from a Git repo') 436 437 pathspecs_by_repo = _process_pathspecs(repos, paths) 438 439 files: List[Path] = [] 440 441 for repo, pathspecs in pathspecs_by_repo.items(): 442 files += tools.exclude_paths( 443 exclude, git_repo.list_files(base, pathspecs, repo), root) 444 445 _LOG.info( 446 'Checking %s', 447 git_repo.describe_files(repo, repo, base, pathspecs, exclude, 448 root)) 449 450 if output_directory is None: 451 output_directory = root / '.presubmit' 452 453 if package_root is None: 454 package_root = output_directory / 'packages' 455 456 presubmit = Presubmit( 457 root=root, 458 repos=repos, 459 output_directory=output_directory, 460 paths=files, 461 package_root=package_root, 462 ) 463 464 if only_list_steps: 465 for check, _ in presubmit.apply_filters(program): 466 print(check.name) 467 return True 468 469 if not isinstance(program, Program): 470 program = Program('', program) 471 472 return presubmit.run(program, keep_going) 473 474 475def _make_str_tuple(value: Union[Iterable[str], str]) -> Tuple[str, ...]: 476 return tuple([value] if isinstance(value, str) else value) 477 478 479class Check: 480 """Wraps a presubmit check function. 481 482 This class consolidates the logic for running and logging a presubmit check. 483 It also supports filtering the paths passed to the presubmit check. 484 """ 485 def __init__(self, 486 check_function: Callable, 487 path_filter: _Filter = _Filter(), 488 always_run: bool = True): 489 _ensure_is_valid_presubmit_check_function(check_function) 490 491 self._check: Callable = check_function 492 self.filter: _Filter = path_filter 493 self.always_run: bool = always_run 494 495 # Since Check wraps a presubmit function, adopt that function's name. 496 self.__name__ = self._check.__name__ 497 498 def with_filter( 499 self, 500 *, 501 endswith: Iterable[str] = '', 502 exclude: Iterable[Union[Pattern[str], str]] = () 503 ) -> Check: 504 endswith = self.filter.endswith 505 if endswith: 506 endswith = endswith + _make_str_tuple(endswith) 507 exclude = self.filter.exclude + tuple(re.compile(e) for e in exclude) 508 509 return Check(check_function=self._check, 510 path_filter=_Filter(endswith=endswith, exclude=exclude), 511 always_run=self.always_run) 512 513 @property 514 def name(self): 515 return self.__name__ 516 517 def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result: 518 """Runs the presubmit check on the provided paths.""" 519 520 _print_ui( 521 _box(_CHECK_UPPER, f'{count}/{total}', self.name, 522 plural(ctx.paths, "file"))) 523 524 _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name, 525 plural(ctx.paths, "file")) 526 527 start_time_s = time.time() 528 result = self._call_function(ctx) 529 time_str = _format_time(time.time() - start_time_s) 530 _LOG.debug('%s %s', self.name, result.value) 531 532 _print_ui( 533 _box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str)) 534 _LOG.debug('%s duration:%s', self.name, time_str) 535 536 return result 537 538 def _call_function(self, ctx: PresubmitContext) -> _Result: 539 try: 540 self._check(ctx) 541 except PresubmitFailure as failure: 542 if str(failure): 543 _LOG.warning('%s', failure) 544 return _Result.FAIL 545 except Exception as failure: # pylint: disable=broad-except 546 _LOG.exception('Presubmit check %s failed!', self.name) 547 return _Result.FAIL 548 except KeyboardInterrupt: 549 _print_ui() 550 return _Result.CANCEL 551 552 return _Result.PASS 553 554 def __call__(self, ctx: PresubmitContext, *args, **kwargs): 555 """Calling a Check calls its underlying function directly. 556 557 This makes it possible to call functions wrapped by @filter_paths. The 558 prior filters are ignored, so new filters may be applied. 559 """ 560 return self._check(ctx, *args, **kwargs) 561 562 563def _required_args(function: Callable) -> Iterable[Parameter]: 564 """Returns the required arguments for a function.""" 565 optional_types = Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD 566 567 for param in signature(function).parameters.values(): 568 if param.default is param.empty and param.kind not in optional_types: 569 yield param 570 571 572def _ensure_is_valid_presubmit_check_function(check: Callable) -> None: 573 """Checks if a Callable can be used as a presubmit check.""" 574 try: 575 required_args = tuple(_required_args(check)) 576 except (TypeError, ValueError): 577 raise TypeError('Presubmit checks must be callable, but ' 578 f'{check!r} is a {type(check).__name__}') 579 580 if len(required_args) != 1: 581 raise TypeError( 582 f'Presubmit check functions must have exactly one required ' 583 f'positional argument (the PresubmitContext), but ' 584 f'{check.__name__} has {len(required_args)} required arguments' + 585 (f' ({", ".join(a.name for a in required_args)})' 586 if required_args else '')) 587 588 589def filter_paths(endswith: Iterable[str] = '', 590 exclude: Iterable[Union[Pattern[str], str]] = (), 591 always_run: bool = False) -> Callable[[Callable], Check]: 592 """Decorator for filtering the paths list for a presubmit check function. 593 594 Path filters only apply when the function is used as a presubmit check. 595 Filters are ignored when the functions are called directly. This makes it 596 possible to reuse functions wrapped in @filter_paths in other presubmit 597 checks, potentially with different path filtering rules. 598 599 Args: 600 endswith: str or iterable of path endings to include 601 exclude: regular expressions of paths to exclude 602 603 Returns: 604 a wrapped version of the presubmit function 605 """ 606 def filter_paths_for_function(function: Callable): 607 return Check(function, 608 _Filter(_make_str_tuple(endswith), 609 tuple(re.compile(e) for e in exclude)), 610 always_run=always_run) 611 612 return filter_paths_for_function 613 614 615def call(*args, **kwargs) -> None: 616 """Optional subprocess wrapper that causes a PresubmitFailure on errors.""" 617 attributes, command = tools.format_command(args, kwargs) 618 _LOG.debug('[RUN] %s\n%s', attributes, command) 619 620 env = pw_cli.env.pigweed_environment() 621 kwargs['stdout'] = subprocess.PIPE 622 kwargs['stderr'] = subprocess.STDOUT 623 624 process = subprocess.Popen(args, **kwargs) 625 assert process.stdout 626 627 if env.PW_PRESUBMIT_DISABLE_SUBPROCESS_CAPTURE: 628 while True: 629 line = process.stdout.readline().decode(errors='backslashreplace') 630 if not line: 631 break 632 _LOG.info(line.rstrip()) 633 634 stdout, _ = process.communicate() 635 636 logfunc = _LOG.warning if process.returncode else _LOG.debug 637 logfunc('[FINISHED]\n%s', command) 638 logfunc('[RESULT] %s with return code %d', 639 'Failed' if process.returncode else 'Passed', process.returncode) 640 if stdout: 641 logfunc('[OUTPUT]\n%s', stdout.decode(errors='backslashreplace')) 642 643 if process.returncode: 644 raise PresubmitFailure 645