# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import contextlib
import functools
import logging
from typing import Optional, List

# Workaround: `grpc` must be imported before `google.protobuf.json_format`,
# to prevent "Segmentation fault". Ref https://github.com/grpc/grpc/issues/24897
# TODO(sergiitk): Remove after #24897 is solved
import grpc  # noqa pylint: disable=unused-import
from absl import flags
from google.cloud import secretmanager_v1
from google.longrunning import operations_pb2
from google.protobuf import json_format
from google.rpc import code_pb2
from googleapiclient import discovery
import googleapiclient.errors
import tenacity
import yaml

logger = logging.getLogger(__name__)
PRIVATE_API_KEY_SECRET_NAME = flags.DEFINE_string(
    "private_api_key_secret_name",
    default=None,
    help="Load Private API access key from the latest version of the secret "
    "with the given name, in the format projects/*/secrets/*")
V1_DISCOVERY_URI = flags.DEFINE_string("v1_discovery_uri",
                                       default=discovery.V1_DISCOVERY_URI,
                                       help="Override v1 Discovery URI")
V2_DISCOVERY_URI = flags.DEFINE_string("v2_discovery_uri",
                                       default=discovery.V2_DISCOVERY_URI,
                                       help="Override v2 Discovery URI")
COMPUTE_V1_DISCOVERY_FILE = flags.DEFINE_string(
    "compute_v1_discovery_file",
    default=None,
    help="Load compute v1 from discovery file")

# Type aliases
Operation = operations_pb2.Operation


class GcpApiManager:
    """Factory and cache for the GCP API clients used by this module.

    Each explicit constructor argument overrides the corresponding absl flag;
    a falsy argument falls back to the flag value. Discovery-based clients
    created here are registered on an internal ExitStack and are released by
    close().

    NOTE: the per-API accessors are memoized with functools.lru_cache applied
    to instance methods, so the cache is keyed on (self, version) and keeps a
    reference to the manager for as long as the cache entry exists.
    """

    def __init__(self,
                 *,
                 v1_discovery_uri=None,
                 v2_discovery_uri=None,
                 compute_v1_discovery_file=None,
                 private_api_key_secret_name=None):
        self.v1_discovery_uri = v1_discovery_uri or V1_DISCOVERY_URI.value
        self.v2_discovery_uri = v2_discovery_uri or V2_DISCOVERY_URI.value
        self.compute_v1_discovery_file = (compute_v1_discovery_file or
                                          COMPUTE_V1_DISCOVERY_FILE.value)
        self.private_api_key_secret_name = (private_api_key_secret_name or
                                            PRIVATE_API_KEY_SECRET_NAME.value)
        # TODO(sergiitk): add options to pass google Credentials
        self._exit_stack = contextlib.ExitStack()

    def close(self):
        """Release every API client registered on the exit stack."""
        self._exit_stack.close()

    @property
    @functools.lru_cache(None)
    def private_api_key(self):
        """
        Private API key.

        Return API key credential that identifies a GCP project allow-listed for
        accessing private API discovery documents.
        https://pantheon.corp.google.com/apis/credentials

        This method lazy-loads the content of the key from the Secret Manager.
        https://pantheon.corp.google.com/security/secret-manager

        Raises:
            ValueError: private_api_key_secret_name is not configured.
        """
        if not self.private_api_key_secret_name:
            raise ValueError('private_api_key_secret_name must be set to '
                             'access private_api_key.')

        secrets_api = self.secrets('v1')
        # Expand "projects/*/secrets/*" into the path of its latest version.
        version_resource_path = secrets_api.secret_version_path(
            **secrets_api.parse_secret_path(self.private_api_key_secret_name),
            secret_version='latest')
        secret: secretmanager_v1.AccessSecretVersionResponse
        secret = secrets_api.access_secret_version(name=version_resource_path)
        return secret.payload.data.decode()

    @functools.lru_cache(None)
    def compute(self, version):
        """Return the Compute API client; only version 'v1' is supported.

        When a compute v1 discovery file is configured the client is built
        from that file, otherwise from the v1 discovery service.
        """
        api_name = 'compute'
        if version == 'v1':
            if self.compute_v1_discovery_file:
                return self._build_from_file(self.compute_v1_discovery_file)
            else:
                return self._build_from_discovery_v1(api_name, version)

        raise NotImplementedError(f'Compute {version} not supported')

    @functools.lru_cache(None)
    def networksecurity(self, version):
        """Return the Network Security API client; only 'v1alpha1' is supported.

        Building it reads private_api_key, so private_api_key_secret_name
        must be configured.
        """
        api_name = 'networksecurity'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(
                api_name,
                version,
                api_key=self.private_api_key,
                visibility_labels=['NETWORKSECURITY_ALPHA'])

        raise NotImplementedError(f'Network Security {version} not supported')

    @functools.lru_cache(None)
    def networkservices(self, version):
        """Return the Network Services API client; only 'v1alpha1' is supported.

        Building it reads private_api_key, so private_api_key_secret_name
        must be configured.
        """
        api_name = 'networkservices'
        if version == 'v1alpha1':
            return self._build_from_discovery_v2(
                api_name,
                version,
                api_key=self.private_api_key,
                visibility_labels=['NETWORKSERVICES_ALPHA'])

        raise NotImplementedError(f'Network Services {version} not supported')

    @functools.lru_cache(None)
    def secrets(self, version):
        """Return the Secret Manager client; only version 'v1' is supported."""
        if version == 'v1':
            return secretmanager_v1.SecretManagerServiceClient()

        raise NotImplementedError(f'Secret Manager {version} not supported')

    def _build_from_discovery_v1(self, api_name, version):
        """Build an API client from the configured v1 discovery URI."""
        api = discovery.build(api_name,
                              version,
                              cache_discovery=False,
                              discoveryServiceUrl=self.v1_discovery_uri)
        # Ensure the client is released by close().
        self._exit_stack.enter_context(api)
        return api

    def _build_from_discovery_v2(self,
                                 api_name,
                                 version,
                                 *,
                                 api_key: Optional[str] = None,
                                 visibility_labels: Optional[List] = None):
        """Build an API client from the configured v2 discovery URI.

        Args:
            api_name: Name of the API to build.
            version: API version to build.
            api_key: Optional API key, sent as the `key` query parameter.
            visibility_labels: Optional labels, sent as the `labels` query
                parameter.
        """
        params = {}
        if api_key:
            params['key'] = api_key
        if visibility_labels:
            # Underscore-separated list of labels.
            params['labels'] = '_'.join(visibility_labels)

        # Extra parameters are appended with a leading '&', i.e. the
        # configured v2 discovery URI is assumed to already carry a query
        # string.
        params_str = ''
        if params:
            params_str = '&' + ('&'.join(f'{k}={v}' for k, v in params.items()))

        api = discovery.build(
            api_name,
            version,
            cache_discovery=False,
            discoveryServiceUrl=f'{self.v2_discovery_uri}{params_str}')
        # Ensure the client is released by close().
        self._exit_stack.enter_context(api)
        return api

    def _build_from_file(self, discovery_file):
        """Build an API client from a discovery document stored on disk."""
        # Read with an explicit encoding so the result does not depend on
        # the platform's default locale.
        with open(discovery_file, 'r', encoding='utf-8') as f:
            api = discovery.build_from_document(f.read())
        # Ensure the client is released by close().
        self._exit_stack.enter_context(api)
        return api


