• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""Install and check status of Git repository-based packages."""
15
16import os
17import pathlib
18import shutil
19import subprocess
20from typing import Union
21import urllib.parse
22
23import pw_package.package_manager
24
25PathOrStr = Union[pathlib.Path, str]
26
27
def git_stdout(
    *args: PathOrStr, show_stderr=False, repo: PathOrStr = '.'
) -> str:
    """Run ``git -C <repo> <args...>`` and return what it printed to stdout.

    Args:
        *args: Arguments placed after ``git -C <repo>`` on the command line.
        show_stderr: If true, git's stderr is inherited from this process;
            otherwise it is discarded.
        repo: Directory handed to git's ``-C`` option.

    Returns:
        The captured standard output, decoded to ``str`` with leading and
        trailing whitespace removed.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    if show_stderr:
        stderr_target = None
    else:
        stderr_target = subprocess.DEVNULL

    completed = subprocess.run(
        ['git', '-C', repo, *args],
        stdout=subprocess.PIPE,
        stderr=stderr_target,
        check=True,
    )
    output = completed.stdout.decode()
    return output.strip()
41
42
def git(*args: PathOrStr, repo: PathOrStr = '.') -> subprocess.CompletedProcess:
    """Run ``git -C <repo> <args...>`` without capturing its output.

    Args:
        *args: Arguments placed after ``git -C <repo>`` on the command line.
        repo: Directory handed to git's ``-C`` option.

    Returns:
        The ``subprocess.CompletedProcess`` describing the finished command.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    command = ['git', '-C', repo]
    command.extend(args)
    return subprocess.run(command, check=True)
45
46
class GitRepo(pw_package.package_manager.Package):
    """Install and check status of Git repository-based packages.

    A package is pinned to a commit and/or a tag of a remote repository and
    may optionally be limited to a sparse checkout of selected paths.
    """

    def __init__(
        self, url, *args, commit='', tag='', sparse_list=None, **kwargs
    ):
        """Record which revision of which repository should be installed.

        Args:
            url: URL of the repository; also compared against the checkout's
                ``origin`` remote in ``status()``.
            *args: Forwarded unchanged to the base ``Package`` constructor.
            commit: Commit to install. ``status()`` compares it verbatim with
                the output of ``git rev-parse HEAD``.
            tag: Tag to install. ``status()`` compares it verbatim with the
                output of ``git describe --tags``.
            sparse_list: Optional collection of path patterns. When truthy,
                the package is installed as a sparse checkout of these paths.
            **kwargs: Forwarded unchanged to the base ``Package`` constructor.

        Raises:
            ValueError: If neither ``commit`` nor ``tag`` is given.
        """
        super().__init__(*args, **kwargs)
        if not (commit or tag):
            raise ValueError('git repo must specify a commit or tag')

        self._url = url
        self._commit = commit
        self._tag = tag
        self._sparse_list = sparse_list
        # NOTE(review): not read anywhere in this class; presumably consumed
        # by the base class or the package manager -- confirm.
        self._allow_use_in_downstream = False

    def status(self, path: pathlib.Path) -> bool:
        """Return True if ``path`` holds a clean checkout of this package.

        The checkout must have a ``.git`` directory, an ``origin`` remote that
        matches the configured URL, the expected commit and/or tag, the
        expected sparse-checkout list (if any) and no uncommitted changes.

        A git query that fails (for example because a previous clone was
        interrupted, there is no ``origin`` remote, no tag is reachable, or
        the worktree is not sparse) is treated as "not installed" rather than
        raised, so that ``install()`` can wipe the directory and start over.

        Args:
            path: Directory in which the package is expected to be installed.

        Returns:
            True if the checkout matches, False otherwise.
        """
        if not os.path.isdir(path / '.git'):
            return False

        try:
            return self._checkout_matches(path)
        except subprocess.CalledProcessError:
            # The checkout is too broken or too different for git to even
            # answer the question; report it as not installed.
            return False

    def _checkout_matches(self, path: pathlib.Path) -> bool:
        """Compare the existing checkout at ``path`` with the expected state.

        Raises:
            subprocess.CalledProcessError: If any git query exits non-zero.
        """
        remote = git_stdout('remote', 'get-url', 'origin', repo=path)
        url = urllib.parse.urlparse(remote)
        # Rewrite sso:// and *.git.corp.google.com remotes to the equivalent
        # https://<host>.googlesource.com URL before comparing.
        if url.scheme == 'sso' or '.git.corp.google.com' in url.netloc:
            host = url.netloc.replace(
                '.git.corp.google.com',
                '.googlesource.com',
            )
            if not host.endswith('.googlesource.com'):
                host += '.googlesource.com'
            remote = 'https://{}{}'.format(host, url.path)

        if remote != self._url:
            return False

        if self._commit:
            head = git_stdout('rev-parse', 'HEAD', repo=path)
            if head != self._commit:
                return False

        if self._tag:
            described = git_stdout('describe', '--tags', repo=path)
            if described != self._tag:
                return False

        # If it is a sparse checkout, the sparse list shall match.
        if self._sparse_list and not self.check_sparse_list(path):
            return False

        # Any porcelain output means modified or untracked files are present.
        return not git_stdout('status', '--porcelain=v1', repo=path)

    def install(self, path: pathlib.Path) -> None:
        """Install the package at ``path``, replacing any stale checkout.

        Does nothing if ``status(path)`` already reports a matching checkout.
        Otherwise an existing directory at ``path`` is deleted and the
        repository is fetched again, sparsely if a sparse list was given.

        Args:
            path: Directory to install the package into.

        Raises:
            subprocess.CalledProcessError: If a git command run during the
                checkout exits with a non-zero status.
        """
        # If already installed and at the correct version, exit now.
        if self.status(path):
            return

        # Otherwise delete the current version and clone again.
        if os.path.isdir(path):
            shutil.rmtree(path)

        if self._sparse_list:
            self.checkout_sparse(path)
        else:
            self.checkout_full(path)

    def checkout_full(self, path: pathlib.Path) -> None:
        """Clone the whole repository into ``path`` at the pinned revision.

        When a commit is configured it takes precedence: the repository is
        cloned and hard-reset to that commit. Otherwise the tag is cloned.

        Args:
            path: Destination directory for the clone.
        """
        # --filter=blob:none requests a partial clone: file contents (blobs)
        # are not downloaded up front and are fetched on demand when a command
        # needs them. For small repositories the effect is negligible but for
        # large repositories this should be a significant improvement.
        if self._commit:
            git('clone', '--filter=blob:none', self._url, path)
            git('reset', '--hard', self._commit, repo=path)
        elif self._tag:
            git('clone', '-b', self._tag, '--filter=blob:none', self._url, path)

    def checkout_sparse(self, path: pathlib.Path) -> None:
        """Create a sparse, depth-1 checkout of the pinned revision.

        Initializes an empty repository at ``path``, enables
        ``core.sparseCheckout``, writes the sparse list to
        ``.git/info/sparse-checkout`` and pulls the commit (or, if no commit
        is configured, the tag) from ``origin``.

        Args:
            path: Destination directory for the checkout.
        """
        git('init', path)
        git('remote', 'add', 'origin', self._url, repo=path)
        git('config', 'core.sparseCheckout', 'true', repo=path)

        # Select the files to check out by editing .git/info/sparse-checkout,
        # one pattern per line.
        with open(path / '.git' / 'info' / 'sparse-checkout', 'w') as sparse:
            sparse.writelines(source + '\n' for source in self._sparse_list)

        # Either pull from a commit or a tag.
        target = self._commit if self._commit else self._tag
        git('pull', '--depth=1', 'origin', target, repo=path)

    def check_sparse_list(self, path: pathlib.Path) -> bool:
        """Return True if the checkout's sparse list equals the expected one.

        The comparison ignores ordering and duplicates.

        Args:
            path: Directory of the existing checkout.

        Raises:
            subprocess.CalledProcessError: If ``git sparse-checkout list``
                exits with a non-zero status.
        """
        # git_stdout() already strips surrounding whitespace.
        sparse_list = git_stdout(
            'sparse-checkout', 'list', repo=path
        ).splitlines()
        return set(sparse_list) == set(self._sparse_list)
143