• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Helpful commands for working with a Git repository."""
15
16import logging
17from pathlib import Path
18import subprocess
19from typing import Collection, Iterable, List, Optional, Pattern, Union
20
21from pw_presubmit.tools import log_run, plural
22
# Module-level logger, named after this module.
_LOG = logging.getLogger(__name__)
# Anything accepted where a filesystem path or Git argument is expected.
PathOrStr = Union[Path, str]
# Either a compiled regular expression or a literal string to compare against.
PatternOrStr = Union[Pattern, str]

# Revision alias callers may pass as a commit to mean "the upstream branch".
TRACKING_BRANCH_ALIAS = '@{upstream}'
# Every spelling treated as the tracking-branch alias; '@{u}' is Git's short
# form of '@{upstream}'.
_TRACKING_BRANCH_ALIASES = TRACKING_BRANCH_ALIAS, '@{u}'
29
30
def git_stdout(
    *args: PathOrStr, show_stderr=False, repo: PathOrStr = '.'
) -> str:
    """Runs a Git command inside a repo and returns its stripped stdout.

    Args:
      *args: arguments appended after ``git -C <repo>``
      show_stderr: when True, stderr is left unredirected (stderr=None);
          when False, stderr is sent to subprocess.DEVNULL
      repo: directory handed to ``git -C``

    Returns:
      The decoded standard output with leading/trailing whitespace removed

    Raises:
      subprocess.CalledProcessError: Git exited with a non-zero status
          (check=True is forwarded to log_run)
    """
    stderr_target = None if show_stderr else subprocess.DEVNULL
    command = ['git', '-C', str(repo), *args]
    completed = log_run(
        command,
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        check=True,
    )
    return completed.stdout.decode().strip()
44
45
def _ls_files(args: Collection[PathOrStr], repo: Path) -> Iterable[Path]:
    """Yields absolute paths of the files reported by git ls-files.

    Args:
      args: pathspecs passed to ``git ls-files`` after ``--``
      repo: directory in which the command runs; listed names are joined
          onto its resolved path
    """
    base_dir = repo.resolve()
    listing = git_stdout('ls-files', '--', *args, repo=repo)
    for relative_name in listing.splitlines():
        candidate = base_dir / relative_name
        # Modified submodules are listed as directories; skip anything that is
        # not a regular file.
        if not candidate.is_file():
            continue
        yield candidate
54
55
def _diff_names(
    commit: str, pathspecs: Collection[PathOrStr], repo: Path
) -> Iterable[Path]:
    """Yields absolute paths of files that differ from the given commit.

    Args:
      commit: revision handed to ``git diff`` as the comparison base
      pathspecs: pathspecs passed to ``git diff`` after ``--``
      repo: directory in which the command runs; reported names are joined
          onto the repository root returned by root(repo)
    """
    top_level = root(repo)
    changed = git_stdout(
        'diff',
        '--name-only',
        '--diff-filter=d',
        commit,
        '--',
        *pathspecs,
        repo=repo,
    )
    for relative_name in changed.splitlines():
        candidate = top_level / relative_name
        # Modified submodules are reported as directories; skip anything that
        # is not a regular file.
        if not candidate.is_file():
            continue
        yield candidate
74
75
def tracking_branch(repo_path: Optional[Path] = None) -> Optional[str]:
    """Looks up the upstream (tracking) branch of the current branch.

    Most callers cope fine with None, so a failing Git lookup is reported as
    None rather than as an exception.

    Args:
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      ValueError: if repo_path is not in a Git repository

    Returns:
      the remote tracking branch name or None if there is none
    """
    repo_path = Path.cwd() if repo_path is None else repo_path

    if within_repo_check_failed := not is_repo(repo_path):
        raise ValueError(f'{repo_path} is not within a Git repository')
    del within_repo_check_failed

    # rev-parse is only expected to fail here when no upstream branch is set.
    try:
        upstream = git_stdout(
            'rev-parse',
            '--abbrev-ref',
            '--symbolic-full-name',
            TRACKING_BRANCH_ALIAS,
            repo=repo_path,
        )
    except subprocess.CalledProcessError:
        return None

    return upstream
109
110
def list_files(
    commit: Optional[str] = None,
    pathspecs: Collection[PathOrStr] = (),
    repo_path: Optional[Path] = None,
) -> List[Path]:
    """Lists files via git diff --name-only, or git ls-files as a fallback.

    When a base commit is available the changed files are listed; if there is
    no commit, or the diff fails, every file matched by ls-files is listed.

    Args:
      commit: commit to use as a base for git diff; a tracking-branch alias
          is replaced with the result of tracking_branch()
      pathspecs: Git pathspecs to use in git ls-files or diff
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Returns:
      A sorted list of absolute paths
    """
    repo_dir = Path.cwd() if repo_path is None else repo_path

    base = commit
    if base in _TRACKING_BRANCH_ALIASES:
        base = tracking_branch(repo_dir)

    if base:
        try:
            changed = list(_diff_names(base, pathspecs, repo_dir))
        except subprocess.CalledProcessError:
            _LOG.warning(
                'Error comparing with base revision %s of %s, listing all '
                'files instead of just changed files',
                base,
                repo_dir,
            )
        else:
            return sorted(changed)

    return sorted(_ls_files(pathspecs, repo_dir))
144
145
def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
    """Returns True if the Git repo has uncommitted changes in it.

    This does not check for untracked files.

    Args:
      repo: directory passed to ``git -C``; defaults to Path.cwd()

    Raises:
      subprocess.CalledProcessError: refreshing the index failed with output
          on stderr, or kept failing silently until the retries ran out
    """
    if repo is None:
        repo = Path.cwd()

    git = ['git', '-C', str(repo)]

    # Refresh the Git index so that the diff-index command will be accurate.
    # The `git update-index` command isn't reliable when run in parallel with
    # other processes that may touch files in the repo directory, so retry a
    # few times before giving up. The hallmark of this failure mode is the lack
    # of an error message on stderr, so if we see something there we can assume
    # it's some other issue and raise.
    retries = 6
    for attempt in range(retries):
        try:
            log_run(
                [*git, 'update-index', '-q', '--refresh'],
                capture_output=True,
                check=True,
            )
        except subprocess.CalledProcessError as err:
            if err.stderr or attempt == retries - 1:
                raise
        else:
            # The refresh succeeded; stop instead of re-running it for every
            # remaining attempt.
            break

    # diff-index exits with 1 if there are uncommitted changes.
    diff_index = log_run([*git, 'diff-index', '--quiet', 'HEAD', '--'])
    return diff_index.returncode == 1
179
180
def _describe_constraints(
    git_root: Path,
    repo_path: Path,
    commit: Optional[str],
    pathspecs: Collection[PathOrStr],
    exclude: Collection[Pattern[str]],
) -> Iterable[str]:
    """Yields one human-readable phrase per restriction on the file set.

    Phrases are produced, in order, for: a repo_path that is not the Git root,
    a base commit, pathspecs, and exclusion patterns. Restrictions that do not
    apply yield nothing.
    """
    if not git_root.samefile(repo_path):
        subdirectory = repo_path.resolve().relative_to(git_root.resolve())
        yield f'under the {subdirectory} subdirectory'

    if commit in _TRACKING_BRANCH_ALIASES:
        # Swap the alias for the actual upstream branch name, if there is one.
        commit = tracking_branch(git_root)
        if commit is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch'
            )

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        joined_specs = ', '.join(map(str, pathspecs))
        yield f'that match {plural(pathspecs, "pathspec")} ({joined_specs})'

    if exclude:
        yield (
            f'that do not match {plural(exclude, "pattern")} '
            f'({", ".join(regex.pattern for regex in exclude)})'
        )
215
216
def describe_files(
    git_root: Path,
    repo_path: Path,
    commit: Optional[str],
    pathspecs: Collection[PathOrStr],
    exclude: Collection[Pattern],
    project_root: Optional[Path] = None,
) -> str:
    """Builds the tail of a 'Doing something to ...' message for repo files.

    The repo is named by git_root's final path component, or by git_root's
    path relative to project_root when a different project_root is given.
    A single constraint is appended inline; several are listed as bullets.
    """
    constraints = [
        *_describe_constraints(git_root, repo_path, commit, pathspecs, exclude)
    ]

    if project_root and project_root != git_root:
        repo_name = str(git_root.relative_to(project_root))
    else:
        repo_name = git_root.name

    if not constraints:
        return f'all files in the {repo_name} repo'

    prefix = f'files in the {repo_name} repo'
    if len(constraints) == 1:
        (only_constraint,) = constraints
        return f'{prefix} {only_constraint}'

    bullets = [f'\n    - {item}' for item in constraints]
    return prefix + ''.join(bullets)
242
243
def root(repo_path: PathOrStr = '.', *, show_stderr: bool = True) -> Path:
    """Finds the top-level directory of the repo containing repo_path.

    Git is run from repo_path itself when it is a directory, otherwise from
    its parent directory.

    Args:
      repo_path: a file or directory expected to be inside a Git repo
      show_stderr: forwarded to git_stdout

    Returns:
      The path printed by ``git rev-parse --show-toplevel``

    Raises:
      FileNotFoundError: the path does not exist
      subprocess.CalledProcessError: the path is not in a Git repo
    """
    location = Path(repo_path)
    if not location.exists():
        raise FileNotFoundError(f'{location} does not exist')

    working_dir = location if location.is_dir() else location.parent
    top_level = git_stdout(
        'rev-parse',
        '--show-toplevel',
        repo=working_dir,
        show_stderr=show_stderr,
    )
    return Path(top_level)
263
264
def within_repo(repo_path: PathOrStr = '.') -> Optional[Path]:
    """Returns root(repo_path), or None if Git reports an error for the path.

    Only subprocess.CalledProcessError is translated to None; a
    FileNotFoundError raised by root() for a missing path still propagates.
    """
    try:
        repo_root = root(repo_path, show_stderr=False)
    except subprocess.CalledProcessError:
        repo_root = None
    return repo_root
271
272
def is_repo(repo_path: PathOrStr = '.') -> bool:
    """Reports whether within_repo() finds a repository root for the path."""
    repo_root = within_repo(repo_path)
    return repo_root is not None
276
277
def path(
    repo_path: PathOrStr,
    *additional_repo_paths: PathOrStr,
    repo: PathOrStr = '.',
) -> Path:
    """Joins the given path components onto the root of a Git repository.

    Args:
      repo_path: first component to append to the repository root
      *additional_repo_paths: further components to append
      repo: location used to find the repository root via root()
    """
    repo_root = root(repo)
    return repo_root.joinpath(repo_path, *additional_repo_paths)
285
286
def commit_message(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns `git log --format=%B -n1` output for the commit, stripped."""
    log_args = ('log', '--format=%B', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
289
290
def commit_author(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns `git log --format=%ae -n1` output for the commit, stripped."""
    log_args = ('log', '--format=%ae', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
293
294
def commit_hash(
    rev: str = 'HEAD', short: bool = True, repo: PathOrStr = '.'
) -> str:
    """Resolves a revision to its commit hash with git rev-parse.

    Args:
      rev: revision to resolve
      short: when True, ``--short`` is passed to ``git rev-parse``
      repo: directory in which Git runs
    """
    short_flag = ['--short'] if short else []
    return git_stdout('rev-parse', *short_flag, rev, repo=repo)
304
305
def discover_submodules(
    superproject_dir: Path, excluded_paths: Collection[PatternOrStr] = ()
) -> List[Path]:
    """Query git and return a list of submodules in the current project.

    Args:
        superproject_dir: Path object to directory under which we are looking
                          for submodules. This will also be included in list
                          returned unless excluded.
        excluded_paths: Pattern or string that match submodules that should not
                        be returned. All matches are done on posix style paths.
                        Patterns must match the whole path (fullmatch);
                        strings must equal it exactly.

    Returns:
        List of "Path"s which were found but not excluded, this includes
        superproject_dir unless excluded.
    """
    discovery_report = git_stdout(
        'submodule',
        'foreach',
        '--quiet',
        '--recursive',
        # Quoted so the shell does not collapse whitespace inside the path.
        'echo "$toplevel/$sm_path"',
        repo=superproject_dir,
    )
    # One submodule per output line. Splitting on lines rather than on any
    # whitespace keeps paths that contain spaces intact.
    module_dirs = [
        Path(line) for line in discovery_report.splitlines() if line
    ]
    # The superproject is omitted in the prior scan.
    module_dirs.append(superproject_dir)

    def is_excluded(module_dir: Path) -> bool:
        """True if any entry of excluded_paths matches module_dir."""
        posix_path = module_dir.as_posix()
        for exclude in excluded_paths:
            if isinstance(exclude, Pattern):
                if exclude.fullmatch(posix_path):
                    return True
            elif exclude == posix_path:
                return True
        return False

    # Filter in a single pass instead of removing from the list while
    # iterating over it.
    return [
        module_dir for module_dir in module_dirs if not is_excluded(module_dir)
    ]
345