class Error(Exception):
    """Base error class for GCP API errors"""


class OperationError(Error):
    """
    Operation was not successful.

    Assuming Operation based on Google API Style Guide:
    https://cloud.google.com/apis/design/design_patterns#long_running_operations
    https://github.com/googleapis/googleapis/blob/master/google/longrunning/operations.proto
    """

    def __init__(self, api_name, operation_response, message=None):
        """Build the error from a long-running operation response dict.

        Args:
            api_name: Name of the API that returned the operation.
            operation_response: The operation resource as a dict.
            message: Optional message overriding the generated one.
        """
        self.api_name = api_name
        # Operation.metadata and Operation.response are google.protobuf.Any
        # fields specific to each API. Their types may be missing from the
        # default descriptor pool, which would make ParseDict raise and hide
        # the actual operation error. Neither field is used below, so parse
        # a copy without them; the caller's dict is left untouched.
        parseable_response = {
            key: value
            for key, value in operation_response.items()
            if key not in ('metadata', 'response')
        }
        operation = json_format.ParseDict(parseable_response, Operation())
        self.name = operation.name or 'unknown'
        self.error = operation.error
        self.code_name = code_pb2.Code.Name(operation.error.code)
        if message is None:
            message = (f'{api_name} operation "{self.name}" failed. Error '
                       f'code: {self.error.code} ({self.code_name}), '
                       f'message: {self.error.message}')
        self.message = message
        super().__init__(message)


