# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Helpful commands for working with a Git repository."""

import logging
from pathlib import Path
import subprocess
from typing import Collection, Iterable, Iterator, List, NamedTuple, Optional
from typing import Pattern, Set, Tuple, Union

from pw_presubmit.tools import log_run, plural

_LOG = logging.getLogger(__name__)
PathOrStr = Union[Path, str]

TRACKING_BRANCH_ALIAS = '@{upstream}'
_TRACKING_BRANCH_ALIASES = TRACKING_BRANCH_ALIAS, '@{u}'


def git_stdout(*args: PathOrStr,
               show_stderr: bool = False,
               repo: PathOrStr = '.') -> str:
    """Runs `git -C <repo> <args...>` and returns its stdout.

    Args:
      args: arguments to pass to git after `-C <repo>`
      show_stderr: if True, stderr is not redirected; otherwise it is sent to
          subprocess.DEVNULL
      repo: directory passed to `git -C`

    Raises:
      subprocess.CalledProcessError: the command is run with check=True, so a
          failing git command is expected to raise this (callers in this
          module rely on that)

    Returns:
      the decoded stdout with leading and trailing whitespace stripped
    """
    return log_run(['git', '-C', repo, *args],
                   stdout=subprocess.PIPE,
                   stderr=None if show_stderr else subprocess.DEVNULL,
                   check=True).stdout.decode().strip()


def _ls_files(args: Collection[PathOrStr], repo: Path) -> Iterable[Path]:
    """Returns results of git ls-files as absolute paths."""
    # Each listed name is joined onto the resolved repo directory that git
    # was run from.
    git_root = repo.resolve()
    for file in git_stdout('ls-files', '--', *args, repo=repo).splitlines():
        yield git_root / file


def _diff_names(commit: str, pathspecs: Collection[PathOrStr],
                repo: Path) -> Iterable[Path]:
    """Returns absolute paths of files changed since the specified commit."""
    # Each listed name is joined onto the repository root rather than onto
    # the directory git was run from.
    git_root = root(repo)
    for file in git_stdout('diff',
                           '--name-only',
                           '--diff-filter=d',
                           commit,
                           '--',
                           *pathspecs,
                           repo=repo).splitlines():
        yield git_root / file


def tracking_branch(repo_path: Optional[Path] = None) -> Optional[str]:
    """Returns the tracking branch of the current branch.

    Since most callers of this function can safely handle a return value of
    None, suppress exceptions and return None if there is no tracking branch.

    Args:
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      ValueError: if repo_path is not in a Git repository

    Returns:
      the remote tracking branch name or None if there is none
    """
    if repo_path is None:
        repo_path = Path.cwd()

    if not is_repo(repo_path):
        raise ValueError(f'{repo_path} is not within a Git repository')

    # This command should only error out if there's no upstream branch set.
    try:
        return git_stdout('rev-parse',
                          '--abbrev-ref',
                          '--symbolic-full-name',
                          TRACKING_BRANCH_ALIAS,
                          repo=repo_path)

    except subprocess.CalledProcessError:
        return None


def list_files(commit: Optional[str] = None,
               pathspecs: Collection[PathOrStr] = (),
               repo_path: Optional[Path] = None) -> List[Path]:
    """Lists files with git ls-files or git diff --name-only.

    If the diff against the commit fails, a warning is logged and all files
    matching the pathspecs are listed instead.

    Args:
      commit: commit to use as a base for git diff
      pathspecs: Git pathspecs to use in git ls-files or diff
      repo_path: repo path from which to run commands; defaults to Path.cwd()

    Returns:
      A sorted list of absolute paths
    """
    if repo_path is None:
        repo_path = Path.cwd()

    # Resolve the upstream aliases to a concrete branch name; this yields None
    # (and therefore a full listing) when no tracking branch is set.
    if commit in _TRACKING_BRANCH_ALIASES:
        commit = tracking_branch(repo_path)

    if commit:
        try:
            return sorted(_diff_names(commit, pathspecs, repo_path))
        except subprocess.CalledProcessError:
            _LOG.warning(
                'Error comparing with base revision %s of %s, listing all '
                'files instead of just changed files', commit, repo_path)

    return sorted(_ls_files(pathspecs, repo_path))


