• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import abc
15import contextlib
16import functools
17import logging
18from typing import Optional
19
20# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
21# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
22# TODO(sergiitk): Remove after #24897 is solved
23import grpc  # noqa pylint: disable=unused-import
24from absl import flags
25from google.cloud import secretmanager_v1
26from google.longrunning import operations_pb2
27from google.protobuf import json_format
28from google.rpc import code_pb2
29from googleapiclient import discovery
30import googleapiclient.errors
31import tenacity
32import yaml
33
logger = logging.getLogger(__name__)

# Command-line flags. GcpApiManager reads each one as the fallback for its
# constructor argument of the same name.
PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
    "private_api_key_secret_name",
    default=None,
    help=("Load Private API access key from the latest version of the secret "
          "with the given name, in the format projects/*/secrets/*"),
)
V1_DISCOVERY_URI = flags.DEFINE_string(
    "v1_discovery_uri",
    default=discovery.V1_DISCOVERY_URI,
    help="Override v1 Discovery URI",
)
V2_DISCOVERY_URI = flags.DEFINE_string(
    "v2_discovery_uri",
    default=discovery.V2_DISCOVERY_URI,
    help="Override v2 Discovery URI",
)
COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
    "compute_v1_discovery_file",
    default=None,
    help="Load compute v1 from discovery file",
)

# Type aliases
Operation = operations_pb2.Operation
53
54
class GcpApiManager:
    """Builds GCP API clients on demand and memoizes them per instance.

    Clients built from discovery documents are registered on an internal
    ``contextlib.ExitStack``, so ``close()`` (or leaving a ``with`` block)
    releases all of them at once.

    Every constructor argument falls back to the value of the command-line
    flag of the same name when it is omitted or falsy.
    """

    def __init__(self,
                 *,
                 v1_discovery_uri=None,
                 v2_discovery_uri=None,
                 compute_v1_discovery_file=None,
                 private_api_key_secret_name=None):
        self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
        self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
        self.compute_v1_discovery_file = (compute_v1_discovery_file or
                                          COMPUTE_V1_DISCOVERY_FILE.value)
        self.private_api_key_secret_name = (private_api_key_secret_name or
                                            PRIVATE_API_KEY_SECRET_NAME.value)
        # TODO(sergiitk): add options to pass google Credentials
        self._exit_stack = contextlib.ExitStack()
        # Memoized values live on the instance rather than in a class-level
        # functools.lru_cache: a shared cache keyed on `self` would keep every
        # manager (and the clients it built) alive for the process lifetime.
        self._cache = {}

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so an in-flight exception is never suppressed.
        self.close()

    def close(self):
        """Forget memoized values and close every discovery client built."""
        # Cleared first so a later call rebuilds a client instead of handing
        # out one that has just been closed.
        self._cache.clear()
        self._exit_stack.close()

    def _memoize(self, key, factory, *args):
        """Return the value stored under key, creating it once via factory.

        Nothing is stored when the factory raises, so failures are retried on
        the next call.
        """
        if key not in self._cache:
            self._cache[key] = factory(*args)
        return self._cache[key]

    @property
    def private_api_key(self):
        """
        Private API key.

        Return API key credential that identifies a GCP project allow-listed for
        accessing private API discovery documents.
        https://pantheon.corp.google.com/apis/credentials

        This method lazy-loads the content of the key from the Secret Manager.
        https://pantheon.corp.google.com/security/secret-manager

        Raises:
            ValueError: private_api_key_secret_name is not set.
        """
        return self._memoize('private_api_key', self._load_private_api_key)

    def _load_private_api_key(self):
        """Read the latest version of the configured secret and decode it."""
        if not self.private_api_key_secret_name:
            raise ValueError('private_api_key_secret_name must be set to '
                             'access private_api_key.')

        secrets_api = self.secrets('v1')
        version_resource_path = secrets_api.secret_version_path(
            **secrets_api.parse_secret_path(self.private_api_key_secret_name),
            secret_version='latest')
        secret: secretmanager_v1.AccessSecretVersionResponse
        secret = secrets_api.access_secret_version(name=version_resource_path)
        return secret.payload.data.decode()

    def compute(self, version):
        """Return the memoized Compute client; only 'v1' is supported."""
        return self._memoize(('compute', version), self._new_compute, version)

    def _new_compute(self, version):
        api_name = 'compute'
        if version == 'v1':
            # A local discovery file, when configured, takes precedence over
            # the remote discovery service.
            if self.compute_v1_discovery_file:
                return self._build_from_file(self.compute_v1_discovery_file)
            return self._build_from_discovery_v1(api_name, version)

        raise NotImplementedError(f'Compute {version} not supported')

    def networksecurity(self, version):
        """Return the memoized Network Security client ('v1alpha1' only)."""
        return self._memoize(('networksecurity', version),
                             self._new_networksecurity, version)

    def _new_networksecurity(self, version):
        api_name = 'networksecurity'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(api_name,
                                                 version,
                                                 api_key=self.private_api_key)

        raise NotImplementedError(f'Network Security {version} not supported')

    def networkservices(self, version):
        """Return the memoized Network Services client ('v1alpha1' only)."""
        return self._memoize(('networkservices', version),
                             self._new_networkservices, version)

    def _new_networkservices(self, version):
        api_name = 'networkservices'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(api_name,
                                                 version,
                                                 api_key=self.private_api_key)

        raise NotImplementedError(f'Network Services {version} not supported')

    def secrets(self, version):
        """Return the memoized Secret Manager client; only 'v1' is supported."""
        return self._memoize(('secrets', version), self._new_secrets, version)

    def _new_secrets(self, version):
        if version == 'v1':
            return secretmanager_v1.SecretManagerServiceClient()

        raise NotImplementedError(f'Secrets Manager {version} not supported')

    def _build_from_discovery_v1(self, api_name, version):
        """Build a client via the v1 discovery URI and register it to close."""
        api = discovery.build(api_name,
                              version,
                              cache_discovery=False,
                              discoveryServiceUrl=self.v1_discovery_uri)
        self._exit_stack.enter_context(api)
        return api

    def _build_from_discovery_v2(self, api_name, version, *, api_key=None):
        """Build a client via the v2 discovery URI and register it to close.

        When api_key is given it is appended to the discovery URL as the
        `key` query parameter.
        """
        key_arg = f'&key={api_key}' if api_key else ''
        api = discovery.build(
            api_name,
            version,
            cache_discovery=False,
            discoveryServiceUrl=f'{self.v2_discovery_uri}{key_arg}')
        self._exit_stack.enter_context(api)
        return api

    def _build_from_file(self, discovery_file):
        """Build a client from a local discovery document."""
        # Discovery documents are JSON; read them as UTF-8 instead of relying
        # on the locale's default encoding.
        with open(discovery_file, 'r', encoding='utf-8') as f:
            api = discovery.build_from_document(f.read())
        self._exit_stack.enter_context(api)
        return api
