# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import contextlib
import functools
import logging
from typing import Optional

# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
# TODO(sergiitk): Remove after #24897 is solved
import grpc  # noqa pylint: disable=unused-import
from absl import flags
from google.cloud import secretmanager_v1
from google.longrunning import operations_pb2
from google.protobuf import json_format
from google.rpc import code_pb2
from googleapiclient import discovery
import googleapiclient.errors
import tenacity
import yaml

logger = logging.getLogger(__name__)
PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
    "private_api_key_secret_name",
    default=None,
    help="Load Private API access key from the latest version of the secret "
    "with the given name, in the format projects/*/secrets/*")
V1_DISCOVERY_URI = flags.DEFINE_string("v1_discovery_uri",
                                       default=discovery.V1_DISCOVERY_URI,
                                       help="Override v1 Discovery URI")
V2_DISCOVERY_URI = flags.DEFINE_string("v2_discovery_uri",
                                       default=discovery.V2_DISCOVERY_URI,
                                       help="Override v2 Discovery URI")
COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
    "compute_v1_discovery_file",
    default=None,
    help="Load compute v1 from discovery file")

# Type aliases
Operation = operations_pb2.Operation


class GcpApiManager:
    """Factory for the GCP API clients used by this module.

    Each public factory method is memoized with `functools.lru_cache`, so
    repeated calls with the same version on the same manager return the same
    client object. Clients built through `googleapiclient.discovery` are
    registered on an internal `contextlib.ExitStack` and released by `close()`.

    Explicit constructor arguments take precedence over the corresponding
    absl flags; a falsy argument falls back to the flag value.
    """

    def __init__(self,
                 *,
                 v1_discovery_uri=None,
                 v2_discovery_uri=None,
                 compute_v1_discovery_file=None,
                 private_api_key_secret_name=None):
        self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
        self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
        self.compute_v1_discovery_file = (compute_v1_discovery_file or
                                          COMPUTE_V1_DISCOVERY_FILE.value)
        self.private_api_key_secret_name = (private_api_key_secret_name or
                                            PRIVATE_API_KEY_SECRET_NAME.value)
        # TODO(sergiitk): add options to pass google Credentials
        self._exit_stack = contextlib.ExitStack()

    def close(self):
        """Exit every discovery client registered on the exit stack."""
        self._exit_stack.close()

    @property
    @functools.lru_cache(None)
    def private_api_key(self):
        """
        Private API key.

        Return API key credential that identifies a GCP project allow-listed for
        accessing private API discovery documents.
        https://pantheon.corp.google.com/apis/credentials

        This method lazy-loads the content of the key from the Secret Manager.
        https://pantheon.corp.google.com/security/secret-manager

        Raises:
            ValueError: private_api_key_secret_name is not set.
        """
        if not self.private_api_key_secret_name:
            raise ValueError('private_api_key_secret_name must be set to '
                             'access private_api_key.')

        secrets_api = self.secrets('v1')
        # Re-compose the configured secret path into the path of its
        # 'latest' version.
        version_resource_path = secrets_api.secret_version_path(
            **secrets_api.parse_secret_path(self.private_api_key_secret_name),
            secret_version='latest')
        secret: secretmanager_v1.AccessSecretVersionResponse
        secret = secrets_api.access_secret_version(name=version_resource_path)
        return secret.payload.data.decode()

    @functools.lru_cache(None)
    def compute(self, version):
        """Return the Compute API client for the given version.

        For 'v1' the client is built from `compute_v1_discovery_file` when it
        is set, otherwise from the v1 discovery service.

        Raises:
            NotImplementedError: version is not 'v1'.
        """
        api_name = 'compute'
        if version == 'v1':
            if self.compute_v1_discovery_file:
                return self._build_from_file(self.compute_v1_discovery_file)
            else:
                return self._build_from_discovery_v1(api_name, version)

        raise NotImplementedError(f'Compute {version} not supported')

    @functools.lru_cache(None)
    def networksecurity(self, version):
        """Return the Network Security API client for the given version.

        'v1alpha1' is built from the v2 discovery service using the private
        API key.

        Raises:
            NotImplementedError: version is not 'v1alpha1'.
        """
        api_name = 'networksecurity'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(api_name,
                                                 version,
                                                 api_key=self.private_api_key)

        raise NotImplementedError(f'Network Security {version} not supported')

    @functools.lru_cache(None)
    def networkservices(self, version):
        """Return the Network Services API client for the given version.

        'v1alpha1' is built from the v2 discovery service using the private
        API key.

        Raises:
            NotImplementedError: version is not 'v1alpha1'.
        """
        api_name = 'networkservices'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(api_name,
                                                 version,
                                                 api_key=self.private_api_key)

        raise NotImplementedError(f'Network Services {version} not supported')

    @functools.lru_cache(None)
    def secrets(self, version):
        """Return the Secret Manager client for the given version.

        Raises:
            NotImplementedError: version is not 'v1'.
        """
        if version == 'v1':
            return secretmanager_v1.SecretManagerServiceClient()

        raise NotImplementedError(f'Secrets Manager {version} not supported')

    def _build_from_discovery_v1(self, api_name, version):
        """Build a client from the v1 discovery URI and register it for close."""
        api = discovery.build(api_name,
                              version,
                              cache_discovery=False,
                              discoveryServiceUrl=self.v1_discovery_uri)
        self._exit_stack.enter_context(api)
        return api

    def _build_from_discovery_v2(self, api_name, version, *, api_key=None):
        """Build a client from the v2 discovery URI and register it for close.

        When api_key is given it is appended to the discovery URL as a `key`
        query parameter.
        """
        # NOTE(review): '&key=' assumes the v2 discovery URI already carries
        # a query string - confirm when overriding --v2_discovery_uri.
        key_arg = f'&key={api_key}' if api_key else ''
        api = discovery.build(
            api_name,
            version,
            cache_discovery=False,
            discoveryServiceUrl=f'{self.v2_discovery_uri}{key_arg}')
        self._exit_stack.enter_context(api)
        return api

    def _build_from_file(self, discovery_file):
        """Build a client from a discovery document read from a local file."""
        with open(discovery_file, 'r') as f:
            api = discovery.build_from_document(f.read())
        self._exit_stack.enter_context(api)
        return api


