# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Helpful commands for working with a Git repository."""

from datetime import datetime
import itertools
import logging
from pathlib import Path
import re
import shlex
import subprocess
from typing import Collection, Iterable, Pattern

from pw_cli.plural import plural
from pw_cli.tool_runner import ToolRunner

_LOG = logging.getLogger(__name__)

TRACKING_BRANCH_ALIAS = '@{upstream}'
_TRACKING_BRANCH_ALIASES = TRACKING_BRANCH_ALIAS, '@{u}'
_NON_TRACKING_FALLBACK = 'HEAD~10'

# Placeholder used as the error text when a failed git command wrote nothing
# to stderr.
_NO_STDERR_MESSAGE = '(no output)'


class GitError(Exception):
    """A Git-raised exception.

    Attributes:
        returncode: Exit status of the failed ``git`` invocation.
        message: The error text reported for the failure. ``_GitTool`` sets
            this to the stripped stderr output, or to ``'(no output)'`` when
            git wrote nothing to stderr.
    """

    def __init__(
        self, args: Iterable[str], message: str, returncode: int
    ) -> None:
        super().__init__(f'`git {shlex.join(args)}` failed: {message}')
        self.returncode = returncode
        self.message = message


class _GitTool:
    """Runs ``git -C <working_dir> ...`` through a ``ToolRunner``."""

    def __init__(self, tool_runner: ToolRunner, working_dir: Path) -> None:
        self._run_tool = tool_runner
        self._working_dir = working_dir

    def __call__(self, *args, **kwargs) -> str:
        """Runs a git command and returns its stripped stdout.

        Args:
            *args: Arguments passed to ``git`` after ``-C <working_dir>``.
            **kwargs: Forwarded unchanged to the tool runner.

        Returns:
            The decoded, stripped stdout, or an empty string if there is none.

        Raises:
            GitError: The command exited with a non-zero status.
        """
        cmd = ('-C', str(self._working_dir), *args)
        proc = self._run_tool(tool='git', args=cmd, **kwargs)

        if proc.returncode != 0:
            err = proc.stderr.decode().strip() if proc.stderr else ''
            raise GitError(
                (str(s) for s in cmd),
                # Empty and whitespace-only stderr are both reported with the
                # same placeholder so callers can detect "no error message".
                err or _NO_STDERR_MESSAGE,
                proc.returncode,
            )

        return '' if not proc.stdout else proc.stdout.decode().strip()


