# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
xDS Test Server.

TODO(sergiitk): separate XdsTestServer and KubernetesServerRunner to individual
modules.
"""
import functools
import logging
from typing import Iterator, Optional

from framework.infrastructure import k8s
import framework.rpc
from framework.rpc import grpc_channelz
from framework.test_app import base_runner

logger = logging.getLogger(__name__)

# Type aliases
_ChannelzServiceClient = grpc_channelz.ChannelzServiceClient


class XdsTestServer(framework.rpc.grpc.GrpcApp):
    """
    Represents RPC services implemented in Server component of the xDS test app.
    https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
    """

    def __init__(self,
                 *,
                 ip: str,
                 rpc_port: int,
                 maintenance_port: Optional[int] = None,
                 secure_mode: Optional[bool] = False,
                 server_id: Optional[str] = None,
                 xds_host: Optional[str] = None,
                 xds_port: Optional[int] = None,
                 rpc_host: Optional[str] = None):
        """Describe a running test server.

        Args:
            ip: IP address of the server.
            rpc_port: Port the test service listens on.
            maintenance_port: Port used to reach channelz; falls back to
                rpc_port when not set.
            secure_mode: Whether the server runs in secure mode.
            server_id: Identifier of the server.
            xds_host: Host part of the xDS address of the server.
            xds_port: Port part of the xDS address of the server.
            rpc_host: Host used to open RPC channels; falls back to ip.
        """
        super().__init__(rpc_host=(rpc_host or ip))
        self.ip = ip
        self.rpc_port = rpc_port
        self.maintenance_port = maintenance_port or rpc_port
        self.secure_mode = secure_mode
        self.server_id = server_id
        self.xds_host, self.xds_port = xds_host, xds_port
        # Created lazily by the channelz property. Kept on the instance so
        # the client is released together with the server object instead of
        # being pinned by a module-level cache keyed on self.
        self._channelz_client: Optional[_ChannelzServiceClient] = None

    @property
    def channelz(self) -> _ChannelzServiceClient:
        """Channelz client bound to the maintenance port.

        The client is created on first access and reused afterwards.
        """
        if self._channelz_client is None:
            self._channelz_client = _ChannelzServiceClient(
                self._make_channel(self.maintenance_port))
        return self._channelz_client

    def set_xds_address(self,
                        xds_host: Optional[str],
                        xds_port: Optional[int] = None) -> None:
        """Replace the xDS host and port of this server."""
        self.xds_host, self.xds_port = xds_host, xds_port

    @property
    def xds_address(self) -> str:
        """xDS address as 'host:port', just 'host' without a port, or ''."""
        if not self.xds_host:
            return ''
        if not self.xds_port:
            return self.xds_host
        return f'{self.xds_host}:{self.xds_port}'

    @property
    def xds_uri(self) -> str:
        """xDS address prefixed with 'xds:///', or '' when no host is set."""
        if not self.xds_host:
            return ''
        return f'xds:///{self.xds_address}'

    def get_test_server(self) -> grpc_channelz.Server:
        """Return channelz representation of a server running TestService.

        Raises:
            GrpcApp.NotFound: Test server not found.
        """
        server = self.channelz.find_server_listening_on_port(self.rpc_port)
        if not server:
            raise self.NotFound(
                f'Server listening on port {self.rpc_port} not found')
        return server

    def get_test_server_sockets(self) -> Iterator[grpc_channelz.Socket]:
        """List all sockets of the test server.

        Raises:
            GrpcApp.NotFound: Test server not found.
        """
        server = self.get_test_server()
        return self.channelz.list_server_sockets(server)

    def get_server_socket_matching_client(self,
                                          client_socket: grpc_channelz.Socket):
        """Find test server socket that matches given test client socket.

        Sockets are matched using TCP endpoints (ip:port), further on "address".
        Server socket remote address matched with client socket local address.

        Returns:
            The server socket matching the given client socket.

        Raises:
            GrpcApp.NotFound: Server socket matching client socket not found.
        """
        client_local = self.channelz.sock_address_to_str(client_socket.local)
        logger.debug('Looking for a server socket connected to the client %s',
                     client_local)

        server_socket = self.channelz.find_server_socket_matching_client(
            self.get_test_server_sockets(), client_socket)
        if not server_socket:
            raise self.NotFound(
                f'Server socket to client {client_local} not found')

        logger.info('Found matching socket pair: server(%s) <-> client(%s)',
                    self.channelz.sock_addresses_pretty(server_socket),
                    self.channelz.sock_addresses_pretty(client_socket))
        return server_socket


class KubernetesServerRunner(base_runner.KubernetesBaseRunner):
    """Deploys the xDS test server to Kubernetes and tears it down again."""

    DEFAULT_TEST_PORT = 8080
    DEFAULT_MAINTENANCE_PORT = 8080
    DEFAULT_SECURE_MODE_MAINTENANCE_PORT = 8081

    def __init__(self,
                 k8s_namespace,
                 *,
                 deployment_name,
                 image_name,
                 gcp_service_account,
                 service_account_name=None,
                 service_name=None,
                 neg_name=None,
                 td_bootstrap_image=None,
                 xds_server_uri=None,
                 network='default',
                 deployment_template='server.deployment.yaml',
                 service_account_template='service-account.yaml',
                 service_template='server.service.yaml',
                 reuse_service=False,
                 reuse_namespace=False,
                 namespace_template=None,
                 debug_use_port_forwarding=False):
        """Store the settings used later by run() and cleanup().

        Args:
            k8s_namespace: Namespace object the resources are managed in.
            deployment_name: Name of the deployment.
            image_name: Passed to the deployment template.
            gcp_service_account: Passed to the service account template.
            service_account_name: Defaults to deployment_name.
            service_name: Defaults to deployment_name.
            neg_name: Defaults to '<namespace name>-<service name>'.
            td_bootstrap_image: Passed to the deployment template.
            xds_server_uri: Passed to the deployment template.
            network: Passed to the deployment template.
            deployment_template: Template used to create the deployment.
            service_account_template: Template used to create the service
                account.
            service_template: Template used to create the service.
            reuse_service: Try to reuse an existing service in run(), and
                keep the service in a non-forced cleanup().
            reuse_namespace: Forwarded to the base runner.
            namespace_template: Forwarded to the base runner.
            debug_use_port_forwarding: Forward the maintenance port of the
                pod to the local machine in run().
        """
        super().__init__(k8s_namespace, namespace_template, reuse_namespace)

        # Settings
        self.deployment_name = deployment_name
        self.image_name = image_name
        self.gcp_service_account = gcp_service_account
        self.service_account_name = service_account_name or deployment_name
        self.service_name = service_name or deployment_name
        # xDS bootstrap generator
        self.td_bootstrap_image = td_bootstrap_image
        self.xds_server_uri = xds_server_uri
        # This only works in k8s >= 1.18.10-gke.600
        # https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs
        self.neg_name = neg_name or (f'{self.k8s_namespace.name}-'
                                     f'{self.service_name}')
        self.network = network
        self.deployment_template = deployment_template
        self.service_account_template = service_account_template
        self.service_template = service_template
        self.reuse_service = reuse_service
        self.debug_use_port_forwarding = debug_use_port_forwarding

        # Mutable state
        self.deployment: Optional[k8s.V1Deployment] = None
        self.service_account: Optional[k8s.V1ServiceAccount] = None
        self.service: Optional[k8s.V1Service] = None
        self.port_forwarder = None

    def run(self,
            *,
            test_port=DEFAULT_TEST_PORT,
            maintenance_port=None,
            secure_mode=False,
            server_id=None,
            replica_count=1) -> XdsTestServer:
        """Create the server resources and return the running test server.

        Args:
            test_port: Port of the test service.
            maintenance_port: Port of the maintenance services. Defaults to
                DEFAULT_MAINTENANCE_PORT, or to
                DEFAULT_SECURE_MODE_MAINTENANCE_PORT in secure mode.
            secure_mode: Whether to run the test server in secure mode.
            server_id: Passed to the deployment template and to the
                returned XdsTestServer.
            replica_count: Number of replicas; only 1 is supported.

        Returns:
            XdsTestServer describing the first pod of the deployment.

        Raises:
            NotImplementedError: replica_count is not 1.
            TypeError: test_port or maintenance_port is not an integer.
            ValueError: Ports are equal while running in secure mode.
        """
        # TODO(sergiitk): multiple replicas
        if replica_count != 1:
            raise NotImplementedError("Multiple replicas not yet supported")

        # Implementation detail: in secure mode, maintenance ("backchannel")
        # port must be different from the test port so communication with
        # maintenance services can be reached independently from the security
        # configuration under test.
        if maintenance_port is None:
            if not secure_mode:
                maintenance_port = self.DEFAULT_MAINTENANCE_PORT
            else:
                maintenance_port = self.DEFAULT_SECURE_MODE_MAINTENANCE_PORT

        # Validate types before the ports are compared, to avoid bugs with
        # comparing wrong types.
        if not (isinstance(test_port, int) and
                isinstance(maintenance_port, int)):
            raise TypeError('Port numbers must be integer')
        if secure_mode and maintenance_port == test_port:
            raise ValueError('port and maintenance_port must be different '
                             'when running test server in secure mode')

        # Create namespace.
        super().run()

        # Reuse existing if requested, create a new service when missing.
        # Useful for debugging to avoid NEG losing relation to deleted service.
        if self.reuse_service:
            self.service = self._reuse_service(self.service_name)
        if not self.service:
            self.service = self._create_service(
                self.service_template,
                service_name=self.service_name,
                namespace_name=self.k8s_namespace.name,
                deployment_name=self.deployment_name,
                neg_name=self.neg_name,
                test_port=test_port)
        self._wait_service_neg(self.service_name, test_port)

        # Create service account
        self.service_account = self._create_service_account(
            self.service_account_template,
            service_account_name=self.service_account_name,
            namespace_name=self.k8s_namespace.name,
            gcp_service_account=self.gcp_service_account)

        # Always create a new deployment
        self.deployment = self._create_deployment(
            self.deployment_template,
            deployment_name=self.deployment_name,
            image_name=self.image_name,
            namespace_name=self.k8s_namespace.name,
            service_account_name=self.service_account_name,
            td_bootstrap_image=self.td_bootstrap_image,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            replica_count=replica_count,
            test_port=test_port,
            maintenance_port=maintenance_port,
            server_id=server_id,
            secure_mode=secure_mode)

        self._wait_deployment_with_available_replicas(self.deployment_name,
                                                      replica_count)

        # Wait for pods running
        pods = self.k8s_namespace.list_deployment_pods(self.deployment)
        for pod in pods:
            self._wait_pod_started(pod.metadata.name)

        # TODO(sergiitk): This is why multiple replicas not yet supported
        pod = pods[0]
        pod_ip = pod.status.pod_ip
        rpc_host = None
        # Experimental, for local debugging.
        if self.debug_use_port_forwarding:
            logger.info('LOCAL DEV MODE: Enabling port forwarding to %s:%s',
                        pod_ip, maintenance_port)
            self.port_forwarder = self.k8s_namespace.port_forward_pod(
                pod, remote_port=maintenance_port)
            rpc_host = self.k8s_namespace.PORT_FORWARD_LOCAL_ADDRESS

        return XdsTestServer(ip=pod_ip,
                             rpc_port=test_port,
                             maintenance_port=maintenance_port,
                             secure_mode=secure_mode,
                             server_id=server_id,
                             rpc_host=rpc_host)

    def cleanup(self, *, force=False, force_namespace=False):
        """Stop port forwarding and delete the resources of this runner.

        Args:
            force: Delete the deployment, service and service account by
                name even when this runner holds no reference to them. Also
                deletes the service when reuse_service is set.
            force_namespace: Together with force, request a forced cleanup
                from the base runner.
        """
        if self.port_forwarder:
            self.k8s_namespace.port_forward_stop(self.port_forwarder)
            self.port_forwarder = None
        if self.deployment or force:
            self._delete_deployment(self.deployment_name)
            self.deployment = None
        if (self.service and not self.reuse_service) or force:
            self._delete_service(self.service_name)
            self.service = None
        if self.service_account or force:
            self._delete_service_account(self.service_account_name)
            self.service_account = None
        super().cleanup(force=(force_namespace and force))