• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import argparse
16import collections
17from concurrent import futures
18import logging
19import signal
20import socket
21import sys
22import threading
23import time
24from typing import DefaultDict, Dict, List, Mapping, Sequence, Set, Tuple
25
26import grpc
27from grpc_channelz.v1 import channelz
28from grpc_channelz.v1 import channelz_pb2
29from grpc_health.v1 import health as grpc_health
30from grpc_health.v1 import health_pb2
31from grpc_health.v1 import health_pb2_grpc
32from grpc_reflection.v1alpha import reflection
33
34from src.proto.grpc.testing import empty_pb2
35from src.proto.grpc.testing import messages_pb2
36from src.proto.grpc.testing import test_pb2
37from src.proto.grpc.testing import test_pb2_grpc
38
39# NOTE: This interop server is not fully compatible with all xDS interop tests.
40#  It currently only implements enough functionality to pass the xDS security
41#  tests.
42
43_LISTEN_HOST = "0.0.0.0"
44
45_THREAD_POOL_SIZE = 256
46
47logger = logging.getLogger()
48console_handler = logging.StreamHandler()
49formatter = logging.Formatter(fmt="%(asctime)s: %(levelname)-8s %(message)s")
50console_handler.setFormatter(formatter)
51logger.addHandler(console_handler)
52
53
54class TestService(test_pb2_grpc.TestServiceServicer):
55    def __init__(self, server_id, hostname):
56        self._server_id = server_id
57        self._hostname = hostname
58
59    def EmptyCall(
60        self, _: empty_pb2.Empty, context: grpc.ServicerContext
61    ) -> empty_pb2.Empty:
62        context.send_initial_metadata((("hostname", self._hostname),))
63        return empty_pb2.Empty()
64
65    def UnaryCall(
66        self, request: messages_pb2.SimpleRequest, context: grpc.ServicerContext
67    ) -> messages_pb2.SimpleResponse:
68        context.send_initial_metadata((("hostname", self._hostname),))
69        response = messages_pb2.SimpleResponse()
70        response.server_id = self._server_id
71        response.hostname = self._hostname
72        return response
73
74
75def _configure_maintenance_server(
76    server: grpc.Server, maintenance_port: int
77) -> None:
78    channelz.add_channelz_servicer(server)
79    listen_address = f"{_LISTEN_HOST}:{maintenance_port}"
80    server.add_insecure_port(listen_address)
81    health_servicer = grpc_health.HealthServicer(
82        experimental_non_blocking=True,
83        experimental_thread_pool=futures.ThreadPoolExecutor(
84            max_workers=_THREAD_POOL_SIZE
85        ),
86    )
87
88    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)
89    SERVICE_NAMES = (
90        test_pb2.DESCRIPTOR.services_by_name["TestService"].full_name,
91        health_pb2.DESCRIPTOR.services_by_name["Health"].full_name,
92        channelz_pb2.DESCRIPTOR.services_by_name["Channelz"].full_name,
93        reflection.SERVICE_NAME,
94    )
95    for service in SERVICE_NAMES:
96        health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
97    reflection.enable_server_reflection(SERVICE_NAMES, server)
98
99
100def _configure_test_server(
101    server: grpc.Server, port: int, secure_mode: bool, server_id: str
102) -> None:
103    test_pb2_grpc.add_TestServiceServicer_to_server(
104        TestService(server_id, socket.gethostname()), server
105    )
106    listen_address = f"{_LISTEN_HOST}:{port}"
107    if not secure_mode:
108        server.add_insecure_port(listen_address)
109    else:
110        logger.info("Running with xDS Server credentials")
111        server_fallback_creds = grpc.insecure_server_credentials()
112        server_creds = grpc.xds_server_credentials(server_fallback_creds)
113        server.add_secure_port(listen_address, server_creds)
114
115
116def _run(
117    port: int, maintenance_port: int, secure_mode: bool, server_id: str
118) -> None:
119    if port == maintenance_port:
120        server = grpc.server(
121            futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)
122        )
123        _configure_test_server(server, port, secure_mode, server_id)
124        _configure_maintenance_server(server, maintenance_port)
125        server.start()
126        logger.info("Test server listening on port %d", port)
127        logger.info("Maintenance server listening on port %d", maintenance_port)
128        server.wait_for_termination()
129    else:
130        maintenance_server = grpc.server(
131            futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE)
132        )
133        _configure_maintenance_server(maintenance_server, maintenance_port)
134        maintenance_server.start()
135        logger.info("Maintenance server listening on port %d", maintenance_port)
136        test_server = grpc.server(
137            futures.ThreadPoolExecutor(max_workers=_THREAD_POOL_SIZE),
138            xds=secure_mode,
139        )
140        _configure_test_server(test_server, port, secure_mode, server_id)
141        test_server.start()
142        logger.info("Test server listening on port %d", port)
143        test_server.wait_for_termination()
144        maintenance_server.wait_for_termination()
145
146
147def bool_arg(arg: str) -> bool:
148    if arg.lower() in ("true", "yes", "y"):
149        return True
150    elif arg.lower() in ("false", "no", "n"):
151        return False
152    else:
153        raise argparse.ArgumentTypeError(f"Could not parse '{arg}' as a bool.")
154
155
156if __name__ == "__main__":
157    parser = argparse.ArgumentParser(
158        description="Run Python xDS interop server."
159    )
160    parser.add_argument(
161        "--port", type=int, default=8080, help="Port for test server."
162    )
163    parser.add_argument(
164        "--maintenance_port",
165        type=int,
166        default=8080,
167        help="Port for servers besides test server.",
168    )
169    parser.add_argument(
170        "--secure_mode",
171        type=bool_arg,
172        default="False",
173        help="If specified, uses xDS to retrieve server credentials.",
174    )
175    parser.add_argument(
176        "--server_id",
177        type=str,
178        default="python_server",
179        help="The server ID to return in responses..",
180    )
181    parser.add_argument(
182        "--verbose",
183        help="verbose log output",
184        default=False,
185        action="store_true",
186    )
187    args = parser.parse_args()
188    if args.verbose:
189        logger.setLevel(logging.DEBUG)
190    else:
191        logger.setLevel(logging.INFO)
192    if args.secure_mode and args.port == args.maintenance_port:
193        raise ValueError(
194            "--port and --maintenance_port must not be the same when"
195            " --secure_mode is set."
196        )
197    _run(args.port, args.maintenance_port, args.secure_mode, args.server_id)
198