class GitRepo:
    """Represents a checked out Git repository that may be queried for info."""

    def __init__(self, root: Path, tool_runner: ToolRunner):
        self._root = root.resolve()
        self._git = _GitTool(tool_runner, self._root)

    def tracking_branch(
        self,
        fallback: str | None = None,
    ) -> str | None:
        """Returns the tracking branch of the current branch.

        Since most callers of this function can safely handle a return value of
        None, suppress exceptions and return ``fallback`` if there is no
        tracking branch.

        Arguments:
            fallback: Value to return when the git query fails.

        Returns:
            the remote tracking branch name, or ``fallback`` (None by default)
            if there is none
        """

        # This command should only error out if there's no upstream branch set.
        try:
            return self._git(
                'rev-parse',
                '--abbrev-ref',
                '--symbolic-full-name',
                TRACKING_BRANCH_ALIAS,
            )

        except GitError:
            return fallback

    def current_branch(self) -> str | None:
        """Returns the current branch, or None if it cannot be determined."""
        try:
            return self._git('rev-parse', '--abbrev-ref', 'HEAD')
        except GitError:
            return None

    def _ls_files(self, pathspecs: Collection[Path | str]) -> Iterable[Path]:
        """Returns results of git ls-files as absolute paths."""
        for file in self._git('ls-files', '--', *pathspecs).splitlines():
            full_path = self._root / file
            # Modified submodules will show up as directories and should be
            # ignored.
            if full_path.is_file():
                yield full_path

    def _diff_names(
        self, commit: str, pathspecs: Collection[Path | str]
    ) -> Iterable[Path]:
        """Returns paths of files changed since the specified commit.

        All returned paths are absolute file paths.
        """
        for file in self._git(
            'diff',
            '--name-only',
            '--diff-filter=d',
            commit,
            '--',
            *pathspecs,
        ).splitlines():
            full_path = self._root / file
            # Modified submodules will show up as directories and should be
            # ignored.
            if full_path.is_file():
                yield full_path

    def list_files(
        self,
        commit: str | None = None,
        pathspecs: Collection[Path | str] = (),
    ) -> list[Path]:
        """Lists files modified since the specified commit.

        If ``commit`` is not found in the current repo, all files in the
        repository are listed.

        Arguments:
            commit: The Git hash to start from when listing modified files
            pathspecs: Git pathspecs to use when filtering results

        Returns:
            A sorted list of absolute paths.
        """

        if commit in _TRACKING_BRANCH_ALIASES:
            commit = self.tracking_branch(fallback=_NON_TRACKING_FALLBACK)

        if commit:
            try:
                return sorted(self._diff_names(commit, pathspecs))
            except GitError:
                _LOG.warning(
                    'Error comparing with base revision %s of %s, listing all '
                    'files instead of just changed files',
                    commit,
                    self._root,
                )

        return sorted(self._ls_files(pathspecs))

    def has_uncommitted_changes(self) -> bool:
        """Returns True if this Git repo has uncommitted changes in it.

        Note: This does not check for untracked files.

        Returns:
            True if the Git repo has uncommitted changes in it.

        Raises:
            GitError: Refreshing the index failed with an error message, or
                kept failing after all retries, or ``git diff-index`` failed
                with an unexpected exit status.
        """

        # Refresh the Git index so that the diff-index command will be accurate.
        # The `git update-index` command isn't reliable when run in parallel
        # with other processes that may touch files in the repo directory, so
        # retry a few times before giving up. The hallmark of this failure mode
        # is the lack of an error message on stderr, so if we see something
        # there we can assume it's some other issue and raise.
        retries = 6
        for attempt in range(retries):
            last_attempt = attempt == retries - 1
            try:
                self._git(
                    'update-index',
                    '-q',
                    '--refresh',
                    pw_presubmit_ignore_dry_run=True,
                )
            except GitError as err:
                # _GitTool reports failures as GitError; its message is the
                # placeholder only when stderr was empty.
                if err.message != _NO_STDERR_MESSAGE or last_attempt:
                    raise
            except subprocess.CalledProcessError as err:
                # Kept for tool runners that raise on failure themselves.
                if err.stderr or last_attempt:
                    raise
            else:
                # The index was refreshed; no need to run the command again.
                break

        try:
            self._git(
                'diff-index',
                '--quiet',
                'HEAD',
                '--',
                pw_presubmit_ignore_dry_run=True,
            )
        except GitError as err:
            # diff-index exits with 1 if there are uncommitted changes.
            if err.returncode == 1:
                return True

            # Unexpected error.
            raise

        return False

    def root(self) -> Path:
        """The root file path of this Git repository.

        Returns:
            The repository root as an absolute path.
        """
        return self._root

    def list_submodules(
        self, excluded_paths: Collection[Pattern | str] = ()
    ) -> list[Path]:
        """Query Git and return a list of submodules in the current project.

        Arguments:
            excluded_paths: Pattern or string that match submodules that should
                not be returned. All matches are done on posix-style paths
                relative to the project root.

        Returns:
            List of "Path"s which were found but not excluded. All paths are
            absolute.
        """
        discovery_report = self._git(
            'submodule',
            'foreach',
            '--quiet',
            '--recursive',
            'echo $toplevel/$sm_path',
        )
        # One submodule is reported per line. Split on line boundaries rather
        # than on any whitespace so paths containing spaces stay intact.
        module_dirs = [
            Path(line) for line in discovery_report.splitlines() if line
        ]

        if not excluded_paths:
            return module_dirs

        def is_excluded(module_dir: Path) -> bool:
            relative = module_dir.relative_to(self._root).as_posix()
            for exclude in excluded_paths:
                if isinstance(exclude, re.Pattern):
                    # Compiled patterns must match the whole relative path.
                    if exclude.fullmatch(relative):
                        return True
                elif exclude == relative:
                    return True
            return False

        return [
            module_dir
            for module_dir in module_dirs
            if not is_excluded(module_dir)
        ]

    def commit_message(self, commit: str = 'HEAD') -> str:
        """Returns the commit message of the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Returns:
            Commit message contents as a string.
        """
        return self._git('log', '--format=%B', '-n1', commit)

    def commit_author(self, commit: str = 'HEAD') -> str:
        """Returns the author of the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Returns:
            Commit author as a string.
        """
        return self._git('log', '--format=%ae', '-n1', commit)

    def commit_date(self, commit: str = 'HEAD') -> datetime:
        """Returns the datetime of the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Returns:
            Commit datetime as a datetime object.
        """
        return datetime.fromisoformat(
            self._git('log', '--format=%aI', '-n1', commit)
        )

    def commit_hash(
        self,
        commit: str = 'HEAD',
        short: bool = True,
    ) -> str:
        """Returns the hash associated with the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Arguments:
            commit: The revision to resolve.
            short: If True, pass ``--short`` to ``git rev-parse``.

        Returns:
            Commit hash as a string.
        """
        args = ['rev-parse']
        if short:
            args += ['--short']
        args += [commit]
        return self._git(*args)

    def commit_change_id(self, commit: str = 'HEAD') -> str | None:
        """Returns the Gerrit Change-Id of the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Returns:
            Change-Id as a string, or ``None`` if it does not exist.
        """
        message = self.commit_message(commit)
        # The first ``Change-Id: I<hex>`` occurrence anywhere in the message
        # is used.
        match = re.search(r'Change-Id: (I[a-fA-F0-9]+)', message)
        return match.group(1) if match else None

    def commit_parents(self, commit: str = 'HEAD') -> list[str]:
        """Returns the parent hashes of the specified commit.

        Defaults to ``HEAD`` if no commit specified.

        Returns:
            The whitespace-separated ``%P`` output of ``git log`` as a list;
            empty if git printed nothing.
        """
        args = ['log', '--pretty=%P', '-n', '1', commit]
        return self._git(*args).split()

    def diff(self, *args) -> str:
        """Runs ``git diff`` with the given arguments and returns its output."""
        return self._git('diff', *args)