def has_uncommitted_changes(repo: Optional[Path] = None) -> bool:
    """Returns True if the Git repo has uncommitted changes in it.

    This does not check for untracked files.

    Args:
      repo: repo path from which to run commands; defaults to Path.cwd()

    Raises:
      subprocess.CalledProcessError: refreshing the index failed with output
          on stderr, or failed on every retry
    """
    if repo is None:
        repo = Path.cwd()

    # Refresh the Git index so that the diff-index command will be accurate.
    # The `git update-index` command isn't reliable when run in parallel with
    # other processes that may touch files in the repo directory, so retry a
    # few times before giving up. The hallmark of this failure mode is the lack
    # of an error message on stderr, so if we see something there we can assume
    # it's some other issue and raise.
    retries = 6
    for i in range(retries):
        try:
            log_run(['git', '-C', repo, 'update-index', '-q', '--refresh'],
                    capture_output=True,
                    check=True)
        except subprocess.CalledProcessError as err:
            if err.stderr or i == retries - 1:
                raise
            continue

        # The refresh succeeded, so there is nothing left to retry.
        break

    # diff-index exits with 1 if there are uncommitted changes.
    return log_run(['git', '-C', repo, 'diff-index', '--quiet', 'HEAD',
                    '--']).returncode == 1


def _describe_constraints(git_root: Path, repo_path: Path,
                          commit: Optional[str],
                          pathspecs: Collection[PathOrStr],
                          exclude: Collection[Pattern[str]]) -> Iterable[str]:
    """Yields one phrase for each constraint that narrows the set of files."""
    if not git_root.samefile(repo_path):
        yield (
            f'under the {repo_path.resolve().relative_to(git_root.resolve())} '
            'subdirectory')

    if commit in _TRACKING_BRANCH_ALIASES:
        commit = tracking_branch(git_root)
        if commit is None:
            _LOG.warning(
                'Attempted to list files changed since the remote tracking '
                'branch, but the repo is not tracking a branch')

    if commit:
        yield f'that have changed since {commit}'

    if pathspecs:
        paths_str = ', '.join(str(p) for p in pathspecs)
        yield f'that match {plural(pathspecs, "pathspec")} ({paths_str})'

    if exclude:
        yield (f'that do not match {plural(exclude, "pattern")} (' +
               ', '.join(p.pattern for p in exclude) + ')')


def describe_files(git_root: Path,
                   repo_path: Path,
                   commit: Optional[str],
                   pathspecs: Collection[PathOrStr],
                   exclude: Collection[Pattern],
                   project_root: Optional[Path] = None) -> str:
    """Completes 'Doing something to ...' for a set of files in a Git repo.

    Args:
      git_root: root directory of the Git repo
      repo_path: path within the repo that the files are listed from
      commit: base commit the files are compared against, if any
      pathspecs: Git pathspecs that restrict the files
      exclude: compiled patterns for files that are left out
      project_root: if set and different from git_root, the repo is named by
          its path relative to this directory instead of by its directory name

    Returns:
      a one-line description when there is at most one constraint; otherwise
      a header line followed by one ' - ' bullet line per constraint
    """
    constraints = list(
        _describe_constraints(git_root, repo_path, commit, pathspecs, exclude))

    name = git_root.name
    if project_root and project_root != git_root:
        name = str(git_root.relative_to(project_root))

    if not constraints:
        return f'all files in the {name} repo'

    msg = f'files in the {name} repo'
    if len(constraints) == 1:
        return f'{msg} {constraints[0]}'

    return msg + ''.join(f'\n    - {line}' for line in constraints)


def root(repo_path: PathOrStr = '.', *, show_stderr: bool = True) -> Path:
    """Returns the repository root as an absolute path.

    Args:
      repo_path: a file or directory inside the repo; for a file, git is run
          from its parent directory
      show_stderr: whether git's stderr is shown (see git_stdout)

    Raises:
      FileNotFoundError: the path does not exist
      subprocess.CalledProcessError: the path is not in a Git repo
    """
    repo_path = Path(repo_path)
    if not repo_path.exists():
        raise FileNotFoundError(f'{repo_path} does not exist')

    return Path(
        git_stdout('rev-parse',
                   '--show-toplevel',
                   repo=repo_path if repo_path.is_dir() else repo_path.parent,
                   show_stderr=show_stderr))


