• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import abc
15import contextlib
16import functools
17import logging
18from typing import Optional, List
19
20# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
21# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
22# TODO(sergiitk): Remove after #24897 is solved
23import grpc  # noqa pylint: disable=unused-import
24from absl import flags
25from google.cloud import secretmanager_v1
26from google.longrunning import operations_pb2
27from google.protobuf import json_format
28from google.rpc import code_pb2
29from googleapiclient import discovery
30import googleapiclient.errors
31import tenacity
32import yaml
33
logger = logging.getLogger(__name__)

# Command-line flags. Each one supplies the fallback value for the
# same-named keyword argument of GcpApiManager.__init__.
PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
    name="private_api_key_secret_name",
    default=None,
    help=("Load Private API access key from the latest version of the secret "
          "with the given name, in the format projects/*/secrets/*"))

# Discovery service endpoints; default to the ones shipped with
# googleapiclient.
V1_DISCOVERY_URI = flags.DEFINE_string(
    name="v1_discovery_uri",
    default=discovery.V1_DISCOVERY_URI,
    help="Override v1 Discovery URI")
V2_DISCOVERY_URI = flags.DEFINE_string(
    name="v2_discovery_uri",
    default=discovery.V2_DISCOVERY_URI,
    help="Override v2 Discovery URI")

# Optional local discovery document used instead of the discovery service.
COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
    name="compute_v1_discovery_file",
    default=None,
    help="Load compute v1 from discovery file")

# Type aliases
Operation = operations_pb2.Operation
53
54
class GcpApiManager:
    """Builds and caches the GCP API clients used by this module.

    Discovery-based clients are registered on an internal
    ``contextlib.ExitStack`` so that a single ``close()`` call exits all of
    them. Client factories are memoized with ``functools.lru_cache``; note
    that such a cache is keyed on ``(self, version)`` and therefore keeps a
    reference to the manager and to every client it returned.
    """

    def __init__(self,
                 *,
                 v1_discovery_uri=None,
                 v2_discovery_uri=None,
                 compute_v1_discovery_file=None,
                 private_api_key_secret_name=None):
        """
        Store configuration, falling back to the module flags.

        Every argument is keyword-only; a falsy argument is replaced with
        the value of the same-named command-line flag.
        """
        self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
        self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
        self.compute_v1_discovery_file = (compute_v1_discovery_file or
                                          COMPUTE_V1_DISCOVERY_FILE.value)
        self.private_api_key_secret_name = (private_api_key_secret_name or
                                            PRIVATE_API_KEY_SECRET_NAME.value)
        # TODO(sergiitk): add options to pass google Credentials
        self._exit_stack = contextlib.ExitStack()

    def close(self):
        """Exit the contexts of all API clients built by this manager."""
        self._exit_stack.close()

    @property
    @functools.lru_cache(None)
    def private_api_key(self):
        """
        Private API key.

        Return API key credential that identifies a GCP project allow-listed for
        accessing private API discovery documents.
        https://pantheon.corp.google.com/apis/credentials

        This method lazy-loads the content of the key from the Secret Manager.
        https://pantheon.corp.google.com/security/secret-manager

        Raises:
            ValueError: private_api_key_secret_name is not set.
        """
        if not self.private_api_key_secret_name:
            raise ValueError('private_api_key_secret_name must be set to '
                             'access private_api_key.')

        secrets_api = self.secrets('v1')
        # Expand "projects/*/secrets/*" into the path of its latest version.
        version_resource_path = secrets_api.secret_version_path(
            **secrets_api.parse_secret_path(self.private_api_key_secret_name),
            secret_version='latest')
        secret: secretmanager_v1.AccessSecretVersionResponse
        secret = secrets_api.access_secret_version(name=version_resource_path)
        return secret.payload.data.decode()

    @functools.lru_cache(None)
    def compute(self, version):
        """Return the Compute API client; only 'v1' is supported.

        The client is built from compute_v1_discovery_file when it is set,
        otherwise from the v1 discovery service.

        Raises:
            NotImplementedError: unsupported version.
        """
        api_name = 'compute'
        if version == 'v1':
            if self.compute_v1_discovery_file:
                return self._build_from_file(self.compute_v1_discovery_file)
            else:
                return self._build_from_discovery_v1(api_name, version)

        raise NotImplementedError(f'Compute {version} not supported')

    @functools.lru_cache(None)
    def networksecurity(self, version):
        """Return the Network Security API client; only 'v1alpha1' is supported.

        Raises:
            NotImplementedError: unsupported version.
        """
        api_name = 'networksecurity'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(
                api_name,
                version,
                api_key=self.private_api_key,
                visibility_labels=['NETWORKSECURITY_ALPHA'])

        raise NotImplementedError(f'Network Security {version} not supported')

    @functools.lru_cache(None)
    def networkservices(self, version):
        """Return the Network Services API client; only 'v1alpha1' is supported.

        Raises:
            NotImplementedError: unsupported version.
        """
        api_name = 'networkservices'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(
                api_name,
                version,
                api_key=self.private_api_key,
                visibility_labels=['NETWORKSERVICES_ALPHA'])

        raise NotImplementedError(f'Network Services {version} not supported')

    @functools.lru_cache(None)
    def secrets(self, version):
        """Return the Secret Manager client; only 'v1' is supported.

        Raises:
            NotImplementedError: unsupported version.
        """
        if version == 'v1':
            return secretmanager_v1.SecretManagerServiceClient()

        raise NotImplementedError(f'Secret Manager {version} not supported')

    @staticmethod
    def _append_query_params(uri: str, params: dict) -> str:
        """Return uri with params appended as 'key=value' query arguments.

        The first argument is attached with '&' when uri already has a query
        string ('?' present) and with '?' otherwise. Values are inserted
        verbatim (no URL-encoding). An empty params returns uri unchanged.
        """
        if not params:
            return uri
        separator = '&' if '?' in uri else '?'
        query = '&'.join(f'{k}={v}' for k, v in params.items())
        return f'{uri}{separator}{query}'

    def _build_from_discovery_v1(self, api_name, version):
        """Build a client from the v1 discovery service and register it."""
        api = discovery.build(api_name,
                              version,
                              cache_discovery=False,
                              discoveryServiceUrl=self.v1_discovery_uri)
        self._exit_stack.enter_context(api)
        return api

    def _build_from_discovery_v2(self,
                                 api_name,
                                 version,
                                 *,
                                 api_key: Optional[str] = None,
                                 visibility_labels: Optional[List] = None):
        """Build a client from the v2 discovery service and register it.

        Args:
            api_name: API name passed to discovery.build.
            version: API version passed to discovery.build.
            api_key: when set, sent as the 'key' query argument of the
                discovery URL.
            visibility_labels: when set, sent joined with '_' as the
                'labels' query argument of the discovery URL.
        """
        params = {}
        if api_key:
            params['key'] = api_key
        if visibility_labels:
            # Multiple labels are joined with an underscore.
            params['labels'] = '_'.join(visibility_labels)

        discovery_url = self._append_query_params(self.v2_discovery_uri,
                                                  params)
        api = discovery.build(api_name,
                              version,
                              cache_discovery=False,
                              discoveryServiceUrl=discovery_url)
        self._exit_stack.enter_context(api)
        return api

    def _build_from_file(self, discovery_file):
        """Build a client from a local discovery document and register it."""
        # Discovery documents are JSON, so read them as UTF-8 regardless of
        # the locale's default encoding.
        with open(discovery_file, 'r', encoding='utf-8') as f:
            api = discovery.build_from_document(f.read())
        self._exit_stack.enter_context(api)
        return api
