# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import enum
import logging
from typing import Any, Dict, Optional

from googleapiclient import discovery
import googleapiclient.errors
# TODO(sergiitk): replace with tenacity
import retrying

from framework.infrastructure import gcp

logger = logging.getLogger(__name__)


class ComputeV1(gcp.api.GcpProjectApiResource):
    """Wrapper over the Compute Engine v1 API for one GCP project.

    Creates, reads, patches and deletes the global resources used to set up
    Traffic Director ('INTERNAL_SELF_MANAGED' load balancing): health checks,
    backend services, URL maps, target gRPC/HTTP proxies and global
    forwarding rules. It also polls network endpoint groups (NEGs) and
    backend health.
    """
    # TODO(sergiitk): move someplace better
    _WAIT_FOR_BACKEND_SEC = 60 * 10
    _WAIT_FOR_OPERATION_SEC = 60 * 5

    @dataclasses.dataclass(frozen=True)
    class GcpResource:
        """Name and URL of a created or loaded compute resource."""
        # Resource name, as sent in / returned by the API.
        name: str
        # Resource URL (the API's 'selfLink' or the operation's 'targetLink').
        url: str

    @dataclasses.dataclass(frozen=True)
    class ZonalGcpResource(GcpResource):
        """A GcpResource that lives in a specific zone (e.g. a NEG)."""
        zone: str

    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
        super().__init__(api_manager.compute('v1'), project)

    class HealthCheckProtocol(enum.Enum):
        TCP = enum.auto()
        GRPC = enum.auto()

    class BackendServiceProtocol(enum.Enum):
        HTTP2 = enum.auto()
        GRPC = enum.auto()

    def create_health_check(self,
                            name: str,
                            protocol: HealthCheckProtocol,
                            *,
                            port: Optional[int] = None) -> GcpResource:
        """Create a global health check.

        Args:
            name: Health check name.
            protocol: TCP or GRPC; selects the tcpHealthCheck/grpcHealthCheck
                settings field and is sent as the check 'type'.
            port: Fixed port to probe. When None, the serving port is used
                ('USE_SERVING_PORT').

        Raises:
            TypeError: protocol is not a supported HealthCheckProtocol.
        """
        if protocol is self.HealthCheckProtocol.TCP:
            health_check_field = 'tcpHealthCheck'
        elif protocol is self.HealthCheckProtocol.GRPC:
            health_check_field = 'grpcHealthCheck'
        else:
            raise TypeError(f'Unexpected Health Check protocol: {protocol}')

        health_check_settings = {}
        if port is None:
            health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
        else:
            health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
            health_check_settings['port'] = port

        return self._insert_resource(
            self.api.healthChecks(), {
                'name': name,
                'type': protocol.name,
                health_check_field: health_check_settings,
            })

    def delete_health_check(self, name) -> bool:
        """Delete a health check; return True if it was deleted."""
        return self._delete_resource(self.api.healthChecks(), 'healthCheck',
                                     name)

    def create_backend_service_traffic_director(
            self,
            name: str,
            health_check: GcpResource,
            protocol: Optional[BackendServiceProtocol] = None) -> GcpResource:
        """Create a Traffic Director ('INTERNAL_SELF_MANAGED') backend service.

        Raises:
            TypeError: protocol is not a BackendServiceProtocol. This
                includes the default of None, so callers must pass it.
        """
        # NOTE(review): the None default always raises; consider making the
        # argument required or defaulting to BackendServiceProtocol.GRPC.
        if not isinstance(protocol, self.BackendServiceProtocol):
            raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
        return self._insert_resource(
            self.api.backendServices(),
            {
                'name': name,
                'loadBalancingScheme':
                    'INTERNAL_SELF_MANAGED',  # Traffic Director
                'healthChecks': [health_check.url],
                'protocol': protocol.name,
            })

    def get_backend_service_traffic_director(self, name: str) -> GcpResource:
        """Load an existing backend service by name."""
        return self._get_resource(self.api.backendServices(),
                                  backendService=name)

    def patch_backend_service(self, backend_service, body, **kwargs):
        """Patch a backend service with an arbitrary request body."""
        self._patch_resource(collection=self.api.backendServices(),
                             backendService=backend_service.name,
                             body=body,
                             **kwargs)

    def backend_service_add_backends(self, backend_service, backends):
        """Set the backend service's backends to the given groups.

        Each group is added in 'RATE' balancing mode with
        maxRatePerEndpoint=5. The patch replaces the existing 'backends'
        list rather than appending to it.
        """
        backend_list = [{
            'group': backend.url,
            'balancingMode': 'RATE',
            'maxRatePerEndpoint': 5
        } for backend in backends]

        self._patch_resource(collection=self.api.backendServices(),
                             body={'backends': backend_list},
                             backendService=backend_service.name)

    def backend_service_remove_all_backends(self, backend_service):
        """Patch the backend service to have an empty backend list."""
        self._patch_resource(collection=self.api.backendServices(),
                             body={'backends': []},
                             backendService=backend_service.name)

    def delete_backend_service(self, name) -> bool:
        """Delete a backend service; return True if it was deleted."""
        return self._delete_resource(self.api.backendServices(),
                                     'backendService', name)

    def create_url_map(
        self,
        name: str,
        matcher_name: str,
        src_hosts,
        dst_default_backend_service: GcpResource,
        dst_host_rule_match_backend_service: Optional[GcpResource] = None,
    ) -> GcpResource:
        """Create a URL map with one host rule and one path matcher.

        Requests for src_hosts go to path matcher `matcher_name`, whose
        default service is dst_host_rule_match_backend_service. That argument
        falls back to dst_default_backend_service when None. All other
        requests go to dst_default_backend_service.
        """
        if dst_host_rule_match_backend_service is None:
            dst_host_rule_match_backend_service = dst_default_backend_service
        return self._insert_resource(
            self.api.urlMaps(), {
                'name':
                    name,
                'defaultService':
                    dst_default_backend_service.url,
                'hostRules': [{
                    'hosts': src_hosts,
                    'pathMatcher': matcher_name,
                }],
                'pathMatchers': [{
                    'name': matcher_name,
                    'defaultService': dst_host_rule_match_backend_service.url,
                }],
            })

    def delete_url_map(self, name) -> bool:
        """Delete a URL map; return True if it was deleted."""
        return self._delete_resource(self.api.urlMaps(), 'urlMap', name)

    def create_target_grpc_proxy(
        self,
        name: str,
        url_map: GcpResource,
    ) -> GcpResource:
        """Create a target gRPC proxy pointing at url_map (proxyless)."""
        # NOTE(review): body keys are snake_case here while other resources
        # use camelCase; presumably the API accepts both forms — confirm.
        return self._insert_resource(self.api.targetGrpcProxies(), {
            'name': name,
            'url_map': url_map.url,
            'validate_for_proxyless': True,
        })

    def delete_target_grpc_proxy(self, name) -> bool:
        """Delete a target gRPC proxy; return True if it was deleted."""
        return self._delete_resource(self.api.targetGrpcProxies(),
                                     'targetGrpcProxy', name)

    def create_target_http_proxy(
        self,
        name: str,
        url_map: GcpResource,
    ) -> GcpResource:
        """Create a target HTTP proxy pointing at url_map."""
        return self._insert_resource(self.api.targetHttpProxies(), {
            'name': name,
            'url_map': url_map.url,
        })

    def delete_target_http_proxy(self, name) -> bool:
        """Delete a target HTTP proxy; return True if it was deleted."""
        return self._delete_resource(self.api.targetHttpProxies(),
                                     'targetHttpProxy', name)

    def create_forwarding_rule(
        self,
        name: str,
        src_port: int,
        target_proxy: GcpResource,
        network_url: str,
    ) -> GcpResource:
        """Create a Traffic Director global forwarding rule.

        The rule matches IP 0.0.0.0 on src_port in the given network and
        forwards to target_proxy.
        """
        return self._insert_resource(
            self.api.globalForwardingRules(),
            {
                'name': name,
                'loadBalancingScheme':
                    'INTERNAL_SELF_MANAGED',  # Traffic Director
                'portRange': src_port,
                'IPAddress': '0.0.0.0',
                'network': network_url,
                'target': target_proxy.url,
            })

    def delete_forwarding_rule(self, name) -> bool:
        """Delete a global forwarding rule; return True if it was deleted."""
        return self._delete_resource(self.api.globalForwardingRules(),
                                     'forwardingRule', name)

    @staticmethod
    def _network_endpoint_group_not_ready(neg):
        # Not ready until the NEG was loaded and reports a nonzero 'size'.
        return not neg or neg.get('size', 0) == 0

    def wait_for_network_endpoint_group(self, name, zone):
        """Poll until the NEG exists and has endpoints.

        Polls every 2 seconds for up to 60 seconds.

        Returns:
            ZonalGcpResource with the NEG's name, selfLink and zone.
        """

        @retrying.retry(retry_on_result=self._network_endpoint_group_not_ready,
                        stop_max_delay=60 * 1000,
                        wait_fixed=2 * 1000)
        def _wait_for_network_endpoint_group_ready():
            try:
                neg = self.get_network_endpoint_group(name, zone)
                logger.debug(
                    'Waiting for endpoints: NEG %s in zone %s, '
                    'current count %s', neg['name'], zone, neg.get('size'))
            except googleapiclient.errors.HttpError as error:
                # noinspection PyProtectedMember
                reason = error._get_reason()
                logger.debug('Retrying NEG load, got %s, details %s',
                             error.resp.status, reason)
                # Re-raised so the retry decorator can retry the load.
                raise
            return neg

        network_endpoint_group = _wait_for_network_endpoint_group_ready()
        # TODO(sergiitk): dataclass
        return self.ZonalGcpResource(network_endpoint_group['name'],
                                     network_endpoint_group['selfLink'], zone)

    def get_network_endpoint_group(self, name, zone):
        """Return the raw API response (dict) for a zonal NEG."""
        neg = self.api.networkEndpointGroups().get(project=self.project,
                                                   networkEndpointGroup=name,
                                                   zone=zone).execute()
        # TODO(sergiitk): dataclass
        return neg

    def wait_for_backends_healthy_status(
        self,
        backend_service,
        backends,
        timeout_sec=_WAIT_FOR_BACKEND_SEC,
        wait_sec=4,
    ):
        """Poll until every backend group reports all instances 'HEALTHY'.

        Polls every `wait_sec` seconds for up to `timeout_sec` seconds.
        Backends that become healthy are not polled again.

        Args:
            backend_service: Backend service whose health is queried.
            backends: Hashable backend groups with .name, .zone and .url
                attributes, e.g. ZonalGcpResource.
        """
        pending = set(backends)

        @retrying.retry(retry_on_result=lambda result: not result,
                        stop_max_delay=timeout_sec * 1000,
                        wait_fixed=wait_sec * 1000)
        def _retry_backends_health():
            # Iterate over a snapshot. Healthy backends are removed from
            # `pending` inside the loop, and mutating a set while iterating
            # over it raises "RuntimeError: Set changed size during
            # iteration".
            for backend in tuple(pending):
                result = self.get_backend_service_backend_health(
                    backend_service, backend)

                if 'healthStatus' not in result:
                    logger.debug('Waiting for instances: backend %s, zone %s',
                                 backend.name, backend.zone)
                    continue

                backend_healthy = True
                for instance in result['healthStatus']:
                    logger.debug(
                        'Backend %s in zone %s: instance %s:%s health: %s',
                        backend.name, backend.zone, instance['ipAddress'],
                        instance['port'], instance['healthState'])
                    if instance['healthState'] != 'HEALTHY':
                        backend_healthy = False

                if backend_healthy:
                    logger.info('Backend %s in zone %s reported healthy',
                                backend.name, backend.zone)
                    pending.remove(backend)

            return not pending

        _retry_backends_health()

    def get_backend_service_backend_health(self, backend_service, backend):
        """Return the raw getHealth response for one backend group."""
        return self.api.backendServices().getHealth(
            project=self.project,
            backendService=backend_service.name,
            body={
                "group": backend.url
            }).execute()

    def _get_resource(self, collection: discovery.Resource,
                      **kwargs) -> GcpResource:
        """GET a resource from `collection` and wrap its name/selfLink."""
        resp = collection.get(project=self.project, **kwargs).execute()
        logger.info('Loaded compute resource:\n%s',
                    self._resource_pretty_format(resp))
        return self.GcpResource(resp['name'], resp['selfLink'])

    def _insert_resource(self, collection: discovery.Resource,
                         body: Dict[str, Any]) -> GcpResource:
        """Insert `body` into `collection` and wait for the operation.

        The returned URL is the operation's 'targetLink'.
        """
        logger.info('Creating compute resource:\n%s',
                    self._resource_pretty_format(body))
        resp = self._execute(collection.insert(project=self.project, body=body))
        return self.GcpResource(body['name'], resp['targetLink'])

    def _patch_resource(self, collection, body, **kwargs):
        """PATCH a resource in `collection` and wait for the operation."""
        logger.info('Patching compute resource:\n%s',
                    self._resource_pretty_format(body))
        self._execute(
            collection.patch(project=self.project, body=body, **kwargs))

    def _delete_resource(self, collection: discovery.Resource,
                         resource_type: str, resource_name: str) -> bool:
        """Delete a resource, best effort.

        HttpErrors are logged, not raised: a 404 at info level, anything
        else as a warning.

        Returns:
            True if the delete operation completed, False on an HttpError.
        """
        try:
            params = {"project": self.project, resource_type: resource_name}
            self._execute(collection.delete(**params))
            return True
        except googleapiclient.errors.HttpError as error:
            if error.resp and error.resp.status == 404:
                logger.info(
                    'Resource %s "%s" not deleted since it does not exist',
                    resource_type, resource_name)
            else:
                logger.warning('Failed to delete %s "%s", %r', resource_type,
                               resource_name, error)
        return False

    @staticmethod
    def _operation_status_done(operation):
        return operation.get('status') == 'DONE'

    def _execute(self,
                 request,
                 *,
                 test_success_fn=None,
                 timeout_sec=_WAIT_FOR_OPERATION_SEC):
        """Execute a mutating request and wait for its global operation.

        Args:
            request: Prepared API request that returns an operation.
            test_success_fn: Predicate on the polled operation. Defaults to
                status == 'DONE'.
            timeout_sec: Time budget passed to wait_for_operation.

        Returns:
            The final operation response.

        Raises:
            Exception: The finished operation reports an 'error'.
        """
        operation = request.execute(num_retries=self._GCP_API_RETRIES)
        logger.debug('Response %s', operation)

        # TODO(sergiitk) try using wait() here
        # https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
        operation_request = self.api.globalOperations().get(
            project=self.project, operation=operation['name'])

        if test_success_fn is None:
            test_success_fn = self._operation_status_done

        logger.debug('Waiting for global operation %s, timeout %s sec',
                     operation['name'], timeout_sec)
        response = self.wait_for_operation(operation_request=operation_request,
                                           test_success_fn=test_success_fn,
                                           timeout_sec=timeout_sec)

        if 'error' in response:
            logger.debug('Waiting for global operation failed, response: %r',
                         response)
            # The operation finished but reported an error; it did not
            # time out, so say so in the message.
            raise Exception(f'Operation {operation["name"]} failed, '
                            f'error={response["error"]}')
        return response