class GcpProjectApiResource:
    """Base class for wrappers around an API client scoped to one project."""

    # TODO(sergiitk): move someplace better
    _WAIT_FOR_OPERATION_SEC = 60 * 5
    _WAIT_FIXED_SEC = 2
    _GCP_API_RETRIES = 5

    def __init__(self, api: discovery.Resource, project: str):
        self.api: discovery.Resource = api
        self.project: str = project

    @staticmethod
    def wait_for_operation(operation_request,
                           test_success_fn,
                           timeout_sec=_WAIT_FOR_OPERATION_SEC,
                           wait_sec=_WAIT_FIXED_SEC):
        """Execute the request repeatedly until its result passes the test.

        Args:
            operation_request: Request object; its execute() is called on
                every attempt.
            test_success_fn: Callable receiving the execute() result; a falsy
                return value triggers another attempt.
            timeout_sec: Stop retrying after this many seconds.
            wait_sec: Fixed delay between attempts, in seconds.

        Returns:
            The result of the first execute() call accepted by
            test_success_fn.
        """
        retryer = tenacity.Retrying(
            # Retry both on an unsatisfactory result and on any exception
            # raised by execute().
            retry=(tenacity.retry_if_not_result(test_success_fn) |
                   tenacity.retry_if_exception_type()),
            wait=tenacity.wait_fixed(wait_sec),
            stop=tenacity.stop_after_delay(timeout_sec),
            after=tenacity.after_log(logger, logging.DEBUG),
            reraise=True)
        return retryer(operation_request.execute)

    @staticmethod
    def _resource_pretty_format(body: dict) -> str:
        """Return a string with pretty-printed resource body."""
        return yaml.dump(body, explicit_start=True, explicit_end=True)


class GcpStandardCloudApiResource(GcpProjectApiResource, metaclass=abc.ABCMeta):
    """Base class for APIs addressed as projects/*/locations/*/<collection>.

    Subclasses provide api_name and api_version.
    """
    GLOBAL_LOCATION = 'global'

    def parent(self, location: Optional[str] = GLOBAL_LOCATION):
        """Return the parent path for the location; None means global."""
        if location is None:
            location = self.GLOBAL_LOCATION
        return f'projects/{self.project}/locations/{location}'

    def resource_full_name(self, name, collection_name):
        """Return the full name of a resource in the global location."""
        return f'{self.parent()}/{collection_name}/{name}'

    def _create_resource(self, collection: discovery.Resource, body: dict,
                         **kwargs):
        """Create a resource under the global parent and wait for completion.

        Extra keyword arguments are forwarded to collection.create().
        """
        logger.info("Creating %s resource:\n%s", self.api_name,
                    self._resource_pretty_format(body))
        create_req = collection.create(parent=self.parent(),
                                       body=body,
                                       **kwargs)
        self._execute(create_req)

    @property
    @abc.abstractmethod
    def api_name(self) -> str:
        """Name of the API; used in log messages and operation errors."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def api_version(self) -> str:
        """Version of the API; must be provided by subclasses."""
        raise NotImplementedError

    def _get_resource(self, collection: discovery.Resource, full_name):
        """Load the resource by its full name, log it, and return it."""
        resource = collection.get(name=full_name).execute()
        logger.info('Loaded %s:\n%s', full_name,
                    self._resource_pretty_format(resource))
        return resource

    def _delete_resource(self, collection: discovery.Resource,
                         full_name: str) -> bool:
        """Delete the resource; best effort.

        Returns:
            True when the delete operation completed, False when the API
            returned an HttpError. A 404 is logged at info level, any other
            HTTP error as a warning.
        """
        logger.debug("Deleting %s", full_name)
        try:
            self._execute(collection.delete(name=full_name))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info('%s not deleted since it does not exist', full_name)
            else:
                logger.warning('Failed to delete %s, %r', full_name, error)
        return False

    def _execute(self,
                 request,
                 timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Execute the request and wait for the operation it returns."""
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        self._wait(operation, timeout_sec)

    def _wait(self,
              operation,
              timeout_sec=GcpProjectApiResource._WAIT_FOR_OPERATION_SEC):
        """Poll the operation until it is done.

        Raises:
            OperationError: the completed operation contains an error.
        """
        op_name = operation['name']
        logger.debug('Waiting for %s operation, timeout %s sec: %s',
                     self.api_name, timeout_sec, op_name)

        op_request = self.api.projects().locations().operations().get(
            name=op_name)
        operation = self.wait_for_operation(
            operation_request=op_request,
            # Treat a response without the 'done' key as still in progress
            # rather than failing with KeyError.
            test_success_fn=lambda result: result.get('done', False),
            timeout_sec=timeout_sec)

        logger.debug('Completed operation: %s', operation)
        if 'error' in operation:
            raise OperationError(self.api_name, operation)