class Error(Exception):
    """Base error class for GCP API errors"""


class OperationError(Error):
    """
    Operation was not successful.

    Assuming Operation based on Google API Style Guide:
    https://cloud.google.com/apis/design/design_patterns#long_running_operations
    https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto

    Attributes:
        api_name: Name of the API that ran the operation.
        name: Operation name, or 'unknown' when the response has none.
        metadata: Raw, unparsed `metadata` of the response ({} when absent).
        error: Parsed `error` status of the operation.
        code_name: Symbolic name of the error code.
        message: Human-readable description of the failure.
    """

    def __init__(self, api_name, operation_response, message=None):
        """
        Args:
            api_name: Name of the API that ran the operation.
            operation_response: Operation resource as a dict, as returned by
                the API client.
            message: Optional message overriding the generated one.
        """
        self.api_name = api_name

        # Operation.metadata is an API-specific `Any`. ParseDict raises
        # ParseError for an `Any` whose message type is not in the descriptor
        # pool, which would replace the real operation failure with a parsing
        # failure. Keep the metadata raw and parse the rest.
        # NOTE(review): `error.details` entries are `Any` as well and can
        # still fail to parse if their types are not registered.
        self.metadata = operation_response.get('metadata', {})
        parseable_response = {
            key: value
            for key, value in operation_response.items()
            if key != 'metadata'
        }
        # Tolerate response fields newer than the installed Operation proto.
        operation = json_format.ParseDict(parseable_response,
                                          Operation(),
                                          ignore_unknown_fields=True)
        self.name = operation.name or 'unknown'
        self.error = operation.error
        self.code_name = code_pb2.Code.Name(operation.error.code)
        if message is None:
            message = (f'{api_name} operation "{self.name}" failed. Error '
                       f'code: {self.error.code} ({self.code_name}), '
                       f'message: {self.error.message}')
        self.message = message
        super().__init__(message)


