• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class to manage a git repository via python.

This class is used to run git commands through a Python API and to
manage the current state of a local git repo.

  Typical usage example:

    r_man = clone_repo_and_get_manager(
        'https://github.com/google/oss-fuzz.git', '/tmp/repos')
    r_man.checkout_commit('5668cc422c2c92d38a370545d3591039fb5bb8d4')
"""
24import datetime
25import logging
26import os
27import shutil
28
29import utils
30
31
class RepoManager:
  """Wrapper around the git command line for one local repository.

  Every git invocation goes through `git()`, which hands the command to
  `utils.execute` with `repo_dir` as its `location` argument.

  Attributes:
    repo_dir: Path of the local repository this manager operates on.
  """

  def __init__(self, repo_dir):
    """Stores the repository path; nothing is cloned or validated here.

    Args:
      repo_dir: Path of the local git repository.
    """
    self.repo_dir = repo_dir

  def _is_git_repo(self):
    """Test if the current repo dir is a git repo or not.

    Returns:
      True if repo_dir contains a `.git` directory.
    """
    return os.path.isdir(os.path.join(self.repo_dir, '.git'))

  def git(self, cmd, check_result=False):
    """Run a git command.

    Args:
      cmd: The git arguments (everything after `git`) as a list.
      check_result: Should an exception be thrown on failed command.

    Returns:
      stdout, stderr, error code.
    """
    return utils.execute(['git'] + list(cmd),
                         location=self.repo_dir,
                         check_result=check_result)

  def commit_exists(self, commit):
    """Checks to see if a commit exists in the project repo.

    Args:
      commit: The commit SHA you are checking.

    Returns:
      True if the commit exists in the project. None, empty and
      whitespace-only values are reported as non-existent without
      invoking git.
    """
    # Guard against None as well as blank strings so callers get False
    # instead of an AttributeError.
    if not commit or not commit.strip():
      return False

    _, _, err_code = self.git(['cat-file', '-e', commit])
    return not err_code

  def commit_date(self, commit):
    """Get the date of a commit.

    Args:
      commit: The commit hash.

    Returns:
      A timezone-aware (UTC) datetime built from the commit timestamp
      printed by `git show --format=%ct`.
    """
    out, _, _ = self.git(['show', '-s', '--format=%ct', commit],
                         check_result=True)
    return datetime.datetime.fromtimestamp(int(out), tz=datetime.timezone.utc)

  def get_git_diff(self, base='origin...'):
    """Gets a list of files that have changed from the repo head.

    Args:
      base: The revision or revision range passed to `git diff`.

    Returns:
      A list of changed file paths, or None on error or when the diff is
      empty.
    """
    self.fetch_unshallow()
    # Add '--' so that git knows we aren't talking about files.
    out, err_msg, err_code = self.git(['diff', '--name-only', base, '--'])
    if err_code:
      logging.error('Git diff failed with error message %s.', err_msg)
      return None
    if not out:
      logging.error('No diff was found.')
      return None
    return [line for line in out.splitlines() if line]

  def get_current_commit(self):
    """Gets the current commit SHA of the repo.

    Returns:
      The current active commit SHA.
    """
    out, _, _ = self.git(['rev-parse', 'HEAD'], check_result=True)
    return out.strip()

  def get_parent(self, commit, count):
    """Gets the count'th parent of the given commit.

    Args:
      commit: The commit to start from.
      count: How many generations to walk back (`commit~count`).

    Returns:
      The parent commit SHA, or None if git could not resolve it.
    """
    self.fetch_unshallow()
    out, _, err_code = self.git(['rev-parse', commit + '~' + str(count)],
                                check_result=False)
    if err_code:
      return None

    return out.strip()

  def fetch_all_remotes(self):
    """Fetch all remotes for checkouts that track a single branch."""
    # Widen the fetch refspec to every branch of origin before updating.
    self.git([
        'config', 'remote.origin.fetch', '+refs/heads/*:refs/remotes/origin/*'
    ],
             check_result=True)
    self.git(['remote', 'update'], check_result=True)

  def get_commit_list(self, newest_commit, oldest_commit=None, limit=None):
    """Gets the list of commits(inclusive) between the old and new commits.

    Args:
      newest_commit: The newest commit to be in the list.
      oldest_commit: The (optional) oldest commit to be in the list.
      limit: The (optional) maximum number of commits requested from
        `git rev-list` via `--max-count`. When oldest_commit is given it
        is appended after the limit is applied, so the result may hold
        limit + 1 entries.

    Returns:
      The list of commit SHAs from newest to oldest.

    Raises:
      ValueError: When either the oldest or newest commit does not exist.
      RuntimeError: When there is an error getting the commit list.
    """
    self.fetch_unshallow()
    if oldest_commit and not self.commit_exists(oldest_commit):
      raise ValueError('The oldest commit %s does not exist' % oldest_commit)
    if not self.commit_exists(newest_commit):
      raise ValueError('The newest commit %s does not exist' % newest_commit)
    if oldest_commit == newest_commit:
      return [oldest_commit]

    if oldest_commit:
      commit_range = oldest_commit + '..' + newest_commit
    else:
      commit_range = newest_commit

    limit_args = [f'--max-count={limit}'] if limit else []

    out, _, err_code = self.git(['rev-list', commit_range] + limit_args)
    commits = [commit for commit in out.split('\n') if commit]
    if err_code or not commits:
      raise RuntimeError('Error getting commit list between %s and %s ' %
                         (oldest_commit, newest_commit))

    # `old..new` excludes `old` itself, so add it to make the result
    # inclusive.
    if oldest_commit:
      commits.append(oldest_commit)
    return commits

  def fetch_branch(self, branch):
    """Fetches a remote branch from origin.

    Args:
      branch: The branch name, used for both sides of the fetch refspec.

    Returns:
      stdout, stderr, error code of the fetch command.
    """
    return self.git(
        ['fetch', 'origin', '{branch}:{branch}'.format(branch=branch)])

  def fetch_unshallow(self):
    """Gets the current git repository history.

    Only runs `git fetch --unshallow` when `.git/shallow` exists; a
    failure is logged rather than raised.
    """
    shallow_file = os.path.join(self.repo_dir, '.git', 'shallow')
    if os.path.exists(shallow_file):
      _, err, err_code = self.git(['fetch', '--unshallow'], check_result=False)
      if err_code:
        logging.error('Unshallow returned non-zero code: %s', err)

  def checkout_pr(self, pr_ref):
    """Checks out a remote pull request.

    Args:
      pr_ref: The pull request reference to be checked out.
    """
    self.fetch_unshallow()
    self.git(['fetch', 'origin', pr_ref], check_result=True)
    self.git(['checkout', '-f', 'FETCH_HEAD'], check_result=True)

  def checkout_commit(self, commit, clean=True):
    """Checks out a specific commit from the repo.

    Args:
      commit: The commit SHA to be checked out.
      clean: Whether to run `git clean -fxd` after the checkout.

    Raises:
      RuntimeError: when checkout is not successful.
      ValueError: when commit does not exist.
    """
    self.fetch_unshallow()
    if not self.commit_exists(commit):
      raise ValueError('Commit %s does not exist in current branch' % commit)

    # `rev-parse HEAD` always prints a full lowercase SHA, so comparing it
    # with the raw argument would wrongly fail for abbreviated or
    # upper-case SHAs. Resolve the argument first, before HEAD moves; if
    # that fails, fall back to the raw string.
    resolved, _, err_code = self.git(
        ['rev-parse', '--verify', '--quiet', commit + '^{commit}'])
    resolved = '' if err_code else resolved.strip()
    expected_sha = resolved or commit

    self.git(['checkout', '-f', commit], check_result=True)
    if clean:
      self.git(['clean', '-fxd'], check_result=True)
    if self.get_current_commit() != expected_sha:
      raise RuntimeError('Error checking out commit %s' % commit)

  def remove_repo(self):
    """Removes the git repo from disk; a missing directory is a no-op."""
    if os.path.isdir(self.repo_dir):
      shutil.rmtree(self.repo_dir)
227
228
def clone_repo_and_get_manager(repo_url, base_dir, repo_name=None):
  """Clones a repo and constructs a repo manager class.

  The clone is skipped when the target directory already exists.

  Args:
    repo_url: The github url needed to clone.
    base_dir: The full file-path where the git repo is located.
    repo_name: The name of the directory the repo is cloned to. Defaults
      to the last path component of repo_url without a trailing `.git`.

  Returns:
    A RepoManager for `base_dir/repo_name`.
  """
  if repo_name is None:
    # Ignore trailing slashes so the basename is never empty, and drop
    # only a *trailing* '.git': a blanket replace() would turn
    # 'user.github.io.git' into 'userhub.io'.
    repo_name = os.path.basename(repo_url.rstrip('/'))
    if repo_name.endswith('.git'):
      repo_name = repo_name[:-len('.git')]
  repo_dir = os.path.join(base_dir, repo_name)
  manager = RepoManager(repo_dir)

  if not os.path.exists(repo_dir):
    _clone(repo_url, base_dir, repo_name)

  return manager
246
247
def _clone(repo_url, base_dir, repo_name):
  """Runs `git clone <repo_url> <repo_name>` through utils.execute.

  Args:
    repo_url: The url of the repository to clone.
    base_dir: Passed to utils.execute as the execution location.
    repo_name: The target directory name handed to `git clone`.

  Raises:
    Whatever utils.execute raises for a failed command when
    check_result=True. NOTE(review): the previous docstring named
    ValueError; confirm against utils.execute.
  """
  clone_command = ['git', 'clone', repo_url, repo_name]
  utils.execute(clone_command, location=base_dir, check_result=True)
257