161
162
class Error(Exception):
    """Root of the exception hierarchy raised for GCP API failures.

    Catch this type to handle every GCP API error defined in this module.
    """
165
166
class OperationError(Error):
    """
    Operation was not successful.

    Assuming Operation based on Google API Style Guide:
    https://cloud.google.com/apis/design/design_patterns#long_running_operations
    https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto

    Attributes:
        api_name: Name of the API that ran the operation.
        name: Operation name, or 'unknown' when the response has none.
        error: The `error` field of the parsed Operation.
        code_name: Symbolic name of `error.code`, or the number as a string
            when the code is not a known google.rpc.Code value.
        message: Human-readable description, also used as the exception text.
    """

    def __init__(self, api_name, operation_response, message=None):
        """
        Args:
            api_name: Name of the API that ran the operation.
            operation_response: Operation resource as a dict (decoded JSON).
            message: Optional text overriding the generated description.
        """
        self.api_name = api_name
        operation = self._parse_operation(operation_response)
        self.name = operation.name or 'unknown'
        self.error = operation.error
        self.code_name = self._code_name(operation.error.code)
        if message is None:
            message = (f'{api_name} operation "{self.name}" failed. Error '
                       f'code: {self.error.code} ({self.code_name}), '
                       f'message: {self.error.message}')
        self.message = message
        super().__init__(message)

    @staticmethod
    def _parse_operation(operation_response):
        """Parse the response into an Operation without ever raising.

        Building the exception must not fail itself, otherwise the parsing
        problem would mask the operation failure being reported.
        """
        try:
            # `metadata` and `response` are API-specific Any payloads; their
            # message types may be missing from the default descriptor pool,
            # which makes ParseDict fail. Neither is needed to report an error.
            known_fields = {
                key: value
                for key, value in operation_response.items()
                if key not in ('metadata', 'response')
            }
            return json_format.ParseDict(known_fields,
                                         Operation(),
                                         ignore_unknown_fields=True)
        except (json_format.Error, AttributeError, TypeError,
                ValueError) as parse_error:
            logger.warning(
                "Can't parse operation response as "
                "google.longrunning.Operation: %r. Response: %r", parse_error,
                operation_response)

        # Best effort: keep the name and message when they are plain strings,
        # and mark the code as UNKNOWN since it could not be parsed reliably.
        fallback = Operation()
        fallback.error.code = code_pb2.Code.Value('UNKNOWN')
        if isinstance(operation_response, dict):
            name = operation_response.get('name')
            if isinstance(name, str):
                fallback.name = name
            error = operation_response.get('error')
            if isinstance(error, dict) and isinstance(error.get('message'),
                                                      str):
                fallback.error.message = error['message']
        return fallback

    @staticmethod
    def _code_name(code):
        """Return the google.rpc.Code name of code, or str(code) if unknown."""
        try:
            return code_pb2.Code.Name(code)
        except (TypeError, ValueError):
            return str(code)