class GitRepoFinder:
    """An efficient way to map files to the repo that tracks them (if any).

    This class is optimized to minimize subprocess calls to git so that many
    file paths can efficiently be mapped to their parent repo.
    """

    def __init__(self, tool_runner: ToolRunner):
        self.tool_runner = tool_runner
        # A dictionary mapping an absolute path to a directory to the
        # absolute path of the owning repo (if any).
        self._known_repo_roots: dict[Path, Path | None] = {}
        # Maps a repo root to its GitRepo. The None entry lets directories
        # that are known not to be in any repo resolve to None.
        self.repos: dict[Path | None, GitRepo | None] = {None: None}

    def _add_known_repo_path(
        self, repo: Path | None, path_in_repo: Path
    ) -> None:
        """Records that ``path_in_repo`` is owned by ``repo`` (or by none)."""
        path_to_add = (
            path_in_repo.resolve()
            if not repo
            else repo.joinpath(path_in_repo).resolve()
        )
        self._known_repo_roots[path_to_add] = repo

    def _repo_is_known(self, path: Path) -> bool:
        """Returns True if the owner of ``path`` has already been recorded."""
        return path.resolve() in self._known_repo_roots

    def find_git_repo(self, path_in_repo: Path | str) -> GitRepo | None:
        """Finds the git repo that contains this pathspec.

        Raises:
            ValueError: The directory to search from does not exist.

        Returns:
            A GitRepo if the file is enclosed by a Git repository, otherwise
            returns None.
        """
        path = Path(path_in_repo)
        search_from = path if path.is_dir() else path.parent
        if not search_from.exists():
            raise ValueError(
                f"Can't find parent repo of `{path_in_repo}`, "
                "path does not exist"
            )

        if self._repo_is_known(search_from):
            return self.repos[self._known_repo_roots[search_from.resolve()]]

        try:
            git_tool = _GitTool(
                self.tool_runner,
                search_from,
            )
            root = Path(
                git_tool(
                    'rev-parse',
                    '--show-toplevel',
                )
            )
        except GitError:
            # No repo owns this directory, so none owns its ancestors either.
            # Remember each of them to avoid asking git again.
            for parent in itertools.chain([search_from], search_from.parents):
                self._add_known_repo_path(None, parent)

            return None

        # Now that we found the absolute path root, we know every
        # directory between the repo root and the query are owned
        # by that repo. For example:
        #   query: bar/baz_subrepo/my_dir/nested/b.txt
        #   cwd: /dev/null/foo_repo/
        #   root: /dev/null/foo_repo/bar/baz_subrepo
        #   parents (relative to root):
        #     my_dir/nested
        #     my_dir
        #   new known git paths:
        #     /dev/null/foo_repo/bar/baz_subrepo/my_dir/nested
        #     /dev/null/foo_repo/bar/baz_subrepo/my_dir
        #     /dev/null/foo_repo/bar/baz_subrepo
        subpath = search_from.resolve().relative_to(root)
        for parent in itertools.chain([subpath], subpath.parents):
            if self._repo_is_known(root.joinpath(parent)):
                break
            self._add_known_repo_path(root, root.joinpath(parent))

        if root not in self.repos:
            self.repos[root] = GitRepo(root, self.tool_runner)

        return self.repos[root]

    def make_pathspec_relative(
        self, pathspec: Path | str
    ) -> tuple[GitRepo | None, str]:
        """Finds the root repo of a pathspec, and then relativizes the pathspec.

        Example: Assuming a repo at `external/foo_repo/` and a pathspec of
        `external/foo_repo/ba*`, returns a GitRepo at `external/foo_repo` and
        a relativized pathspec of `ba*`.

        Args:
            pathspec: The pathspec to relativize.
        Returns:
            The GitRepo of the pathspec and the pathspec relative to the parent
            repo's root as a tuple. If the pathspec is not tracked by a repo,
            the GitRepo is None and the pathspec is returned as-is.
        """
        repo = self.find_git_repo(pathspec)

        if repo is None:
            return None, str(pathspec)

        if Path(pathspec).is_absolute():
            relative_pattern = Path(pathspec).relative_to(repo.root())
        else:
            # Don't resolve(), we don't want to follow symlinks.
            logical_absolute = Path.cwd() / Path(pathspec)
            relative_pattern = logical_absolute.relative_to(repo.root())

        # Sometimes the effective pathspec is empty because it matches the root
        # directory of a repo. pathlib renders that empty relative path as
        # '.', which is the pathspec we want in that case.
        return repo, str(relative_pattern)


