# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tools for running presubmit checks in a Git repository.

Presubmit checks are defined as a function or other callable. The function
must accept exactly one required argument: a PresubmitContext, which provides
the paths on which to run. Presubmit checks communicate failure by raising any
exception.

For example, either of these functions may be used as presubmit checks:

    @pw_presubmit.filter_paths(endswith='.py')
    def file_contains_ni(ctx: PresubmitContext):
        for path in ctx.paths:
            with open(path) as file:
                contents = file.read()
                if 'ni' not in contents and 'nee' not in contents:
                    raise PresubmitFailure('Files must say "ni"!', path=path)

    def run_the_build(_):
        subprocess.run(['make', 'release'], check=True)

Presubmit checks may use the filter_paths decorator to automatically filter the
paths list for file types they care about. See the pragma_once function for an
example.

See pigweed_presubmit.py for an example of how to define presubmit checks.
"""

import collections
import contextlib
import dataclasses
import enum
from inspect import Parameter, signature
import itertools
import logging
import os
from pathlib import Path
import re
import subprocess
import time
from typing import (Callable, Collection, Dict, Iterable, Iterator, List,
                    NamedTuple, Optional, Pattern, Sequence, Set, Tuple, Union)

from pw_presubmit import git_repo, tools
from pw_presubmit.tools import plural

_LOG: logging.Logger = logging.getLogger(__name__)

color_red = tools.make_color(31)
color_bold_red = tools.make_color(31, 1)
color_black_on_red = tools.make_color(30, 41)
color_yellow = tools.make_color(33, 1)
color_green = tools.make_color(32)
color_black_on_green = tools.make_color(30, 42)
color_aqua = tools.make_color(36)
color_bold_white = tools.make_color(37, 1)

# Box-drawing characters that are unpacked positionally into the format string
# produced by tools.make_box().
# NOTE(review): the check styles are passed to the same format string as
# _SUMMARY_BOX, so they are assumed to need the same 11 characters; confirm
# the space padding of _CHECK_UPPER and _CHECK_LOWER.
_SUMMARY_BOX = '══╦╗ ║║══╩╝'
_CHECK_UPPER = '━━━┓       '
_CHECK_LOWER = '       ━━━┛'

WIDTH = 80

# Widths of the left and right sections of the boxes drawn by _box().
_LEFT = 7
_RIGHT = 11


def _title(msg, style=_SUMMARY_BOX) -> str:
    """Returns msg centered in a single-section box drawn with style."""
    msg = f' {msg} '.center(WIDTH - 2)
    return tools.make_box('^').format(*style, section1=msg, width1=len(msg))


def _format_time(time_s: float) -> str:
    """Formats a duration in seconds as M:SS.s, or H:MM:SS from one hour."""
    minutes, seconds = divmod(time_s, 60)
    if minutes < 60:
        return f' {int(minutes)}:{seconds:04.1f}'
    hours, minutes = divmod(minutes, 60)
    return f'{int(hours):d}:{int(minutes):02}:{int(seconds):02}'


def _box(style, left, middle, right, box=tools.make_box('><>')) -> str:
    """Returns a three-section box (left, middle, right) drawn with style."""
    return box.format(*style,
                      section1=left + ('' if left.endswith(' ') else ' '),
                      width1=_LEFT,
                      section2=' ' + middle,
                      width2=WIDTH - _LEFT - _RIGHT - 4,
                      section3=right + ' ',
                      width3=_RIGHT)


class PresubmitFailure(Exception):
    """Optional exception to use for presubmit failures."""
    def __init__(self, description: str = '', path=None):
        super().__init__(f'{path}: {description}' if path else description)


class _Result(enum.Enum):
    """Outcome of running a single presubmit check."""

    PASS = 'PASSED'  # Check completed successfully.
    FAIL = 'FAILED'  # Check failed.
    CANCEL = 'CANCEL'  # Check didn't complete.

    def colorized(self, width: int, invert: bool = False) -> str:
        """Returns the result text, colored and padded on both sides.

        Args:
          width: target width; half of the unused width is added as spaces on
              each side of the text
          invert: use the black-on-color variant for PASS and FAIL
        """
        if self is _Result.PASS:
            color = color_black_on_green if invert else color_green
        elif self is _Result.FAIL:
            color = color_black_on_red if invert else color_red
        elif self is _Result.CANCEL:
            color = color_yellow
        else:
            color = lambda value: value

        padding = (width - len(self.value)) // 2 * ' '
        return padding + color(self.value) + padding


class Program(collections.abc.Sequence):
    """A sequence of presubmit checks; basically a tuple with a name."""
    def __init__(self, name: str, steps: Iterable[Callable]):
        self.name = name
        self._steps = tuple(tools.flatten(steps))

    def __getitem__(self, i):
        return self._steps[i]

    def __len__(self):
        return len(self._steps)

    def __str__(self):
        return self.name

    def title(self):
        """Returns '<name> presubmit checks', without the name if it is empty."""
        return f'{self.name if self.name else ""} presubmit checks'.strip()


class Programs(collections.abc.Mapping):
    """A mapping of presubmit check programs.

    Use is optional. Helpful when managing multiple presubmit check programs.
    """
    def __init__(self, **programs: Sequence):
        """Initializes a name: program mapping from the provided keyword args.

        A program is a sequence of presubmit check functions. The sequence may
        contain nested sequences, which are flattened.
        """
        self._programs: Dict[str, Program] = {
            name: Program(name, checks)
            for name, checks in programs.items()
        }

    def all_steps(self) -> Dict[str, Callable]:
        """Returns a __name__: check mapping of the steps in every program."""
        return {c.__name__: c for c in itertools.chain(*self.values())}

    def __getitem__(self, item: str) -> Program:
        return self._programs[item]

    def __iter__(self) -> Iterator[str]:
        return iter(self._programs)

    def __len__(self) -> int:
        return len(self._programs)


@dataclasses.dataclass(frozen=True)
class PresubmitContext:
    """Context passed into presubmit checks."""
    root: Path
    repos: Tuple[Path, ...]
    output_dir: Path
    paths: Tuple[Path, ...]
    package_root: Path

    def relative_paths(self, start: Optional[Path] = None) -> Tuple[Path, ...]:
        """Returns the paths relative to start, or to root if start is unset."""
        return tuple(
            tools.relative_paths(self.paths, start if start else self.root))

    def paths_by_repo(self) -> Dict[Path, List[Path]]:
        """Groups the paths by the Git repo root reported by git_repo.root."""
        repos: Dict[Path, List[Path]] = collections.defaultdict(list)

        for path in self.paths:
            repos[git_repo.root(path)].append(path)

        return repos


class _Filter(NamedTuple):
    """Path filter: accepted path endings and excluded regular expressions."""
    endswith: Tuple[str, ...] = ('', )
    exclude: Tuple[Pattern[str], ...] = ()

    def matches(self, path: str) -> bool:
        """Returns True if path has an accepted ending and is not excluded."""
        return (any(path.endswith(end) for end in self.endswith)
                and not any(exp.search(path) for exp in self.exclude))


def _print_ui(*args) -> None:
    """Prints to stdout and flushes to stay in sync with logs on stderr."""
    print(*args, flush=True)


class Presubmit:
    """Runs a series of presubmit checks on a list of files."""
    def __init__(self, root: Path, repos: Sequence[Path],
                 output_directory: Path, paths: Sequence[Path],
                 package_root: Path):
        self._root = root.resolve()
        self._repos = tuple(repos)
        self._output_directory = output_directory.resolve()
        self._paths = tuple(paths)
        self._relative_paths = tuple(
            tools.relative_paths(self._paths, self._root))
        self._package_root = package_root.resolve()

    def run(self, program: Program, keep_going: bool = False) -> bool:
        """Executes a series of presubmit checks on the paths.

        Returns:
          True if no check failed and no check was left unrun
        """

        checks = self._apply_filters(program)

        _LOG.debug('Running %s for %s', program.title(), self._root.name)
        _print_ui(_title(f'{self._root.name}: {program.title()}'))

        _LOG.info('%d of %d checks apply to %s in %s', len(checks),
                  len(program), plural(self._paths, 'file'), self._root)

        _print_ui()
        for line in tools.file_summary(self._relative_paths):
            _print_ui(line)
        _print_ui()

        if not self._paths:
            _print_ui(color_yellow('No files are being checked!'))

        _LOG.debug('Checks:\n%s', '\n'.join(c.name for c, _ in checks))

        start_time: float = time.time()
        passed, failed, skipped = self._execute_checks(checks, keep_going)
        self._log_summary(time.time() - start_time, passed, failed, skipped)

        return not failed and not skipped

    def _apply_filters(
            self, program: Sequence[Callable]
    ) -> List[Tuple['_Check', Sequence[Path]]]:
        """Returns list of (check, paths) for checks that should run."""
        checks = [c if isinstance(c, _Check) else _Check(c) for c in program]
        filter_to_checks: Dict[_Filter,
                               List[_Check]] = collections.defaultdict(list)

        # Group checks by filter so each distinct filter is evaluated once.
        for check in checks:
            filter_to_checks[check.filter].append(check)

        check_to_paths = self._map_checks_to_paths(filter_to_checks)
        return [(c, check_to_paths[c]) for c in checks if c in check_to_paths]

    def _map_checks_to_paths(
            self, filter_to_checks: Dict[_Filter, List['_Check']]
    ) -> Dict['_Check', Sequence[Path]]:
        """Maps each check that should run to the paths its filter accepts.

        Checks with no matching paths are omitted unless they are always_run.
        """
        checks_to_paths: Dict[_Check, Sequence[Path]] = {}

        # Filters are matched against the Posix form of the relative paths,
        # but checks receive the corresponding entries of self._paths.
        posix_paths = tuple(p.as_posix() for p in self._relative_paths)

        for filt, checks in filter_to_checks.items():
            filtered_paths = tuple(
                path for path, filter_path in zip(self._paths, posix_paths)
                if filt.matches(filter_path))

            for check in checks:
                if filtered_paths or check.always_run:
                    checks_to_paths[check] = filtered_paths
                else:
                    _LOG.debug('Skipping "%s": no relevant files', check.name)

        return checks_to_paths

    def _log_summary(self, time_s: float, passed: int, failed: int,
                     skipped: int) -> None:
        """Logs and prints the summary box for a finished presubmit run."""
        summary_items = []
        if passed:
            summary_items.append(f'{passed} passed')
        if failed:
            summary_items.append(f'{failed} failed')
        if skipped:
            summary_items.append(f'{skipped} not run')
        summary = ', '.join(summary_items) or 'nothing was done'

        result = _Result.FAIL if failed or skipped else _Result.PASS
        total = passed + failed + skipped

        _LOG.debug('Finished running %d checks on %s in %.1f s', total,
                   plural(self._paths, 'file'), time_s)
        _LOG.debug('Presubmit checks %s: %s', result.value, summary)

        _print_ui(
            _box(
                _SUMMARY_BOX, result.colorized(_LEFT, invert=True),
                f'{total} checks on {plural(self._paths, "file")}: {summary}',
                _format_time(time_s)))

    @contextlib.contextmanager
    def _context(self, name: str,
                 paths: Tuple[Path, ...]) -> Iterator[PresubmitContext]:
        """Yields the PresubmitContext for one check.

        Creates an output directory named after the check and, while the
        context is active, copies this module's log records to a step.log file
        in that directory.
        """
        # There are many characters banned from filenames on Windows. To
        # simplify things, just strip everything that's not a letter, digit,
        # or underscore.
        sanitized_name = re.sub(r'[\W_]+', '_', name).lower()
        output_directory = self._output_directory.joinpath(sanitized_name)
        os.makedirs(output_directory, exist_ok=True)

        handler = logging.FileHandler(output_directory.joinpath('step.log'),
                                      mode='w')
        handler.setLevel(logging.DEBUG)

        try:
            _LOG.addHandler(handler)

            yield PresubmitContext(
                root=self._root,
                repos=self._repos,
                output_dir=output_directory,
                paths=paths,
                package_root=self._package_root,
            )

        finally:
            _LOG.removeHandler(handler)
            # removeHandler() only detaches the handler; close it explicitly
            # so the step.log file handle is released after every check.
            handler.close()

    def _execute_checks(self, program,
                        keep_going: bool) -> Tuple[int, int, int]:
        """Runs presubmit checks; returns (passed, failed, skipped) counts."""
        passed = failed = 0

        for i, (check, paths) in enumerate(program, 1):
            with self._context(check.name, paths) as ctx:
                result = check.run(ctx, i, len(program))

            if result is _Result.PASS:
                passed += 1
            elif result is _Result.CANCEL:
                break
            else:
                failed += 1
                if not keep_going:
                    break

        # Every check that neither passed nor failed counts as skipped,
        # including a cancelled check and any checks after it.
        return passed, failed, len(program) - passed - failed


def _process_pathspecs(repos: Iterable[Path],
                       pathspecs: Iterable[str]) -> Dict[Path, List[str]]:
    """Assigns each pathspec to the repos in which it should be searched.

    Raises:
      ValueError: a pathspec is an existing path outside of the given repos
    """
    pathspecs_by_repo: Dict[Path, List[str]] = {repo: [] for repo in repos}
    repos_with_paths: Set[Path] = set()

    for pathspec in pathspecs:
        # If the pathspec is a path to an existing file, only use it for the
        # repo it is in.
        if os.path.exists(pathspec):
            # Raise an exception if the path exists but is not in a known repo.
            repo = git_repo.within_repo(pathspec)
            if repo not in pathspecs_by_repo:
                raise ValueError(
                    f'{pathspec} is not in a Git repository in this presubmit')

            # Make the path relative to the repo's root.
            pathspecs_by_repo[repo].append(os.path.relpath(pathspec, repo))
            repos_with_paths.add(repo)
        else:
            # Pathspecs that are not paths (e.g. '*.h') are used for all repos.
            for patterns in pathspecs_by_repo.values():
                patterns.append(pathspec)

    # If any paths were specified, only search for paths in those repos.
    if repos_with_paths:
        for repo in set(pathspecs_by_repo) - repos_with_paths:
            del pathspecs_by_repo[repo]

    return pathspecs_by_repo


def run(program: Sequence[Callable],
        root: Path,
        repos: Collection[Path] = (),
        base: Optional[str] = None,
        paths: Sequence[str] = (),
        exclude: Sequence[Pattern] = (),
        output_directory: Optional[Path] = None,
        package_root: Optional[Path] = None,
        keep_going: bool = False) -> bool:
    """Lists files in the given Git repos and runs a Presubmit with them.

    The paths argument contains Git pathspecs. If no pathspecs are provided, all
    paths in all repos are included. If paths to files or directories are
    provided, only files within those repositories are searched. Patterns are
    searched across all repositories. For example, if the pathspecs "my_module/"
    and "*.h", paths under "my_module/" in the containing repo and paths in all
    repos matching "*.h" will be included in the presubmit.

    Args:
      program: list of presubmit check functions to run
      root: root path of the project
      repos: paths to the roots of Git repositories to check
      base: optional base Git commit to list files against
      paths: optional list of Git pathspecs to run the checks against
      exclude: regular expressions for Posix-style paths to exclude
      output_directory: where to place output files; defaults to
          root / '.presubmit'
      package_root: where to place package files; defaults to
          output_directory / 'packages'
      keep_going: whether to continue running checks if an error occurs

    Returns:
      True if all presubmit checks succeeded

    Raises:
      ValueError: a path in repos is not the root of a Git repo
    """
    # NOTE(review): this docstring used to say the working directory is changed
    # to the root of the Git repository; nothing in this module does that.
    # Confirm that no check relies on it.
    repos = [repo.resolve() for repo in repos]

    for repo in repos:
        if git_repo.root(repo) != repo:
            raise ValueError(f'{repo} is not the root of a Git repo; '
                             'presubmit checks must be run from a Git repo')

    pathspecs_by_repo = _process_pathspecs(repos, paths)

    files: List[Path] = []

    for repo, pathspecs in pathspecs_by_repo.items():
        files += tools.exclude_paths(
            exclude, git_repo.list_files(base, pathspecs, repo), root)

        _LOG.info(
            'Checking %s',
            git_repo.describe_files(repo, repo, base, pathspecs, exclude))

    if output_directory is None:
        output_directory = root / '.presubmit'

    if package_root is None:
        package_root = output_directory / 'packages'

    presubmit = Presubmit(
        root=root,
        repos=repos,
        output_directory=output_directory,
        paths=files,
        package_root=package_root,
    )

    if not isinstance(program, Program):
        program = Program('', program)

    return presubmit.run(program, keep_going)


class _Check:
    """Wraps a presubmit check function.

    This class consolidates the logic for running and logging a presubmit check.
    It also supports filtering the paths passed to the presubmit check.
    """
    def __init__(self,
                 check_function: Callable,
                 path_filter: _Filter = _Filter(),
                 always_run: bool = True):
        _ensure_is_valid_presubmit_check_function(check_function)

        self._check: Callable = check_function
        self.filter: _Filter = path_filter
        self.always_run: bool = always_run

        # Since _Check wraps a presubmit function, adopt that function's name.
        self.__name__ = self._check.__name__

    @property
    def name(self):
        return self.__name__

    def run(self, ctx: PresubmitContext, count: int, total: int) -> _Result:
        """Runs the presubmit check on the provided paths.

        Args:
          ctx: context to pass to the check function
          count: position of this check in the run, shown as count/total
          total: number of checks in the run
        """

        _print_ui(
            _box(_CHECK_UPPER, f'{count}/{total}', self.name,
                 plural(ctx.paths, "file")))

        _LOG.debug('[%d/%d] Running %s on %s', count, total, self.name,
                   plural(ctx.paths, "file"))

        start_time_s = time.time()
        result = self._call_function(ctx)
        time_str = _format_time(time.time() - start_time_s)
        _LOG.debug('%s %s', self.name, result.value)

        _print_ui(
            _box(_CHECK_LOWER, result.colorized(_LEFT), self.name, time_str))
        _LOG.debug('%s duration:%s', self.name, time_str)

        return result

    def _call_function(self, ctx: PresubmitContext) -> _Result:
        """Calls the check function and converts its outcome to a _Result."""
        try:
            self._check(ctx)
        except PresubmitFailure as failure:
            # A PresubmitFailure is an expected failure: log only its message.
            if str(failure):
                _LOG.warning('%s', failure)
            return _Result.FAIL
        except Exception:  # pylint: disable=broad-except
            # Any other exception also fails the check; log the traceback.
            _LOG.exception('Presubmit check %s failed!', self.name)
            return _Result.FAIL
        except KeyboardInterrupt:
            _print_ui()
            return _Result.CANCEL

        return _Result.PASS

    def __call__(self, ctx: PresubmitContext, *args, **kwargs):
        """Calling a _Check calls its underlying function directly.

        This makes it possible to call functions wrapped by @filter_paths. The
        prior filters are ignored, so new filters may be applied.
        """
        return self._check(ctx, *args, **kwargs)


def _required_args(function: Callable) -> Iterable[Parameter]:
    """Returns the required arguments for a function."""
    optional_types = Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD

    for param in signature(function).parameters.values():
        if param.default is param.empty and param.kind not in optional_types:
            yield param


def _ensure_is_valid_presubmit_check_function(check: Callable) -> None:
    """Checks if a Callable can be used as a presubmit check.

    Raises:
      TypeError: check has no inspectable signature or does not have exactly
          one required argument
    """
    try:
        required_args = tuple(_required_args(check))
    except (TypeError, ValueError) as err:
        raise TypeError('Presubmit checks must be callable, but '
                        f'{check!r} is a {type(check).__name__}') from err

    if len(required_args) != 1:
        raise TypeError(
            f'Presubmit check functions must have exactly one required '
            f'positional argument (the PresubmitContext), but '
            f'{check.__name__} has {len(required_args)} required arguments' +
            (f' ({", ".join(a.name for a in required_args)})'
             if required_args else ''))


def _make_str_tuple(value: Iterable[str]) -> Tuple[str, ...]:
    """Returns value as a tuple, treating a lone str as a single item."""
    return tuple([value] if isinstance(value, str) else value)


def filter_paths(endswith: Iterable[str] = ('', ),
                 exclude: Iterable[Union[Pattern[str], str]] = (),
                 always_run: bool = False) -> Callable[[Callable], _Check]:
    """Decorator for filtering the paths list for a presubmit check function.

    Path filters only apply when the function is used as a presubmit check.
    Filters are ignored when the functions are called directly. This makes it
    possible to reuse functions wrapped in @filter_paths in other presubmit
    checks, potentially with different path filtering rules.

    Args:
      endswith: str or iterable of path endings to include
      exclude: regular expression or iterable of regular expressions of paths
          to exclude
      always_run: run the check even if no paths match the filter

    Returns:
      a wrapped version of the presubmit function
    """
    # A lone str is itself iterable; wrap it so that it is compiled as one
    # pattern rather than one pattern per character.
    exclude_patterns: Tuple[Pattern[str], ...] = tuple(
        re.compile(e)
        for e in ([exclude] if isinstance(exclude, str) else exclude))
    path_filter = _Filter(_make_str_tuple(endswith), exclude_patterns)

    def filter_paths_for_function(function: Callable):
        return _Check(function, path_filter, always_run=always_run)

    return filter_paths_for_function


@filter_paths(endswith='.h', exclude=(r'\.pb\.h$', ))
def pragma_once(ctx: PresubmitContext) -> None:
    """Presubmit check that ensures all header files contain '#pragma once'."""

    for path in ctx.paths:
        with open(path) as file:
            for line in file:
                if line.startswith('#pragma once'):
                    break
            else:
                # The loop ended without a break: no line had the pragma.
                raise PresubmitFailure('#pragma once is missing!', path=path)


def call(*args, **kwargs) -> None:
    """Optional subprocess wrapper that causes a PresubmitFailure on errors.

    The command's combined stdout and stderr are logged: at warning level if
    the command returned a nonzero code, at debug level otherwise.

    Raises:
      PresubmitFailure: the command returned a nonzero code
    """
    attributes, command = tools.format_command(args, kwargs)
    _LOG.debug('[RUN] %s\n%s', attributes, command)

    process = subprocess.run(args,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT,
                             **kwargs)
    logfunc = _LOG.warning if process.returncode else _LOG.debug

    logfunc('[FINISHED]\n%s', command)
    logfunc('[RESULT] %s with return code %d',
            'Failed' if process.returncode else 'Passed', process.returncode)

    output = process.stdout.decode(errors='backslashreplace')
    if output:
        logfunc('[OUTPUT]\n%s', output)

    if process.returncode:
        raise PresubmitFailure