188
189
class GcpProjectApiResource:
    """Base for wrappers around an API client scoped to one GCP project."""

    # TODO(sergiitk): move someplace better
    # Default total time to wait for an operation: five minutes.
    _WAIT_FOR_OPERATION_SEC = 5 * 60
    # Default pause between two polls of an operation.
    _WAIT_FIXED_SEC = 2
    # Value for the `num_retries` argument of request.execute().
    _GCP_API_RETRIES = 5

    def __init__(self, api: discovery.Resource, project: str):
        self.project: str = project
        self.api: discovery.Resource = api

    @staticmethod
    def wait_for_operation(operation_request,
                           test_success_fn,
                           timeout_sec=_WAIT_FOR_OPERATION_SEC,
                           wait_sec=_WAIT_FIXED_SEC):
        """Execute the request repeatedly until its result is accepted.

        Args:
            operation_request: Object whose execute() is called on each poll.
            test_success_fn: Predicate on the result of execute(); polling
                continues while it returns a falsy value.
            timeout_sec: Stop polling after this many seconds.
            wait_sec: Fixed delay between polls, in seconds.

        Returns:
            The result of the last execute() call.
        """
        # Poll again when the result is rejected or when execute() raised
        # (retry_if_exception_type() is used with its default exception type).
        should_poll_again = (tenacity.retry_if_not_result(test_success_fn) |
                             tenacity.retry_if_exception_type())
        polling = tenacity.Retrying(
            retry=should_poll_again,
            wait=tenacity.wait_fixed(wait_sec),
            stop=tenacity.stop_after_delay(timeout_sec),
            after=tenacity.after_log(logger, logging.DEBUG),
            reraise=True)
        return polling(operation_request.execute)

    @staticmethod
    def _resource_pretty_format(body: dict) -> str:
        """Dump the resource body as YAML with explicit document markers."""
        pretty_body = yaml.dump(body, explicit_start=True, explicit_end=True)
        return pretty_body
218
219
class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
    """Base for APIs whose resources live under projects/*/locations/*.

    Mutating requests return an operation, which is polled through
    `projects().locations().operations()` until it completes.
    Subclasses must provide `api_name` and `api_version`.
    """
    GLOBAL_LOCATION = 'global'

    def parent(self, location: Optional[str] = GLOBAL_LOCATION):
        """Return the parent path for the location (global when None)."""
        if location is None:
            location = self.GLOBAL_LOCATION
        return f'projects/{self.project}/locations/{location}'

    def resource_full_name(self, name, collection_name):
        """Return the full name of a resource in the default (global) parent."""
        return f'{self.parent()}/{collection_name}/{name}'

    def _create_resource(self, collection: discovery.Resource, body: dict,
                         **kwargs):
        """Create a resource in the global parent and wait for completion.

        Extra keyword arguments are passed to `collection.create()` as is.
        """
        logger.info("Creating %s resource:\n%s", self.api_name,
                    self._resource_pretty_format(body))
        create_req = collection.create(parent=self.parent(),
                                       body=body,
                                       **kwargs)
        self._execute(create_req)

    @property
    @abc.abstractmethod
    def api_name(self) -> str:
        """Name of the API, used in log and error messages."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def api_version(self) -> str:
        """Version of the API."""
        raise NotImplementedError

    def _get_resource(self, collection: discovery.Resource, full_name):
        """Load the resource by its full name, log it, and return it."""
        resource = collection.get(name=full_name).execute()
        logger.info('Loaded %s:\n%s', full_name,
                    self._resource_pretty_format(resource))
        return resource

    def _delete_resource(self, collection: discovery.Resource,
                         full_name: str) -> bool:
        """Delete the resource and wait for completion.

        Returns:
            True when the deletion completed; False when the request failed
            with an HttpError (a 404 is logged as "does not exist", anything
            else as a warning).
        """
        logger.debug("Deleting %s", full_name)
        try:
            self._execute(collection.delete(name=full_name))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info('%s not deleted since it does not exist', full_name)
            else:
                logger.warning('Failed to delete %s, %r', full_name, error)
        return False

    def _execute(self,
                 request,
                 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Execute the request and wait for the operation it returns."""
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        self._wait(operation, timeout_sec)

    def _wait(self,
              operation,
              timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Poll the operation until it is done.

        Raises:
            OperationError: the completed operation contains an `error`.
        """
        op_name = operation['name']
        logger.debug('Waiting for %s operation, timeout %s sec: %s',
                     self.api_name, timeout_sec, op_name)

        op_request = self.api.projects().locations().operations().get(
            name=op_name)
        # `done` may be absent from the JSON while the operation is still
        # running (false is the field's default value); treat a missing key
        # as "not done" instead of failing the wait with a KeyError.
        operation = self.wait_for_operation(
            operation_request=op_request,
            test_success_fn=lambda result: result.get('done', False),
            timeout_sec=timeout_sec)

        logger.debug('Completed operation: %s', operation)
        if 'error' in operation:
            raise OperationError(self.api_name, operation)
292