def find_git_repo(path_in_repo: Path, tool_runner: ToolRunner) -> GitRepo:
    """Tries to find the root of the Git repo that owns ``path_in_repo``.

    Raises:
        GitError: The specified path does not live in a Git repository.

    Returns:
        A GitRepo representing the enclosing repository that tracks the
        specified file or folder.
    """
    git_tool = _GitTool(
        tool_runner,
        path_in_repo if path_in_repo.is_dir() else path_in_repo.parent,
    )
    root = Path(
        git_tool(
            'rev-parse',
            '--show-toplevel',
        )
    )

    return GitRepo(root, tool_runner)


def is_in_git_repo(p: Path, tool_runner: ToolRunner) -> bool:
    """Returns true if the specified path is tracked by a Git repository.

    Returns:
        True if the specified file or folder is tracked by a Git repository.
    """
    try:
        find_git_repo(p, tool_runner)
    except GitError:
        return False

    return True


def _describe_constraints(
    repo: GitRepo,
    working_dir: Path,
    commit: str | None,
    pathspecs: Collection[Path | str],
    exclude: Collection[Pattern[str]],
) -> Iterable[str]:
    """Yields one phrase for each constraint that narrows the set of files.

    Phrases are produced, in order, for: a working directory that is not the
    repo root, a base commit, pathspecs, and exclusion patterns.
    """
    if not repo.root().samefile(working_dir):
        yield (
            'under the '
            f'{working_dir.resolve().relative_to(repo.root().resolve())}'
            ' subdirectory'
        )

    if commit in _TRACKING_BRANCH_ALIASES:
        commit = repo.tracking_branch()
        if commit is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch'
            )

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        paths_str = ', '.join(str(p) for p in pathspecs)
        yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'

    if exclude:
        yield (
            f'that do not match {plural(exclude, "pattern")} ('
            + ', '.join(p.pattern for p in exclude)
            + ')'
        )


def describe_git_pattern(
    working_dir: Path,
    commit: str | None,
    pathspecs: Collection[Path | str],
    exclude: Collection[Pattern],
    tool_runner: ToolRunner,
    project_root: Path | None = None,
) -> str:
    """Provides a description for a set of files in a Git repo.

    Example:

        files in the pigweed repo
        - that have changed since origin/main..HEAD
        - that do not match 7 patterns (...)

    The unit tests for this function are the source of truth for the expected
    output.

    Returns:
        A multi-line string with descriptive information about the provided
        Git pathspecs.
    """
    repo = find_git_repo(working_dir, tool_runner)
    constraints = list(
        _describe_constraints(repo, working_dir, commit, pathspecs, exclude)
    )

    name = repo.root().name
    if project_root and project_root != repo.root():
        name = str(repo.root().relative_to(project_root))

    if not constraints:
        return f'all files in the {name} repo'

    msg = f'files in the {name} repo'
    if len(constraints) == 1:
        return f'{msg} {constraints[0]}'

    return msg + ''.join(f'\n - {line}' for line in constraints)