180
181
class Error(Exception):
    """Common ancestor of the exceptions raised for GCP API failures."""
184
185
class OperationError(Error):
    """
    Operation was not successful.

    Assuming Operation based on Google API Style Guide:
    https://cloud.google.com/apis/design/design_patterns#long_running_operations
    https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto

    Attributes set by the constructor:
        api_name: name of the API the operation belongs to.
        name: operation name, or 'unknown' when the response has none.
        error: the operation's ``error`` status message.
        code_name: symbolic name of ``error.code``, or 'UNRECOGNIZED' when
            the code is not a member of the ``google.rpc.Code`` enum.
        message: the human-readable description passed to ``Exception``.
    """

    def __init__(self, api_name, operation_response, message=None):
        """
        Args:
            api_name: name of the API the failed operation belongs to.
            operation_response: dict form of the Operation, parsed into an
                ``Operation`` proto with ``json_format.ParseDict``.
            message: optional replacement for the generated error message.
        """
        self.api_name = api_name
        operation = json_format.ParseDict(operation_response, Operation())
        self.name = operation.name or 'unknown'
        self.error = operation.error
        try:
            self.code_name = code_pb2.Code.Name(operation.error.code)
        except ValueError:
            # Code.Name() rejects numbers outside the enum. Building the
            # exception must not fail and mask the operation failure itself.
            self.code_name = 'UNRECOGNIZED'
        if message is None:
            message = (f'{api_name} operation "{self.name}" failed. Error '
                       f'code: {self.error.code} ({self.code_name}), '
                       f'message: {self.error.message}')
        self.message = message
        super().__init__(message)
