• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""
15xDS Test Server.
16
17TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
18modules.
19"""
20import functools
21import logging
22from typing import Iterator, Optional
23
24from framework.infrastructure import k8s
25import framework.rpc
26from framework.rpc import grpc_channelz
27from framework.test_app import base_runner
28
29logger = logging.getLogger(__name__)
30
31# Type aliases
32_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient
33
34
35class XdsTestServer(framework.rpc.grpc.GrpcApp):
36    """
37    Represents RPC services implemented in Server component of the xDS test app.
38    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
39    """
40
41    def __init__(self,
42                 *,
43                 ip: str,
44                 rpc_port: int,
45                 maintenance_port: Optional[int] = None,
46                 secure_mode: Optional[bool] = False,
47                 server_id: Optional[str] = None,
48                 xds_host: Optional[str] = None,
49                 xds_port: Optional[int] = None,
50                 rpc_host: Optional[str] = None):
51        super().__init__(rpc_host=(rpc_host or ip))
52        self.ip = ip
53        self.rpc_port = rpc_port
54        self.maintenance_port = maintenance_port or rpc_port
55        self.secure_mode = secure_mode
56        self.server_id = server_id
57        self.xds_host, self.xds_port = xds_host, xds_port
58
59    @property
60    @functools.lru_cache(None)
61    def channelz(self) -> _ChannelzServiceClient:
62        return _ChannelzServiceClient(self._make_channel(self.maintenance_port))
63
64    def set_xds_address(self, xds_host, xds_port: Optional[int] = None):
65        self.xds_host, self.xds_port = xds_host, xds_port
66
67    @property
68    def xds_address(self) -> str:
69        if not self.xds_host:
70            return ''
71        if not self.xds_port:
72            return self.xds_host
73        return f'{self.xds_host}:{self.xds_port}'
74
75    @property
76    def xds_uri(self) -> str:
77        if not self.xds_host:
78            return ''
79        return f'xds:///{self.xds_address}'
80
81    def get_test_server(self) -> grpc_channelz.Server:
82        """Return channelz representation of a server running TestService.
83
84        Raises:
85            GrpcApp.NotFound: Test server not found.
86        """
87        server = self.channelz.find_server_listening_on_port(self.rpc_port)
88        if not server:
89            raise self.NotFound(
90                f'Server listening on port {self.rpc_port} not found')
91        return server
92
93    def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
94        """List all sockets of the test server.
95
96        Raises:
97            GrpcApp.NotFound: Test server not found.
98        """
99        server = self.get_test_server()
100        return self.channelz.list_server_sockets(server)
101
102    def get_server_socket_matching_client(self,
103                                          client_socket: grpc_channelz.Socket):
104        """Find test server socket that matches given test client socket.
105
106        Sockets are matched using TCP endpoints (ip:port), further on "address".
107        Server socket remote address matched with client socket local address.
108
109         Raises:
110             GrpcApp.NotFound: Server socket matching client socket not found.
111         """
112        client_local = self.channelz.sock_address_to_str(client_socket.local)
113        logger.debug('Looking for a server socket connected to the client %s',
114                     client_local)
115
116        server_socket = self.channelz.find_server_socket_matching_client(
117            self.get_test_server_sockets(), client_socket)
118        if not server_socket:
119            raise self.NotFound(
120                f'Server socket to client {client_local} not found')
121
122        logger.info('Found matching socket pair: server(%s) <-> client(%s)',
123                    self.channelz.sock_addresses_pretty(server_socket),
124                    self.channelz.sock_addresses_pretty(client_socket))
125        return server_socket
126
127
128class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
129    DEFAULT_TEST_PORT = 8080
130    DEFAULT_MAINTENANCE_PORT = 8080
131    DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081
132
133    def __init__(self,
134                 k8s_namespace,
135                 *,
136                 deployment_name,
137                 image_name,
138                 gcp_service_account,
139                 service_account_name=None,
140                 service_name=None,
141                 neg_name=None,
142                 td_bootstrap_image=None,
143                 xds_server_uri=None,
144                 network='default',
145                 deployment_template='server.deployment.yaml',
146                 service_account_template='service-account.yaml',
147                 service_template='server.service.yaml',
148                 reuse_service=False,
149                 reuse_namespace=False,
150                 namespace_template=None,
151                 debug_use_port_forwarding=False):
152        super().__init__(k8s_namespace, namespace_template, reuse_namespace)
153
154        # Settings
155        self.deployment_name = deployment_name
156        self.image_name = image_name
157        self.gcp_service_account = gcp_service_account
158        self.service_account_name = service_account_name or deployment_name
159        self.service_name = service_name or deployment_name
160        # xDS bootstrap generator
161        self.td_bootstrap_image = td_bootstrap_image
162        self.xds_server_uri = xds_server_uri
163        # This only works in k8s >= 1.18.10-gke.600
164        # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
165        self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
166                                     f'{self.service_name}')
167        self.network = network
168        self.deployment_template = deployment_template
169        self.service_account_template = service_account_template
170        self.service_template = service_template
171        self.reuse_service = reuse_service
172        self.debug_use_port_forwarding = debug_use_port_forwarding
173
174        # Mutable state
175        self.deployment: Optional[k8s.V1Deployment] = None
176        self.service_account: Optional[k8s.V1ServiceAccount] = None
177        self.service: Optional[k8s.V1Service] = None
178        self.port_forwarder = None
179
180    def run(self,
181            *,
182            test_port=DEFAULT_TEST_PORT,
183            maintenance_port=None,
184            secure_mode=False,
185            server_id=None,
186            replica_count=1) -> XdsTestServer:
187        # TODO(sergiitk): multiple replicas
188        if replica_count != 1:
189            raise NotImplementedError("Multiple replicas not yet supported")
190
191        # Implementation detail: in secure mode, maintenance ("backchannel")
192        # port must be different from the test port so communication with
193        # maintenance services can be reached independently from the security
194        # configuration under test.
195        if maintenance_port is None:
196            if not secure_mode:
197                maintenance_port = self.DEFAULT_MAINTENANCE_PORT
198            else:
199                maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT
200
201        if secure_mode and maintenance_port == test_port:
202            raise ValueError('port and maintenance_port must be different '
203                             'when running test server in secure mode')
204        # To avoid bugs with comparing wrong types.
205        if not (isinstance(test_port, int) and
206                isinstance(maintenance_port, int)):
207            raise TypeError('Port numbers must be integer')
208
209        # Create namespace.
210        super().run()
211
212        # Reuse existing if requested, create a new deployment when missing.
213        # Useful for debugging to avoid NEG loosing relation to deleted service.
214        if self.reuse_service:
215            self.service = self._reuse_service(self.service_name)
216        if not self.service:
217            self.service = self._create_service(
218                self.service_template,
219                service_name=self.service_name,
220                namespace_name=self.k8s_namespace.name,
221                deployment_name=self.deployment_name,
222                neg_name=self.neg_name,
223                test_port=test_port)
224        self._wait_service_neg(self.service_name, test_port)
225
226        # Create service account
227        self.service_account = self._create_service_account(
228            self.service_account_template,
229            service_account_name=self.service_account_name,
230            namespace_name=self.k8s_namespace.name,
231            gcp_service_account=self.gcp_service_account)
232
233        # Always create a new deployment
234        self.deployment = self._create_deployment(
235            self.deployment_template,
236            deployment_name=self.deployment_name,
237            image_name=self.image_name,
238            namespace_name=self.k8s_namespace.name,
239            service_account_name=self.service_account_name,
240            td_bootstrap_image=self.td_bootstrap_image,
241            xds_server_uri=self.xds_server_uri,
242            network=self.network,
243            replica_count=replica_count,
244            test_port=test_port,
245            maintenance_port=maintenance_port,
246            server_id=server_id,
247            secure_mode=secure_mode)
248
249        self._wait_deployment_with_available_replicas(self.deployment_name,
250                                                      replica_count)
251
252        # Wait for pods running
253        pods = self.k8s_namespace.list_deployment_pods(self.deployment)
254        for pod in pods:
255            self._wait_pod_started(pod.metadata.name)
256
257        # TODO(sergiitk): This is why multiple replicas not yet supported
258        pod = pods[0]
259        pod_ip = pod.status.pod_ip
260        rpc_host = None
261        # Experimental, for local debugging.
262        if self.debug_use_port_forwarding:
263            logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
264                        pod_ip, maintenance_port)
265            self.port_forwarder = self.k8s_namespace.port_forward_pod(
266                pod, remote_port=maintenance_port)
267            rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS
268
269        return XdsTestServer(ip=pod_ip,
270                             rpc_port=test_port,
271                             maintenance_port=maintenance_port,
272                             secure_mode=secure_mode,
273                             server_id=server_id,
274                             rpc_host=rpc_host)
275
276    def cleanup(self, *, force=False, force_namespace=False):
277        if self.port_forwarder:
278            self.k8s_namespace.port_forward_stop(self.port_forwarder)
279            self.port_forwarder = None
280        if self.deployment or force:
281            self._delete_deployment(self.deployment_name)
282            self.deployment = None
283        if (self.service and not self.reuse_service) or force:
284            self._delete_service(self.service_name)
285            self.service = None
286        if self.service_account or force:
287            self._delete_service_account(self.service_account_name)
288            self.service_account = None
289        super().cleanup(force=(force_namespace and force))
290