• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import dataclasses
15import enum
16import logging
17from typing import Any, Dict, Optional
18
19from googleapiclient import discovery
20import googleapiclient.errors
21# TODO(sergiitk): replace with tenacity
22import retrying
23
24from framework.infrastructure import gcp
25
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
27
28
29class ComputeV1(gcp.api.GcpProjectApiResource):
30    # TODO(sergiitk): move someplace better
31    _WAIT_FOR_BACKEND_SEC = 60 * 10
32    _WAIT_FOR_OPERATION_SEC = 60 * 5
33
34    @dataclasses.dataclass(frozen=True)
35    class GcpResource:
36        name: str
37        url: str
38
39    @dataclasses.dataclass(frozen=True)
40    class ZonalGcpResource(GcpResource):
41        zone: str
42
43    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
44        super().__init__(api_manager.compute('v1'), project)
45
46    class HealthCheckProtocol(enum.Enum):
47        TCP = enum.auto()
48        GRPC = enum.auto()
49
50    class BackendServiceProtocol(enum.Enum):
51        HTTP2 = enum.auto()
52        GRPC = enum.auto()
53
54    def create_health_check(self,
55                            name: str,
56                            protocol: HealthCheckProtocol,
57                            *,
58                            port: Optional[int] = None) -> GcpResource:
59        if protocol is self.HealthCheckProtocol.TCP:
60            health_check_field = 'tcpHealthCheck'
61        elif protocol is self.HealthCheckProtocol.GRPC:
62            health_check_field = 'grpcHealthCheck'
63        else:
64            raise TypeError(f'Unexpected Health Check protocol: {protocol}')
65
66        health_check_settings = {}
67        if port is None:
68            health_check_settings['portSpecification'] = 'USE_SERVING_PORT'
69        else:
70            health_check_settings['portSpecification'] = 'USE_FIXED_PORT'
71            health_check_settings['port'] = port
72
73        return self._insert_resource(
74            self.api.healthChecks(), {
75                'name': name,
76                'type': protocol.name,
77                health_check_field: health_check_settings,
78            })
79
80    def delete_health_check(self, name):
81        self._delete_resource(self.api.healthChecks(), 'healthCheck', name)
82
83    def create_backend_service_traffic_director(
84            self,
85            name: str,
86            health_check: GcpResource,
87            protocol: Optional[BackendServiceProtocol] = None) -> GcpResource:
88        if not isinstance(protocol, self.BackendServiceProtocol):
89            raise TypeError(f'Unexpected Backend Service protocol: {protocol}')
90        return self._insert_resource(
91            self.api.backendServices(),
92            {
93                'name': name,
94                'loadBalancingScheme':
95                    'INTERNAL_SELF_MANAGED',  # Traffic Director
96                'healthChecks': [health_check.url],
97                'protocol': protocol.name,
98            })
99
100    def get_backend_service_traffic_director(self, name: str) -> GcpResource:
101        return self._get_resource(self.api.backendServices(),
102                                  backendService=name)
103
104    def patch_backend_service(self, backend_service, body, **kwargs):
105        self._patch_resource(collection=self.api.backendServices(),
106                             backendService=backend_service.name,
107                             body=body,
108                             **kwargs)
109
110    def backend_service_add_backends(self, backend_service, backends):
111        backend_list = [{
112            'group': backend.url,
113            'balancingMode': 'RATE',
114            'maxRatePerEndpoint': 5
115        } for backend in backends]
116
117        self._patch_resource(collection=self.api.backendServices(),
118                             body={'backends': backend_list},
119                             backendService=backend_service.name)
120
121    def backend_service_remove_all_backends(self, backend_service):
122        self._patch_resource(collection=self.api.backendServices(),
123                             body={'backends': []},
124                             backendService=backend_service.name)
125
126    def delete_backend_service(self, name):
127        self._delete_resource(self.api.backendServices(), 'backendService',
128                              name)
129
130    def create_url_map(
131        self,
132        name: str,
133        matcher_name: str,
134        src_hosts,
135        dst_default_backend_service: GcpResource,
136        dst_host_rule_match_backend_service: Optional[GcpResource] = None,
137    ) -> GcpResource:
138        if dst_host_rule_match_backend_service is None:
139            dst_host_rule_match_backend_service = dst_default_backend_service
140        return self._insert_resource(
141            self.api.urlMaps(), {
142                'name':
143                    name,
144                'defaultService':
145                    dst_default_backend_service.url,
146                'hostRules': [{
147                    'hosts': src_hosts,
148                    'pathMatcher': matcher_name,
149                }],
150                'pathMatchers': [{
151                    'name': matcher_name,
152                    'defaultService': dst_host_rule_match_backend_service.url,
153                }],
154            })
155
156    def delete_url_map(self, name):
157        self._delete_resource(self.api.urlMaps(), 'urlMap', name)
158
159    def create_target_grpc_proxy(
160        self,
161        name: str,
162        url_map: GcpResource,
163    ) -> GcpResource:
164        return self._insert_resource(self.api.targetGrpcProxies(), {
165            'name': name,
166            'url_map': url_map.url,
167            'validate_for_proxyless': True,
168        })
169
170    def delete_target_grpc_proxy(self, name):
171        self._delete_resource(self.api.targetGrpcProxies(), 'targetGrpcProxy',
172                              name)
173
174    def create_target_http_proxy(
175        self,
176        name: str,
177        url_map: GcpResource,
178    ) -> GcpResource:
179        return self._insert_resource(self.api.targetHttpProxies(), {
180            'name': name,
181            'url_map': url_map.url,
182        })
183
184    def delete_target_http_proxy(self, name):
185        self._delete_resource(self.api.targetHttpProxies(), 'targetHttpProxy',
186                              name)
187
188    def create_forwarding_rule(
189        self,
190        name: str,
191        src_port: int,
192        target_proxy: GcpResource,
193        network_url: str,
194    ) -> GcpResource:
195        return self._insert_resource(
196            self.api.globalForwardingRules(),
197            {
198                'name': name,
199                'loadBalancingScheme':
200                    'INTERNAL_SELF_MANAGED',  # Traffic Director
201                'portRange': src_port,
202                'IPAddress': '0.0.0.0',
203                'network': network_url,
204                'target': target_proxy.url,
205            })
206
207    def delete_forwarding_rule(self, name):
208        self._delete_resource(self.api.globalForwardingRules(),
209                              'forwardingRule', name)
210
211    @staticmethod
212    def _network_endpoint_group_not_ready(neg):
213        return not neg or neg.get('size', 0) == 0
214
215    def wait_for_network_endpoint_group(self, name, zone):
216
217        @retrying.retry(retry_on_result=self._network_endpoint_group_not_ready,
218                        stop_max_delay=60 * 1000,
219                        wait_fixed=2 * 1000)
220        def _wait_for_network_endpoint_group_ready():
221            try:
222                neg = self.get_network_endpoint_group(name, zone)
223                logger.debug(
224                    'Waiting for endpoints: NEG %s in zone %s, '
225                    'current count %s', neg['name'], zone, neg.get('size'))
226            except googleapiclient.errors.HttpError as error:
227                # noinspection PyProtectedMember
228                reason = error._get_reason()
229                logger.debug('Retrying NEG load, got %s, details %s',
230                             error.resp.status, reason)
231                raise
232            return neg
233
234        network_endpoint_group = _wait_for_network_endpoint_group_ready()
235        # TODO(sergiitk): dataclass
236        return self.ZonalGcpResource(network_endpoint_group['name'],
237                                     network_endpoint_group['selfLink'], zone)
238
239    def get_network_endpoint_group(self, name, zone):
240        neg = self.api.networkEndpointGroups().get(project=self.project,
241                                                   networkEndpointGroup=name,
242                                                   zone=zone).execute()
243        # TODO(sergiitk): dataclass
244        return neg
245
246    def wait_for_backends_healthy_status(
247        self,
248        backend_service,
249        backends,
250        timeout_sec=_WAIT_FOR_BACKEND_SEC,
251        wait_sec=4,
252    ):
253        pending = set(backends)
254
255        @retrying.retry(retry_on_result=lambda result: not result,
256                        stop_max_delay=timeout_sec * 1000,
257                        wait_fixed=wait_sec * 1000)
258        def _retry_backends_health():
259            for backend in pending:
260                result = self.get_backend_service_backend_health(
261                    backend_service, backend)
262
263                if 'healthStatus' not in result:
264                    logger.debug('Waiting for instances: backend %s, zone %s',
265                                 backend.name, backend.zone)
266                    continue
267
268                backend_healthy = True
269                for instance in result['healthStatus']:
270                    logger.debug(
271                        'Backend %s in zone %s: instance %s:%s health: %s',
272                        backend.name, backend.zone, instance['ipAddress'],
273                        instance['port'], instance['healthState'])
274                    if instance['healthState'] != 'HEALTHY':
275                        backend_healthy = False
276
277                if backend_healthy:
278                    logger.info('Backend %s in zone %s reported healthy',
279                                backend.name, backend.zone)
280                    pending.remove(backend)
281
282            return not pending
283
284        _retry_backends_health()
285
286    def get_backend_service_backend_health(self, backend_service, backend):
287        return self.api.backendServices().getHealth(
288            project=self.project,
289            backendService=backend_service.name,
290            body={
291                "group": backend.url
292            }).execute()
293
294    def _get_resource(self, collection: discovery.Resource,
295                      **kwargs) -> GcpResource:
296        resp = collection.get(project=self.project, **kwargs).execute()
297        logger.info('Loaded compute resource:\n%s',
298                    self._resource_pretty_format(resp))
299        return self.GcpResource(resp['name'], resp['selfLink'])
300
301    def _insert_resource(self, collection: discovery.Resource,
302                         body: Dict[str, Any]) -> GcpResource:
303        logger.info('Creating compute resource:\n%s',
304                    self._resource_pretty_format(body))
305        resp = self._execute(collection.insert(project=self.project, body=body))
306        return self.GcpResource(body['name'], resp['targetLink'])
307
308    def _patch_resource(self, collection, body, **kwargs):
309        logger.info('Patching compute resource:\n%s',
310                    self._resource_pretty_format(body))
311        self._execute(
312            collection.patch(project=self.project, body=body, **kwargs))
313
314    def _delete_resource(self, collection: discovery.Resource,
315                         resource_type: str, resource_name: str) -> bool:
316        try:
317            params = {"project": self.project, resource_type: resource_name}
318            self._execute(collection.delete(**params))
319            return True
320        except googleapiclient.errors.HttpError as error:
321            if error.resp and error.resp.status == 404:
322                logger.info(
323                    'Resource %s "%s" not deleted since it does not exist',
324                    resource_type, resource_name)
325            else:
326                logger.warning('Failed to delete %s "%s", %r', resource_type,
327                               resource_name, error)
328        return False
329
330    @staticmethod
331    def _operation_status_done(operation):
332        return 'status' in operation and operation['status'] == 'DONE'
333
334    def _execute(self,
335                 request,
336                 *,
337                 test_success_fn=None,
338                 timeout_sec=_WAIT_FOR_OPERATION_SEC):
339        operation = request.execute(num_retries=self._GCP_API_RETRIES)
340        logger.debug('Response %s', operation)
341
342        # TODO(sergiitk) try using wait() here
343        # https://googleapis.github.io/google-api-python-client/docs/dyn/compute_v1.globalOperations.html#wait
344        operation_request = self.api.globalOperations().get(
345            project=self.project, operation=operation['name'])
346
347        if test_success_fn is None:
348            test_success_fn = self._operation_status_done
349
350        logger.debug('Waiting for global operation %s, timeout %s sec',
351                     operation['name'], timeout_sec)
352        response = self.wait_for_operation(operation_request=operation_request,
353                                           test_success_fn=test_success_fn,
354                                           timeout_sec=timeout_sec)
355
356        if 'error' in response:
357            logger.debug('Waiting for global operation failed, response: %r',
358                         response)
359            raise Exception(f'Operation {operation["name"]} did not complete '
360                            f'within {timeout_sec}s, error={response["error"]}')
361        return response
362