• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2024 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Helpful commands for working with a Git repository."""
15
16from datetime import datetime
17import itertools
18import logging
19from pathlib import Path
20import re
21import shlex
22import subprocess
23from typing import Collection, Iterable, Pattern
24
25from pw_cli.plural import plural
26from pw_cli.tool_runner import ToolRunner
27
# Logger for this module, named after the module itself.
_LOG = logging.getLogger(__name__)

# Git revision syntax that refers to the upstream of the current branch.
TRACKING_BRANCH_ALIAS = '@{upstream}'
# Every spelling that is treated as a request for the tracking branch.
_TRACKING_BRANCH_ALIASES = TRACKING_BRANCH_ALIAS, '@{u}'
# Revision used in place of the tracking branch when none can be resolved.
_NON_TRACKING_FALLBACK = 'HEAD~10'
33
34
class GitError(Exception):
    """An error reported by a ``git`` command.

    Attributes:
        returncode: The return code associated with the failed command.
    """

    def __init__(
        self, args: Iterable[str], message: str, returncode: int
    ) -> None:
        # Render the arguments as a copy-pasteable shell command line.
        command_line = shlex.join(args)
        description = f'`git {command_line}` failed: {message}'
        super().__init__(description)
        self.returncode = returncode
43
44
45class _GitTool:
46    def __init__(self, tool_runner: ToolRunner, working_dir: Path) -> None:
47        self._run_tool = tool_runner
48        self._working_dir = working_dir
49
50    def __call__(self, *args, **kwargs) -> str:
51        cmd = ('-C', str(self._working_dir), *args)
52        proc = self._run_tool(tool='git', args=cmd, **kwargs)
53
54        if proc.returncode != 0:
55            if not proc.stderr:
56                err = '(no output)'
57            else:
58                err = proc.stderr.decode().strip()
59            raise GitError((str(s) for s in cmd), err, proc.returncode)
60
61        return '' if not proc.stdout else proc.stdout.decode().strip()
62
63
64class GitRepo:
65    """Represents a checked out Git repository that may be queried for info."""
66
67    def __init__(self, root: Path, tool_runner: ToolRunner):
68        self._root = root.resolve()
69        self._git = _GitTool(tool_runner, self._root)
70
71    def tracking_branch(
72        self,
73        fallback: str | None = None,
74    ) -> str | None:
75        """Returns the tracking branch of the current branch.
76
77        Since most callers of this function can safely handle a return value of
78        None, suppress exceptions and return None if there is no tracking
79        branch.
80
81        Returns:
82          the remote tracking branch name or None if there is none
83        """
84
85        # This command should only error out if there's no upstream branch set.
86        try:
87            return self._git(
88                'rev-parse',
89                '--abbrev-ref',
90                '--symbolic-full-name',
91                TRACKING_BRANCH_ALIAS,
92            )
93
94        except GitError:
95            return fallback
96
97    def current_branch(self) -> str | None:
98        """Returns the current branch, or None if it cannot be determined."""
99        try:
100            return self._git('rev-parse', '--abbrev-ref', 'HEAD')
101        except GitError:
102            return None
103
104    def _ls_files(self, pathspecs: Collection[Path | str]) -> Iterable[Path]:
105        """Returns results of git ls-files as absolute paths."""
106        for file in self._git('ls-files', '--', *pathspecs).splitlines():
107            full_path = self._root / file
108            # Modified submodules will show up as directories and should be
109            # ignored.
110            if full_path.is_file():
111                yield full_path
112
113    def _diff_names(
114        self, commit: str, pathspecs: Collection[Path | str]
115    ) -> Iterable[Path]:
116        """Returns paths of files changed since the specified commit.
117
118        All returned paths are absolute file paths.
119        """
120        for file in self._git(
121            'diff',
122            '--name-only',
123            '--diff-filter=d',
124            commit,
125            '--',
126            *pathspecs,
127        ).splitlines():
128            full_path = self._root / file
129            # Modified submodules will show up as directories and should be
130            # ignored.
131            if full_path.is_file():
132                yield full_path
133
134    def list_files(
135        self,
136        commit: str | None = None,
137        pathspecs: Collection[Path | str] = (),
138    ) -> list[Path]:
139        """Lists files modified since the specified commit.
140
141        If ``commit`` is not found in the current repo, all files in the
142        repository are listed.
143
144        Arugments:
145            commit: The Git hash to start from when listing modified files
146            pathspecs: Git pathspecs use when filtering results
147
148        Returns:
149            A sorted list of absolute paths.
150        """
151
152        if commit in _TRACKING_BRANCH_ALIASES:
153            commit = self.tracking_branch(fallback=_NON_TRACKING_FALLBACK)
154
155        if commit:
156            try:
157                return sorted(self._diff_names(commit, pathspecs))
158            except GitError:
159                _LOG.warning(
160                    'Error comparing with base revision %s of %s, listing all '
161                    'files instead of just changed files',
162                    commit,
163                    self._root,
164                )
165
166        return sorted(self._ls_files(pathspecs))
167
168    def has_uncommitted_changes(self) -> bool:
169        """Returns True if this Git repo has uncommitted changes in it.
170
171        Note: This does not check for untracked files.
172
173        Returns:
174            True if the Git repo has uncommitted changes in it.
175        """
176
177        # Refresh the Git index so that the diff-index command will be accurate.
178        # The `git update-index` command isn't reliable when run in parallel
179        # with other processes that may touch files in the repo directory, so
180        # retry a few times before giving up. The hallmark of this failure mode
181        # is the lack of an error message on stderr, so if we see something
182        # there we can assume it's some other issue and raise.
183        retries = 6
184        for i in range(retries):
185            try:
186                self._git(
187                    'update-index',
188                    '-q',
189                    '--refresh',
190                    pw_presubmit_ignore_dry_run=True,
191                )
192            except subprocess.CalledProcessError as err:
193                if err.stderr or i == retries - 1:
194                    raise
195                continue
196
197        try:
198            self._git(
199                'diff-index',
200                '--quiet',
201                'HEAD',
202                '--',
203                pw_presubmit_ignore_dry_run=True,
204            )
205        except GitError as err:
206            # diff-index exits with 1 if there are uncommitted changes.
207            if err.returncode == 1:
208                return True
209
210            # Unexpected error.
211            raise
212
213        return False
214
215    def root(self) -> Path:
216        """The root file path of this Git repository.
217
218        Returns:
219            The repository root as an absolute path.
220        """
221        return self._root
222
223    def list_submodules(
224        self, excluded_paths: Collection[Pattern | str] = ()
225    ) -> list[Path]:
226        """Query Git and return a list of submodules in the current project.
227
228        Arguments:
229            excluded_paths: Pattern or string that match submodules that should
230                not be returned. All matches are done on posix-style paths
231                relative to the project root.
232
233        Returns:
234            List of "Path"s which were found but not excluded. All paths are
235            absolute.
236        """
237        discovery_report = self._git(
238            'submodule',
239            'foreach',
240            '--quiet',
241            '--recursive',
242            'echo $toplevel/$sm_path',
243        )
244        module_dirs = [Path(line) for line in discovery_report.split()]
245
246        for exclude in excluded_paths:
247            if isinstance(exclude, Pattern):
248                for module_dir in reversed(module_dirs):
249                    if exclude.fullmatch(
250                        module_dir.relative_to(self._root).as_posix()
251                    ):
252                        module_dirs.remove(module_dir)
253            else:
254                for module_dir in reversed(module_dirs):
255                    print(f'not regex: {exclude}')
256                    if exclude == module_dir.relative_to(self._root).as_posix():
257                        module_dirs.remove(module_dir)
258
259        return module_dirs
260
261    def commit_message(self, commit: str = 'HEAD') -> str:
262        """Returns the commit message of the specified commit.
263
264        Defaults to ``HEAD`` if no commit specified.
265
266        Returns:
267            Commit message contents as a string.
268        """
269        return self._git('log', '--format=%B', '-n1', commit)
270
271    def commit_author(self, commit: str = 'HEAD') -> str:
272        """Returns the author of the specified commit.
273
274        Defaults to ``HEAD`` if no commit specified.
275
276        Returns:
277            Commit author as a string.
278        """
279        return self._git('log', '--format=%ae', '-n1', commit)
280
281    def commit_date(self, commit: str = 'HEAD') -> datetime:
282        """Returns the datetime of the specified commit.
283
284        Defaults to ``HEAD`` if no commit specified.
285
286        Returns:
287            Commit datetime as a datetime object.
288        """
289        return datetime.fromisoformat(
290            self._git('log', '--format=%aI', '-n1', commit)
291        )
292
293    def commit_hash(
294        self,
295        commit: str = 'HEAD',
296        short: bool = True,
297    ) -> str:
298        """Returns the hash associated with the specified commit.
299
300        Defaults to ``HEAD`` if no commit specified.
301
302        Returns:
303            Commit hash as a string.
304        """
305        args = ['rev-parse']
306        if short:
307            args += ['--short']
308        args += [commit]
309        return self._git(*args)
310
311    def commit_change_id(self, commit: str = 'HEAD') -> str | None:
312        """Returns the Gerrit Change-Id of the specified commit.
313
314        Defaults to ``HEAD`` if no commit specified.
315
316        Returns:
317            Change-Id as a string, or ``None`` if it does not exist.
318        """
319        message = self.commit_message(commit)
320        regex = re.compile(
321            'Change-Id: (I[a-fA-F0-9]+)',
322            re.MULTILINE,
323        )
324        match = regex.search(message)
325        return match.group(1) if match else None
326
327    def commit_parents(self, commit: str = 'HEAD') -> list[str]:
328        args = ['log', '--pretty=%P', '-n', '1', commit]
329        return self._git(*args).split()
330
331    def diff(self, *args) -> str:
332        return self._git('diff', *args)
333
334
335class GitRepoFinder:
336    """An efficient way to map files to the repo that tracks them (if any).
337
338    This class is optimized to minimize subprocess calls to git so that many
339    file paths can efficiently be mapped to their parent repo.
340    """
341
342    def __init__(self, tool_runner: ToolRunner):
343        self.tool_runner = tool_runner
344        # A dictionary mapping an absolute path to a directory to the
345        # absolute path of the owning repo (if any).
346        self._known_repo_roots: dict[Path, Path | None] = {}
347        self.repos: dict[Path | None, GitRepo | None] = {None: None}
348
349    def _add_known_repo_path(
350        self, repo: Path | None, path_in_repo: Path
351    ) -> None:
352        path_to_add = (
353            path_in_repo.resolve()
354            if not repo
355            else repo.joinpath(path_in_repo).resolve()
356        )
357        self._known_repo_roots[path_to_add] = repo
358
359    def _repo_is_known(self, path: Path) -> bool:
360        return path.resolve() in self._known_repo_roots
361
362    def find_git_repo(self, path_in_repo: Path | str) -> GitRepo | None:
363        """Finds the git repo that contains this pathspec.
364
365        Returns:
366            A GitRepo if the file is enclosed by a Git repository, otherwise
367            returns None.
368        """
369        path = Path(path_in_repo)
370        search_from = path if path.is_dir() else path.parent
371        if not search_from.exists():
372            raise ValueError(
373                f"Can't find parent repo of `{path_in_repo}`, "
374                "path does not exist"
375            )
376
377        if not self._repo_is_known(search_from):
378            try:
379                git_tool = _GitTool(
380                    self.tool_runner,
381                    search_from,
382                )
383                root = Path(
384                    git_tool(
385                        'rev-parse',
386                        '--show-toplevel',
387                    )
388                )
389                # Now that we found the absolute path root, we know every
390                # directory between the repo root and the query are owned
391                # by that repo. For example:
392                #   query: bar/baz_subrepo/my_dir/nested/b.txt
393                #   cwd: /dev/null/foo_repo/
394                #   root: /dev/null/foo_repo/bar/baz_subrepo
395                #   parents (relative to root):
396                #     my_dir/nested
397                #     my_dir
398                #   new known git paths:
399                #     /dev/null/foo_repo/bar/baz_subrepo/my_dir/nested
400                #     /dev/null/foo_repo/bar/baz_subrepo/my_dir
401                #     /dev/null/foo_repo/bar/baz_subrepo
402                subpath = search_from.resolve().relative_to(root)
403                for parent in itertools.chain([subpath], subpath.parents):
404                    if self._repo_is_known(root.joinpath(parent)):
405                        break
406                    self._add_known_repo_path(root, root.joinpath(parent))
407
408                if root not in self.repos:
409                    self.repos[root] = GitRepo(root, self.tool_runner)
410
411                return self.repos[root]
412
413            except GitError:
414                for parent in itertools.chain(
415                    [search_from], search_from.parents
416                ):
417                    self._add_known_repo_path(None, search_from)
418
419            return None
420
421        return self.repos[self._known_repo_roots[search_from.resolve()]]
422
423    def make_pathspec_relative(
424        self, pathspec: Path | str
425    ) -> tuple[GitRepo | None, str]:
426        """Finds the root repo of a pathspec, and then relativizes the pathspec.
427
428        Example: Assuming a repo at `external/foo_repo/` and a pathspec of
429        `external/foo_repo/ba*`, returns a GitRepo at `external/foo_repo` and
430        a relativized pathspec of `ba*`.
431
432        Args:
433            pathspec: The pathspec to relativize.
434        Returns:
435            The GitRepo of the pathspec and the pathspec relative to the parent
436            repo's root as a tuple. If the pathspec is not tracked by a repo,
437            the GitRepo is None and the pathspec is returned as-is.
438        """
439        repo = self.find_git_repo(pathspec)
440
441        if repo is None:
442            return None, str(pathspec)
443
444        if Path(pathspec).is_absolute():
445            relative_pattern = Path(pathspec).relative_to(repo.root())
446        else:
447            # Don't resolve(), we don't want to follow symlinks.
448            logical_absolute = Path.cwd() / Path(pathspec)
449            relative_pattern = Path(logical_absolute).relative_to(repo.root())
450
451        # Sometimes the effective pathspec is empty because it matches the root
452        # directory of a repo.
453        if not relative_pattern:
454            return repo, str(Path('.'))
455
456        return repo, str(relative_pattern)
457
458
def find_git_repo(path_in_repo: Path, tool_runner: ToolRunner) -> GitRepo:
    """Tries to find the root of the Git repo that owns ``path_in_repo``.

    The lookup starts from ``path_in_repo`` itself when it is a directory,
    and from its parent otherwise.

    Raises:
        GitError: The specified path does not live in a Git repository.

    Returns:
        A GitRepo representing the enclosing repository that tracks the
        specified file or folder.
    """
    if path_in_repo.is_dir():
        start_dir = path_in_repo
    else:
        start_dir = path_in_repo.parent

    run_git = _GitTool(tool_runner, start_dir)
    toplevel = run_git('rev-parse', '--show-toplevel')

    return GitRepo(Path(toplevel), tool_runner)
481
482
def is_in_git_repo(p: Path, tool_runner: ToolRunner) -> bool:
    """Checks whether a path lives inside a Git repository.

    Returns:
        True if an enclosing repository is found for the specified file or
        folder, False if the lookup raises a GitError.
    """
    try:
        find_git_repo(p, tool_runner)
    except GitError:
        in_repo = False
    else:
        in_repo = True

    return in_repo
495
496
def _describe_constraints(
    repo: GitRepo,
    working_dir: Path,
    commit: str | None,
    pathspecs: Collection[Path | str],
    exclude: Collection[Pattern[str]],
) -> Iterable[str]:
    """Yields one phrase for each constraint that narrows the file set.

    Phrases are produced in a fixed order: subdirectory, base commit,
    pathspecs, then exclusion patterns. Constraints that do not apply yield
    nothing.
    """
    repo_root = repo.root()
    if not repo_root.samefile(working_dir):
        subdir = working_dir.resolve().relative_to(repo_root.resolve())
        yield f'under the {subdir} subdirectory'

    if commit in _TRACKING_BRANCH_ALIASES:
        # Describe the actual tracking branch rather than the alias.
        commit = repo.tracking_branch()
        if commit is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch'
            )

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        joined_paths = ', '.join(map(str, pathspecs))
        yield f'that match {plural(pathspecs, "pathspec")} ({joined_paths})'

    if exclude:
        joined_patterns = ', '.join(regex.pattern for regex in exclude)
        yield (
            f'that do not match {plural(exclude, "pattern")} '
            f'({joined_patterns})'
        )
532
533
def describe_git_pattern(
    working_dir: Path,
    commit: str | None,
    pathspecs: Collection[Path | str],
    exclude: Collection[Pattern],
    tool_runner: ToolRunner,
    project_root: Path | None = None,
) -> str:
    """Provides a description for a set of files in a Git repo.

    Example:

        files in the pigweed repo
        - that have changed since origin/main..HEAD
        - that do not match 7 patterns (...)

    The repo is named by its root directory name, or by its path relative to
    ``project_root`` when one is given and differs from the repo root.

    The unit tests for this function are the source of truth for the expected
    output.

    Returns:
        A string with descriptive information about the provided Git
        pathspecs; it spans multiple lines when more than one constraint
        applies.
    """
    repo = find_git_repo(working_dir, tool_runner)
    constraints = list(
        _describe_constraints(repo, working_dir, commit, pathspecs, exclude)
    )

    repo_root = repo.root()
    if project_root and project_root != repo_root:
        name = str(repo_root.relative_to(project_root))
    else:
        name = repo_root.name

    if not constraints:
        return f'all files in the {name} repo'

    header = f'files in the {name} repo'
    if len(constraints) == 1:
        (only_constraint,) = constraints
        return f'{header} {only_constraint}'

    bullets = [f'\n    - {line}' for line in constraints]
    return header + ''.join(bullets)
574