1# Copyright 2021 The gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import argparse 16import collections 17from concurrent import futures 18import logging 19import signal 20import socket 21import sys 22import threading 23import time 24from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple 25 26import grpc 27from grpc_channelz.v1 import channelz 28from grpc_channelz.v1 import channelz_pb2 29from grpc_health.v1 import health as grpc_health 30from grpc_health.v1 import health_pb2 31from grpc_health.v1 import health_pb2_grpc 32from grpc_reflection.v1alpha import reflection 33 34from src.proto.grpc.testing import empty_pb2 35from src.proto.grpc.testing import messages_pb2 36from src.proto.grpc.testing import test_pb2 37from src.proto.grpc.testing import test_pb2_grpc 38 39# NOTE: This interop server is not fully compatible with all xDS interop tests. 40# It currently only implements enough functionality to pass the xDS security 41# tests. 42 43_LISTEN_HOST = "0.0.0.0" 44 45_THREAD_POOL_SIZE = 256 46 47logger = logging.getLogger() 48console_handler = logging.StreamHandler() 49formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s") 50console_handler.setFormatter(formatter) 51logger.addHandler(console_handler) 52 53 54class TestService(test_pb2_grpc.TestServiceServicer): 55 def __init__(self, server_id, hostname): 56 self._server_id = server_id 57 self._hostname = hostname 58 59 def EmptyCall( 60 self, _: empty_pb2.Empty, context: grpc.ServicerContext 61 ) -> empty_pb2.Empty: 62 context.send_initial_metadata((("hostname", self._hostname),)) 63 return empty_pb2.Empty() 64 65 def UnaryCall( 66 self, request: messages_pb2.SimpleRequest, context: grpc.ServicerContext 67 ) -> messages_pb2.SimpleResponse: 68 context.send_initial_metadata((("hostname", self._hostname),)) 69 response = messages_pb2.SimpleResponse() 70 response.server_id = self._server_id 71 response.hostname = self._hostname 72 return response 73 74 75def _configure_maintenance_server( 76 server: grpc.Server, maintenance_port: int 77) -> None: 78 channelz.add_channelz_servicer(server) 79 listen_address = f"{_LISTEN_HOST}:{maintenance_port}" 80 server.add_insecure_port(listen_address) 81 health_servicer = grpc_health.HealthServicer( 82 experimental_non_blocking=True, 83 experimental_thread_pool=futures.ThreadPoolExecutor( 84 max_workers=_THREAD_POOL_SIZE 85 ), 86 ) 87 88 health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server) 89 SERVICE_NAMES = ( 90 test_pb2.DESCRIPTOR.services_by_name["TestService"].full_name, 91 health_pb2.DESCRIPTOR.services_by_name["Health"].full_name, 92 channelz_pb2.DESCRIPTOR.services_by_name["Channelz"].full_name, 93 reflection.SERVICE_NAME, 94 ) 95 for service in SERVICE_NAMES: 96 health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING) 97 reflection.enable_server_reflection(SERVICE_NAMES, server) 98 99 100def _configure_test_server( 101 server: grpc.Server, port: int, secure_mode: bool, server_id: str 102) -> None: 103 test_pb2_grpc.add_TestServiceServicer_to_server( 104 TestService(server_id, socket.gethostname()), server 105 ) 106 listen_address = f"{_LISTEN_HOST}:{port}" 107 if not secure_mode: 108 server.add_insecure_port(listen_address) 109 else: 110 logger.info("Running with xDS Server credentials") 111 server_fallback_creds = grpc.insecure_server_credentials() 112 server_creds = grpc.xds_server_credentials(server_fallback_creds) 113 server.add_secure_port(listen_address, server_creds) 114 115 116def _run( 117 port: int, maintenance_port: int, secure_mode: bool, server_id: str 118) -> None: 119 if port == maintenance_port: 120 server = grpc.server( 121 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE) 122 ) 123 _configure_test_server(server, port, secure_mode, server_id) 124 _configure_maintenance_server(server, maintenance_port) 125 server.start() 126 logger.info("Test server listening on port %d", port) 127 logger.info("Maintenance server listening on port %d", maintenance_port) 128 server.wait_for_termination() 129 else: 130 maintenance_server = grpc.server( 131 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE) 132 ) 133 _configure_maintenance_server(maintenance_server, maintenance_port) 134 maintenance_server.start() 135 logger.info("Maintenance server listening on port %d", maintenance_port) 136 test_server = grpc.server( 137 futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE), 138 xds=secure_mode, 139 ) 140 _configure_test_server(test_server, port, secure_mode, server_id) 141 test_server.start() 142 logger.info("Test server listening on port %d", port) 143 test_server.wait_for_termination() 144 maintenance_server.wait_for_termination() 145 146 147def bool_arg(arg: str) -> bool: 148 if arg.lower() in ("true", "yes", "y"): 149 return True 150 elif arg.lower() in ("false", "no", "n"): 151 return False 152 else: 153 raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.") 154 155 156if __name__ == "__main__": 157 parser = argparse.ArgumentParser( 158 description="Run Python xDS interop server." 159 ) 160 parser.add_argument( 161 "--port", type=int, default=8080, help="Port for test server." 162 ) 163 parser.add_argument( 164 "--maintenance_port", 165 type=int, 166 default=8080, 167 help="Port for servers besides test server.", 168 ) 169 parser.add_argument( 170 "--secure_mode", 171 type=bool_arg, 172 default="False", 173 help="If specified, uses xDS to retrieve server credentials.", 174 ) 175 parser.add_argument( 176 "--server_id", 177 type=str, 178 default="python_server", 179 help="The server ID to return in responses..", 180 ) 181 parser.add_argument( 182 "--verbose", 183 help="verbose log output", 184 default=False, 185 action="store_true", 186 ) 187 args = parser.parse_args() 188 if args.verbose: 189 logger.setLevel(logging.DEBUG) 190 else: 191 logger.setLevel(logging.INFO) 192 if args.secure_mode and args.port == args.maintenance_port: 193 raise ValueError( 194 "--port and --maintenance_port must not be the same when" 195 " --secure_mode is set." 196 ) 197 _run(args.port, args.maintenance_port, args.secure_mode, args.server_id) 198