• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15
16import dataclasses
17from google.rpc import code_pb2
18import tenacity
19
20from framework.infrastructure import gcp
21
# Module-level logger; NetworkSecurityV1Alpha1._execute hands it to tenacity's
# before_sleep_log hook so each retry is logged at DEBUG level.
logger = logging.getLogger(__name__)
23
24
class NetworkSecurityV1Alpha1(gcp.api.GcpStandardCloudApiResource):
    """Wrapper around the ``networksecurity`` ``v1alpha1`` API TLS policies.

    Exposes create/get/delete operations for server and client TLS policies
    found under the ``projects/*/locations/`` endpoints. The actual resource
    handling (``_create_resource``, ``_get_resource``, ``_delete_resource``,
    ``resource_full_name``) is delegated to inherited helpers that are not
    defined in this class.
    """
    # Collection names passed to resource_full_name() when building the full
    # resource name of a policy.
    SERVER_TLS_POLICIES = 'serverTlsPolicies'
    CLIENT_TLS_POLICIES = 'clientTlsPolicies'

    @dataclasses.dataclass(frozen=True)
    class ServerTlsPolicy:
        """Immutable view of a server TLS policy built by get_server_tls_policy."""
        # Value of the 'name' field of the API response.
        url: str
        # Short name the caller used to look the policy up.
        name: str
        # 'serverCertificate' from the response; empty dict when absent.
        server_certificate: dict
        # 'mtlsPolicy' from the response; empty dict when absent.
        mtls_policy: dict
        # 'updateTime' / 'createTime' from the response, stored as received.
        update_time: str
        create_time: str

    @dataclasses.dataclass(frozen=True)
    class ClientTlsPolicy:
        """Immutable view of a client TLS policy built by get_client_tls_policy."""
        # Value of the 'name' field of the API response.
        url: str
        # Short name the caller used to look the policy up.
        name: str
        # 'clientCertificate' from the response; empty dict when absent.
        client_certificate: dict
        # 'serverValidationCa' from the response; empty list when absent.
        server_validation_ca: list
        # 'updateTime' / 'createTime' from the response, stored as received.
        update_time: str
        create_time: str

    def __init__(self, api_manager: gcp.api.GcpApiManager, project: str):
        """Initialize the wrapper for the given project.

        Args:
            api_manager: Manager used to obtain the networksecurity API
                client for this class's ``api_version``.
            project: Project identifier forwarded to the base class.
        """
        super().__init__(api_manager.networksecurity(self.api_version), project)
        # Shortcut to projects/*/locations/ endpoints
        self._api_locations = self.api.projects().locations()

    @property
    def api_name(self) -> str:
        """Name of the wrapped API."""
        return 'networksecurity'

    @property
    def api_version(self) -> str:
        """Version of the wrapped API."""
        return 'v1alpha1'

    def create_server_tls_policy(self, name: str, body: dict):
        """Create a server TLS policy.

        Args:
            name: Policy id, sent as ``serverTlsPolicyId``.
            body: Policy definition passed through to the create call.

        Returns:
            Whatever the inherited ``_create_resource`` helper returns.
        """
        return self._create_resource(self._api_locations.serverTlsPolicies(),
                                     body,
                                     serverTlsPolicyId=name)

    def get_server_tls_policy(self, name: str) -> ServerTlsPolicy:
        """Fetch a server TLS policy and wrap it in a ``ServerTlsPolicy``.

        Args:
            name: Short policy name; expanded to the full resource name via
                ``resource_full_name``.

        Returns:
            A ``ServerTlsPolicy`` populated from the API response. Missing
            'serverCertificate' / 'mtlsPolicy' fields default to ``{}``.

        Raises:
            KeyError: If the response lacks 'name', 'createTime' or
                'updateTime'.
        """
        result = self._get_resource(
            collection=self._api_locations.serverTlsPolicies(),
            full_name=self.resource_full_name(name, self.SERVER_TLS_POLICIES))

        return self.ServerTlsPolicy(name=name,
                                    url=result['name'],
                                    server_certificate=result.get(
                                        'serverCertificate', {}),
                                    mtls_policy=result.get('mtlsPolicy', {}),
                                    create_time=result['createTime'],
                                    update_time=result['updateTime'])

    def delete_server_tls_policy(self, name: str):
        """Delete the server TLS policy with the given short name.

        Returns:
            Whatever the inherited ``_delete_resource`` helper returns.
        """
        return self._delete_resource(
            collection=self._api_locations.serverTlsPolicies(),
            full_name=self.resource_full_name(name, self.SERVER_TLS_POLICIES))

    def create_client_tls_policy(self, name: str, body: dict):
        """Create a client TLS policy.

        Args:
            name: Policy id, sent as ``clientTlsPolicyId``.
            body: Policy definition passed through to the create call.

        Returns:
            Whatever the inherited ``_create_resource`` helper returns.
        """
        return self._create_resource(self._api_locations.clientTlsPolicies(),
                                     body,
                                     clientTlsPolicyId=name)

    def get_client_tls_policy(self, name: str) -> ClientTlsPolicy:
        """Fetch a client TLS policy and wrap it in a ``ClientTlsPolicy``.

        Args:
            name: Short policy name; expanded to the full resource name via
                ``resource_full_name``.

        Returns:
            A ``ClientTlsPolicy`` populated from the API response. A missing
            'clientCertificate' defaults to ``{}`` and a missing
            'serverValidationCa' defaults to ``[]``.

        Raises:
            KeyError: If the response lacks 'name', 'createTime' or
                'updateTime'.
        """
        result = self._get_resource(
            collection=self._api_locations.clientTlsPolicies(),
            full_name=self.resource_full_name(name, self.CLIENT_TLS_POLICIES))

        return self.ClientTlsPolicy(
            name=name,
            url=result['name'],
            client_certificate=result.get('clientCertificate', {}),
            server_validation_ca=result.get('serverValidationCa', []),
            create_time=result['createTime'],
            update_time=result['updateTime'])

    def delete_client_tls_policy(self, name: str):
        """Delete the client TLS policy with the given short name.

        Returns:
            Whatever the inherited ``_delete_resource`` helper returns.
        """
        return self._delete_resource(
            collection=self._api_locations.clientTlsPolicies(),
            full_name=self.resource_full_name(name, self.CLIENT_TLS_POLICIES))

    def _execute(self, *args, **kwargs):  # pylint: disable=signature-differs
        """Run the base class ``_execute``, retrying on INTERNAL errors.

        All arguments are forwarded unchanged to ``super()._execute``.

        Returns:
            The value returned by the base ``_execute`` call that finally
            succeeded.

        Raises:
            The last exception raised by the base ``_execute`` if it is not
            retryable, or if retries are exhausted (``reraise=True``).
        """
        # Workaround TD bug: throttled operations are reported as internal.
        # Ref b/175345578
        retryer = tenacity.Retrying(
            retry=tenacity.retry_if_exception(self._operation_internal_error),
            # Fixed 10 second pause between attempts.
            wait=tenacity.wait_fixed(10),
            # Stop retrying once 5 minutes have elapsed.
            stop=tenacity.stop_after_delay(5 * 60),
            before_sleep=tenacity.before_sleep_log(logger, logging.DEBUG),
            # Surface the original exception rather than tenacity.RetryError.
            reraise=True)
        # Propagate the wrapped call's result so this override stays
        # transparent to callers of _execute.
        return retryer(super()._execute, *args, **kwargs)

    @staticmethod
    def _operation_internal_error(exception) -> bool:
        """Return True if ``exception`` is an OperationError with code INTERNAL.

        Used as the tenacity retry predicate in ``_execute``.
        """
        return (isinstance(exception, gcp.api.OperationError) and
                exception.error.code == code_pb2.INTERNAL)
121