• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Helpful commands for working with a Git repository."""
15
16import logging
17from pathlib import Path
18import subprocess
19from typing import Collection, Iterable, Iterator, List, NamedTuple, Optional
20from typing import Pattern, Set, Tuple, Union
21
22from pw_presubmit.tools import log_run, plural
23
# Logger shared by every helper in this module.
_LOG = logging.getLogger(__name__)

# Accepted wherever a filesystem path may be given as a Path or a string.
PathOrStr = Union[Path, str]

# Revision alias callers pass to mean "the current branch's tracking branch".
TRACKING_BRANCH_ALIAS = '@{upstream}'
# All alias spellings that this module resolves via tracking_branch().
_TRACKING_BRANCH_ALIASES = (TRACKING_BRANCH_ALIAS, '@{u}')
29
30
def git_stdout(*args: PathOrStr,
               show_stderr=False,
               repo: PathOrStr = '.') -> str:
    """Runs a Git command for a repo and returns its stdout as text.

    Args:
      *args: arguments appended after 'git -C <repo>'
      show_stderr: if True, stderr is left unredirected; otherwise it is
          sent to subprocess.DEVNULL
      repo: directory passed to 'git -C'

    Returns:
      the captured standard output, decoded and stripped of surrounding
      whitespace
    """
    stderr_target = None if show_stderr else subprocess.DEVNULL
    command = ['git', '-C', repo, *args]
    # log_run is called with check=True; callers in this module rely on a
    # failing command surfacing as subprocess.CalledProcessError.
    completed = log_run(command,
                        stdout=subprocess.PIPE,
                        stderr=stderr_target,
                        check=True)
    return completed.stdout.decode().strip()
38
39
def _ls_files(args: Collection[PathOrStr], repo: Path) -> Iterable[Path]:
    """Yields the files listed by git ls-files as absolute paths.

    Each line of output is joined onto the resolved repo directory.
    """
    base_dir = repo.resolve()
    listing = git_stdout('ls-files', '--', *args, repo=repo)
    yield from (base_dir / entry for entry in listing.splitlines())
45
46
def _diff_names(commit: str, pathspecs: Collection[PathOrStr],
                repo: Path) -> Iterable[Path]:
    """Yields absolute paths of files that differ from the given commit.

    The names printed by git diff --name-only are joined onto the repository
    root. The diff is run with --diff-filter=d.
    """
    top_level = root(repo)
    changed = git_stdout('diff',
                         '--name-only',
                         '--diff-filter=d',
                         commit,
                         '--',
                         *pathspecs,
                         repo=repo)
    for relative_name in changed.splitlines():
        yield top_level / relative_name
59
60
def tracking_branch(repo_path: Optional[Path] = None) -> Optional[str]:
    """Returns the tracking branch of the current branch.

    Since most callers of this function can safely handle a return value of
    None, suppress exceptions and return None if there is no tracking branch.

    Args:
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      ValueError: if repo_path is not in a Git repository
      FileNotFoundError: if repo_path does not exist (propagated from the
          repository check)

    Returns:
      the remote tracking branch name or None if there is none
    """
    if repo_path is None:
        repo_path = Path.cwd()

    if not is_repo(repo_path):
        raise ValueError(f'{repo_path} is not within a Git repository')

    # This command should only error out if there's no upstream branch set.
    try:
        return git_stdout('rev-parse',
                          '--abbrev-ref',
                          '--symbolic-full-name',
                          TRACKING_BRANCH_ALIAS,
                          repo=repo_path)
    except subprocess.CalledProcessError:
        return None
92
93
def list_files(commit: Optional[str] = None,
               pathspecs: Collection[PathOrStr] = (),
               repo_path: Optional[Path] = None) -> List[Path]:
    """Lists files with git ls-files or git diff --name-only.

    When a base commit is available, only files changed relative to it are
    listed. If that comparison fails, a warning is logged and every file
    matching the pathspecs is listed instead.

    Args:
      commit: commit to use as a base for git diff; a tracking branch alias
          is resolved with tracking_branch() first
      pathspecs: Git pathspecs to use in git ls-files or diff
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Returns:
      A sorted list of absolute paths
    """
    repo_dir = Path.cwd() if repo_path is None else repo_path

    if commit in _TRACKING_BRANCH_ALIASES:
        base = tracking_branch(repo_dir)
    else:
        base = commit

    # No usable base revision: list everything matching the pathspecs.
    if not base:
        return sorted(_ls_files(pathspecs, repo_dir))

    try:
        return sorted(_diff_names(base, pathspecs, repo_dir))
    except subprocess.CalledProcessError:
        _LOG.warning(
            'Error comparing with base revision %s of %s, listing all '
            'files instead of just changed files', base, repo_dir)

    return sorted(_ls_files(pathspecs, repo_dir))
122
123
def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
    """Returns True if the Git repo has uncommitted changes in it.

    This does not check for untracked files.

    Args:
      repo: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      subprocess.CalledProcessError: refreshing the index failed with output
          on stderr, or kept failing silently on every attempt
    """
    if repo is None:
        repo = Path.cwd()

    # Refresh the Git index so that the diff-index command will be accurate.
    # The `git update-index` command isn't reliable when run in parallel with
    # other processes that may touch files in the repo directory, so retry a
    # few times before giving up. The hallmark of this failure mode is the lack
    # of an error message on stderr, so if we see something there we can assume
    # it's some other issue and raise.
    retries = 6
    for attempt in range(retries):
        try:
            log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'],
                    capture_output=True,
                    check=True)
        except subprocess.CalledProcessError as err:
            if err.stderr or attempt == retries - 1:
                raise
        else:
            # The refresh succeeded, so stop retrying. Without this the
            # command would run on every iteration and a later spurious
            # failure could raise despite an earlier success.
            break

    # diff-index exits with 1 if there are uncommitted changes.
    return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
                    '--']).returncode == 1
151
152
def _describe_constraints(git_root: Path, repo_path: Path,
                          commit: Optional[str],
                          pathspecs: Collection[PathOrStr],
                          exclude: Collection[Pattern[str]]) -> Iterable[str]:
    """Yields one descriptive phrase per constraint on the file selection.

    Phrases are produced, in order, for: a repo_path that differs from
    git_root, a base commit, pathspecs, and exclusion patterns. Constraints
    that do not apply yield nothing.
    """
    if not git_root.samefile(repo_path):
        subdir = repo_path.resolve().relative_to(git_root.resolve())
        yield f'under the {subdir} subdirectory'

    base = commit
    if base in _TRACKING_BRANCH_ALIASES:
        # Replace the alias with the actual tracking branch, if there is one.
        base = tracking_branch(git_root)
        if base is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch')

    if base:
        yield f'that have changed since {base}'

    if pathspecs:
        joined_specs = ', '.join(map(str, pathspecs))
        yield f'that match {plural(pathspecs, "pathspec")} ({joined_specs})'

    if exclude:
        noun = plural(exclude, "pattern")
        joined_patterns = ', '.join(regex.pattern for regex in exclude)
        yield f'that do not match {noun} ({joined_patterns})'
179
180
def describe_files(git_root: Path,
                   repo_path: Path,
                   commit: Optional[str],
                   pathspecs: Collection[PathOrStr],
                   exclude: Collection[Pattern[str]],
                   project_root: Optional[Path] = None) -> str:
    """Completes 'Doing something to ...' for a set of files in a Git repo.

    Args:
      git_root: root directory of the Git repository
      repo_path: path within the repo the file selection is limited to
      commit: base commit for the selection, or None
      pathspecs: Git pathspecs limiting the selection
      exclude: compiled patterns for files left out of the selection
      project_root: if given and different from git_root, the repo is named
          by git_root's path relative to it instead of by its directory name

    Returns:
      a phrase describing the files; with two or more constraints, each one
      is placed on its own indented line
    """
    constraints = list(
        _describe_constraints(git_root, repo_path, commit, pathspecs, exclude))

    name = git_root.name
    if project_root and project_root != git_root:
        name = str(git_root.relative_to(project_root))

    if not constraints:
        return f'all files in the {name} repo'

    msg = f'files in the {name} repo'
    if len(constraints) == 1:
        return f'{msg} {constraints[0]}'

    return msg + ''.join(f'\n    - {line}' for line in constraints)
203
204
def root(repo_path: PathOrStr = '.', *, show_stderr: bool = True) -> Path:
    """Returns the top-level directory of the repo containing repo_path.

    Args:
      repo_path: a file or directory inside the repository
      show_stderr: forwarded to git_stdout to control whether Git's stderr
          is shown

    Raises:
      FileNotFoundError: the path does not exist
      subprocess.CalledProcessError: the path is not in a Git repo
    """
    candidate = Path(repo_path)
    if not candidate.exists():
        raise FileNotFoundError(f'{candidate} does not exist')

    # Git is run from a directory, so a file path is swapped for its parent.
    if candidate.is_dir():
        start_dir = candidate
    else:
        start_dir = candidate.parent

    top_level = git_stdout('rev-parse',
                           '--show-toplevel',
                           repo=start_dir,
                           show_stderr=show_stderr)
    return Path(top_level)
221
222
def within_repo(repo_path: PathOrStr = '.') -> Optional[Path]:
    """Returns the repo root for repo_path, or None if it is not in a repo.

    Git's stderr is suppressed. Only subprocess.CalledProcessError is turned
    into None; a FileNotFoundError from root() for a nonexistent path still
    propagates.
    """
    try:
        git_root: Optional[Path] = root(repo_path, show_stderr=False)
    except subprocess.CalledProcessError:
        git_root = None
    return git_root
229
230
def is_repo(repo_path: PathOrStr = '.') -> bool:
    """True if a Git repository root can be found for the path."""
    containing_root = within_repo(repo_path)
    return containing_root is not None
234
235
def path(repo_path: PathOrStr,
         *additional_repo_paths: PathOrStr,
         repo: PathOrStr = '.') -> Path:
    """Joins the given path components onto a Git repository's root.

    Args:
      repo_path: first component to append to the repository root
      *additional_repo_paths: further components to append, in order
      repo: a path inside the repository whose root is used
    """
    git_root = root(repo)
    return git_root.joinpath(repo_path, *additional_repo_paths)
241
242
class PythonPackage(NamedTuple):
    """Files that make up one Python package located by its setup.py."""
    # Directory that contains the package's setup.py
    root: Path
    # The main package directory
    package: Path
    # All sources in the main package directory
    packaged_files: Tuple[Path, ...]
    # Other Python files under root
    other_files: Tuple[Path, ...]

    def all_files(self) -> Tuple[Path, ...]:
        """Returns packaged_files followed by other_files."""
        combined = self.packaged_files + self.other_files
        return combined
251
252
def all_python_packages(repo: PathOrStr = '.') -> Iterator[PythonPackage]:
    """Finds all Python packages in the repo based on setup.py locations.

    Every directory holding a setup.py listed by git ls-files is a candidate.
    A candidate is yielded only if Python files are found one level below
    it. If those files span more than one subdirectory, a warning is logged
    and the first subdirectory encountered is used as the package directory.

    Args:
      repo: repository directory in which setup.py files are searched for

    Yields:
      a PythonPackage for each setup.py directory that has package sources
    """
    root_py_dirs = [
        file.parent
        for file in _ls_files(['setup.py', '*/setup.py'], Path(repo))
    ]

    for py_dir in root_py_dirs:
        all_packaged_files = _ls_files([py_dir / '*' / '*.py'], repo=py_dir)
        common_dir: Optional[str] = None

        # Make sure there is only one package directory with Python files in
        # it.
        for file in all_packaged_files:
            package_dir = file.relative_to(py_dir).parts[0]

            if common_dir is None:
                common_dir = package_dir
            elif common_dir != package_dir:
                _LOG.warning(
                    'There are multiple Python package directories in %s: %s '
                    'and %s. This is not supported by pw presubmit. Each '
                    'setup.py should correspond with a single Python package',
                    py_dir, common_dir, package_dir)
                break

        if common_dir is not None:
            packaged_files = tuple(_ls_files(['*/*.py'], repo=py_dir))
            # Look up membership in a set so that filtering the remaining
            # files does not rescan the whole tuple for every candidate.
            packaged_lookup = frozenset(packaged_files)
            other_files = tuple(
                f for f in _ls_files(['*.py'], repo=py_dir)
                if f.name != 'setup.py' and f not in packaged_lookup)

            yield PythonPackage(py_dir, py_dir / common_dir, packaged_files,
                                other_files)
286
287
def python_packages_containing(
        python_paths: Iterable[Path],
        repo: PathOrStr = '.') -> Tuple[List[PythonPackage], List[Path]]:
    """Groups the provided Python paths by the package that contains them.

    A path is assigned to the first discovered package whose root is one of
    the path's parent directories.

    Args:
      python_paths: paths to look up
      repo: repository in which packages are discovered

    Returns:
      ([packages], [files_not_in_packages])
    """
    all_packages = list(all_python_packages(repo))

    packages: Set[PythonPackage] = set()
    files_not_in_packages: List[Path] = []

    for python_path in python_paths:
        ancestors = python_path.parents
        owner = next(
            (pkg for pkg in all_packages if pkg.root in ancestors), None)

        if owner is None:
            files_not_in_packages.append(python_path)
        else:
            packages.add(owner)

    return list(packages), files_not_in_packages
310
311
def commit_message(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of git log --format=%B -n1 for the commit."""
    log_args = ('log', '--format=%B', '-n1', commit)
    return git_stdout(*log_args, repo=repo)
314
315
def commit_hash(rev: str = 'HEAD',
                short: bool = True,
                repo: PathOrStr = '.') -> str:
    """Returns the commit hash of the revision.

    Args:
      rev: revision to resolve with git rev-parse
      short: if True, --short is passed to git rev-parse
      repo: repo path from which to run the command
    """
    short_flag = ['--short'] if short else []
    return git_stdout('rev-parse', *short_flag, rev, repo=repo)
325