207
208
class GcpProjectApiResource:
    """Pairs a discovery API client with the GCP project it operates on."""

    # TODO(sergiitk): move someplace better
    _WAIT_FOR_OPERATION_SEC = 5 * 60
    _WAIT_FIXED_SEC = 2
    _GCP_API_RETRIES = 5

    def __init__(self, api: discovery.Resource, project: str):
        """Remember the API client and the project name."""
        self.project: str = project
        self.api: discovery.Resource = api

    @staticmethod
    def wait_for_operation(operation_request,
                           test_success_fn,
                           timeout_sec=_WAIT_FOR_OPERATION_SEC,
                           wait_sec=_WAIT_FIXED_SEC):
        """Execute a request repeatedly until its result is accepted.

        Args:
            operation_request: object whose ``execute()`` is called on every
                attempt.
            test_success_fn: predicate applied to the result of ``execute()``;
                a falsy outcome triggers another attempt.
            timeout_sec: delay after which retrying stops.
            wait_sec: fixed pause between attempts.

        Returns:
            The value returned through the tenacity retryer for the accepted
            attempt.
        """
        # Try again when the result is rejected by the predicate, or when the
        # attempt itself raised.
        should_retry = (tenacity.retry_if_not_result(test_success_fn) |
                        tenacity.retry_if_exception_type())
        pause = tenacity.wait_fixed(wait_sec)
        deadline = tenacity.stop_after_delay(timeout_sec)
        log_attempt = tenacity.after_log(logger, logging.DEBUG)

        retryer = tenacity.Retrying(retry=should_retry,
                                    wait=pause,
                                    stop=deadline,
                                    after=log_attempt,
                                    reraise=True)
        return retryer(operation_request.execute)

    @staticmethod
    def _resource_pretty_format(body: dict) -> str:
        """Dump a resource body as a YAML document with start/end markers."""
        return yaml.dump(body, explicit_start=True, explicit_end=True)
237
238
class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
    """Base for APIs addressed as projects/{project}/locations/{location}.

    Mutating requests return an operation that is polled through
    ``self.api.projects().locations().operations()`` until it is done.
    Subclasses must provide ``api_name`` and ``api_version``.
    """
    GLOBAL_LOCATION = 'global'

    def parent(self, location: Optional[str] = GLOBAL_LOCATION):
        """Return the parent path for the location (global when None)."""
        if location is None:
            location = self.GLOBAL_LOCATION
        return f'projects/{self.project}/locations/{location}'

    def resource_full_name(self, name, collection_name):
        """Return the full name of a resource under the default parent."""
        return f'{self.parent()}/{collection_name}/{name}'

    def _create_resource(self, collection: discovery.Resource, body: dict,
                         **kwargs):
        """Create a resource under the default parent and wait for completion.

        Args:
            collection: API collection exposing ``create()``.
            body: resource body; also logged in pretty-printed form.
            **kwargs: forwarded to ``collection.create()``.
        """
        logger.info("Creating %s resource:\n%s", self.api_name,
                    self._resource_pretty_format(body))
        create_req = collection.create(parent=self.parent(),
                                       body=body,
                                       **kwargs)
        self._execute(create_req)

    @property
    @abc.abstractmethod
    def api_name(self) -> str:
        """Name of the API, used in log and error messages."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def api_version(self) -> str:
        """Version of the API; defined by subclasses."""
        raise NotImplementedError

    def _get_resource(self, collection: discovery.Resource, full_name):
        """Load the resource with the given full name, log and return it."""
        resource = collection.get(name=full_name).execute()
        logger.info('Loaded %s:\n%s', full_name,
                    self._resource_pretty_format(resource))
        return resource

    def _delete_resource(self, collection: discovery.Resource,
                         full_name: str) -> bool:
        """Delete a resource and wait for the operation to complete.

        Returns:
            True when the deletion completed; False when the API raised an
            HttpError (logged as info for a 404, as a warning otherwise).
            Other exceptions are not handled here.
        """
        logger.debug("Deleting %s", full_name)
        try:
            self._execute(collection.delete(name=full_name))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info('%s not deleted since it does not exist', full_name)
            else:
                logger.warning('Failed to delete %s, %r', full_name, error)
        return False

    def _execute(self,
                 request,
                 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Execute a request and wait for the operation it returns."""
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        self._wait(operation, timeout_sec)

    def _wait(self,
              operation,
              timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Poll an operation until it is done.

        Args:
            operation: operation dict; its 'name' identifies what to poll.
            timeout_sec: how long to keep polling.

        Raises:
            OperationError: the finished operation contains an 'error'.
        """
        op_name = operation['name']
        logger.debug('Waiting for %s operation, timeout %s sec: %s',
                     self.api_name, timeout_sec, op_name)

        op_request = self.api.projects().locations().operations().get(
            name=op_name)
        operation = self.wait_for_operation(
            operation_request=op_request,
            # An unfinished operation may omit 'done' altogether (JSON
            # responses can drop default-valued fields). Treat a missing key
            # as "not done yet" instead of failing with KeyError.
            test_success_fn=lambda result: result.get('done', False),
            timeout_sec=timeout_sec)

        logger.debug('Completed operation: %s', operation)
        if 'error' in operation:
            raise OperationError(self.api_name, operation)
311