class GcpProjectApiResource:
    """Base class for wrappers around a discovery API scoped to a project."""

    # TODO(sergiitk): move someplace better
    _WAIT_FOR_OPERATION_SEC = 60 * 5
    _WAIT_FIXED_SEC = 2
    _GCP_API_RETRIES = 5

    def __init__(self, api: discovery.Resource, project: str):
        self.api: discovery.Resource = api
        self.project: str = project

    @staticmethod
    def wait_for_operation(operation_request,
                           test_success_fn,
                           timeout_sec=_WAIT_FOR_OPERATION_SEC,
                           wait_sec=_WAIT_FIXED_SEC):
        """Poll a request until its result satisfies test_success_fn.

        `operation_request.execute` is called repeatedly, every `wait_sec`
        seconds, for as long as it raises an exception or `test_success_fn`
        returns a falsy value for its result.

        Args:
            operation_request: Object with an `execute()` method to poll.
            test_success_fn: Callable taking the result of `execute()` and
                returning a truthy value once polling should stop.
            timeout_sec: Stop polling after this many seconds.
            wait_sec: Fixed delay between attempts, in seconds.

        Returns:
            The first result accepted by `test_success_fn`.

        Raises:
            tenacity.RetryError: The deadline passed and the last result was
                still rejected by `test_success_fn`.
            Exception: The deadline passed and the last attempt raised; that
                exception is re-raised as is (`reraise=True`).
        """
        retryer = tenacity.Retrying(
            retry=(tenacity.retry_if_not_result(test_success_fn) |
                   tenacity.retry_if_exception_type()),
            wait=tenacity.wait_fixed(wait_sec),
            stop=tenacity.stop_after_delay(timeout_sec),
            after=tenacity.after_log(logger, logging.DEBUG),
            reraise=True)
        return retryer(operation_request.execute)

    @staticmethod
    def _resource_pretty_format(body: dict) -> str:
        """Return a string with pretty-printed resource body."""
        return yaml.dump(body, explicit_start=True, explicit_end=True)


class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
    """Base for APIs using `projects/*/locations/*` resource names.

    Mutating requests are expected to return a long-running operation, which
    is polled via `projects().locations().operations()` of the same API.
    """
    GLOBAL_LOCATION = 'global'

    def parent(self, location: Optional[str] = GLOBAL_LOCATION):
        """Return the parent resource path; None location means global."""
        if location is None:
            location = self.GLOBAL_LOCATION
        return f'projects/{self.project}/locations/{location}'

    def resource_full_name(self, name, collection_name):
        """Return the full name of a resource in the global location."""
        return f'{self.parent()}/{collection_name}/{name}'

    def _create_resource(self, collection: discovery.Resource, body: dict,
                         **kwargs):
        """Create a resource under the global parent and wait for completion.

        Extra keyword arguments are passed through to `collection.create()`.
        """
        logger.info("Creating %s resource:\n%s", self.api_name,
                    self._resource_pretty_format(body))
        create_req = collection.create(parent=self.parent(),
                                       body=body,
                                       **kwargs)
        self._execute(create_req)

    @property
    @abc.abstractmethod
    def api_name(self) -> str:
        """Name of the API, used in log and error messages."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def api_version(self) -> str:
        """Version of the API."""
        raise NotImplementedError

    def _get_resource(self, collection: discovery.Resource, full_name):
        """Load the resource by its full name, log it, and return it."""
        resource = collection.get(name=full_name).execute()
        logger.info('Loaded %s:\n%s', full_name,
                    self._resource_pretty_format(resource))
        return resource

    def _delete_resource(self, collection: discovery.Resource,
                         full_name: str) -> bool:
        """Delete the resource and wait for the operation to complete.

        Returns:
            True when deleted; False when the API returned an HTTP error
            (404 is logged as "does not exist", anything else as a warning).
            Errors other than HttpError are not handled here.
        """
        logger.debug("Deleting %s", full_name)
        try:
            self._execute(collection.delete(name=full_name))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info('%s not deleted since it does not exist', full_name)
            else:
                logger.warning('Failed to delete %s, %r', full_name, error)
        return False

    def _execute(self,
                 request,
                 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Execute the request and wait for the operation it returns."""
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        self._wait(operation, timeout_sec)

    def _wait(self,
              operation,
              timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Poll the operation until it is done.

        Args:
            operation: Operation resource dict; must contain 'name'.
            timeout_sec: Maximum time to keep polling, in seconds.

        Raises:
            OperationError: The completed operation contains an 'error'.
        """
        op_name = operation['name']
        logger.debug('Waiting for %s operation, timeout %s sec: %s',
                     self.api_name, timeout_sec, op_name)

        op_request = self.api.projects().locations().operations().get(
            name=op_name)
        # Use .get(): a pending operation may omit 'done' instead of sending
        # an explicit false. The predicate runs outside the retried call, so
        # a KeyError here would abort the wait instead of polling again.
        operation = self.wait_for_operation(
            operation_request=op_request,
            test_success_fn=lambda result: result.get('done', False),
            timeout_sec=timeout_sec)

        logger.debug('Completed operation: %s', operation)
        if 'error' in operation:
            raise OperationError(self.api_name, operation)