def within_repo(repo_path: PathOrStr = '.') -> Optional[Path]:
    """Similar to root(repo_path), returns None if the path is not in a repo.

    Only subprocess.CalledProcessError is suppressed; the FileNotFoundError
    that root() raises for a nonexistent path still propagates.
    """
    try:
        return root(repo_path, show_stderr=False)
    except subprocess.CalledProcessError:
        return None


def is_repo(repo_path: PathOrStr = '.') -> bool:
    """True if the path is within a Git repo."""
    return within_repo(repo_path) is not None


def path(repo_path: PathOrStr,
         *additional_repo_paths: PathOrStr,
         repo: PathOrStr = '.') -> Path:
    """Returns the given path segments joined onto a Git repository's root."""
    return root(repo).joinpath(repo_path, *additional_repo_paths)


class PythonPackage(NamedTuple):
    """Files that make up a Python package rooted at a setup.py directory."""
    root: Path  # Path to the directory containing the setup.py
    package: Path  # Path to the main package directory
    packaged_files: Tuple[Path, ...]  # All sources in the main package dir
    other_files: Tuple[Path, ...]  # Other Python files under root

    def all_files(self) -> Tuple[Path, ...]:
        """Returns the packaged files followed by the other files."""
        return self.packaged_files + self.other_files


def all_python_packages(repo: PathOrStr = '.') -> Iterator[PythonPackage]:
    """Finds all Python packages in the repo based on setup.py locations.

    A directory with a setup.py only yields a package if it has Python files
    in a subdirectory. If several subdirectories contain Python files, a
    warning is logged and the first one found is used as the package.
    """
    root_py_dirs = [
        file.parent
        for file in _ls_files(['setup.py', '*/setup.py'], Path(repo))
    ]

    for py_dir in root_py_dirs:
        all_packaged_files = _ls_files([py_dir / '*' / '*.py'], repo=py_dir)
        common_dir: Optional[str] = None

        # Ensure only one package directory has Python files in it.
        for file in all_packaged_files:
            package_dir = file.relative_to(py_dir).parts[0]

            if common_dir is None:
                common_dir = package_dir
            elif common_dir != package_dir:
                _LOG.warning(
                    'There are multiple Python package directories in %s: %s '
                    'and %s. This is not supported by pw presubmit. Each '
                    'setup.py should correspond with a single Python package',
                    py_dir, common_dir, package_dir)
                break

        if common_dir is not None:
            packaged_files = tuple(_ls_files(['*/*.py'], repo=py_dir))
            # Hash-based lookup avoids rescanning the tuple for every file.
            packaged_lookup = frozenset(packaged_files)
            other_files = tuple(
                f for f in _ls_files(['*.py'], repo=py_dir)
                if f.name != 'setup.py' and f not in packaged_lookup)

            yield PythonPackage(py_dir, py_dir / common_dir, packaged_files,
                                other_files)


def python_packages_containing(
        python_paths: Iterable[Path],
        repo: PathOrStr = '.') -> Tuple[List[PythonPackage], List[Path]]:
    """Finds all Python packages containing the provided Python paths.

    A path belongs to the first package whose root is one of its parent
    directories. The packages are collected in a set, so their order in the
    returned list is unspecified.

    Returns:
      ([packages], [files_not_in_packages])
    """
    all_packages = list(all_python_packages(repo))

    packages: Set[PythonPackage] = set()
    files_not_in_packages: List[Path] = []

    for python_path in python_paths:
        for package in all_packages:
            if package.root in python_path.parents:
                packages.add(package)
                break
        else:
            files_not_in_packages.append(python_path)

    return list(packages), files_not_in_packages


def commit_message(commit: str = 'HEAD', repo: PathOrStr = '.') -> str:
    """Returns the output of `git log --format=%B -n1` for the commit."""
    return git_stdout('log', '--format=%B', '-n1', commit, repo=repo)


def commit_hash(rev: str = 'HEAD',
                short: bool = True,
                repo: PathOrStr = '.') -> str:
    """Returns the commit hash of the revision.

    Args:
      rev: revision to look up
      short: if True, pass --short to git rev-parse
      repo: repo path from which to run commands
    """
    args = ['rev-parse']
    if short:
        args += ['--short']
    args += [rev]
    return git_stdout(*args, repo=repo)