# Copyright 2023 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""In-process gRPC test server and matching client-call helpers."""

from concurrent import futures
from typing import Tuple

import grpc

# Fixed payloads exchanged by every handler and client helper in this file.
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x00\x00\x00"

_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"
_UNARY_UNARY_FILTERED = "UnaryUnaryFiltered"
_UNARY_STREAM = "UnaryStream"
_STREAM_UNARY = "StreamUnary"
_STREAM_STREAM = "StreamStream"
# Number of responses produced by handle_unary_stream.
STREAM_LENGTH = 5
# Metadata entry that makes handle_unary_unary inspect the remaining
# metadata and issue follow-up RPCs.
TRIGGER_RPC_METADATA = ("control", "trigger_rpc")
# Metadata entry whose key makes handle_unary_unary call a temporary server.
TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "")


def _call_temporary_server():
    """Start a throwaway server, send it one unary-unary RPC, then stop it."""
    second_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    second_server.add_generic_rpc_handlers((_GenericHandler(),))
    second_server_port = second_server.add_insecure_port("[::]:0")
    second_server.start()
    try:
        unary_unary_call(port=second_server_port)
    finally:
        # Stop the server even when the nested RPC raises, so a failing
        # call does not leave a listening server behind.
        second_server.stop(0)


def handle_unary_unary(request, servicer_context):
    """Serve a unary-unary RPC, optionally issuing follow-up RPCs first.

    Follow-up RPCs are only issued when TRIGGER_RPC_METADATA is present in
    the invocation metadata. In that case every metadata entry is checked:

    * a key containing "port" triggers a unary-unary call to the port given
      by the entry's value (parsed with ``int``);
    * a key containing "to_new_server" triggers a call to a temporary
      server that is started and stopped around that single call.

    Args:
        request: The incoming request; not inspected.
        servicer_context: Context whose ``invocation_metadata()`` is read.

    Returns:
        The fixed ``_RESPONSE`` payload.
    """
    metadata = servicer_context.invocation_metadata()
    if TRIGGER_RPC_METADATA not in metadata:
        return _RESPONSE
    for key, value in metadata:
        # Both checks are independent: one key may satisfy either or both.
        if "port" in key:
            unary_unary_call(port=int(value))
        if "to_new_server" in key:
            _call_temporary_server()
    return _RESPONSE


def handle_unary_stream(request, servicer_context):
    """Yield ``_RESPONSE`` ``STREAM_LENGTH`` times; the request is ignored."""
    for _ in range(STREAM_LENGTH):
        yield _RESPONSE


def handle_stream_unary(request_iterator, servicer_context):
    """Return ``_RESPONSE`` without reading ``request_iterator``."""
    return _RESPONSE


def handle_stream_stream(request_iterator, servicer_context):
    """Yield one ``_RESPONSE`` for every request read from the iterator."""
    for _ in request_iterator:
        yield _RESPONSE


class _MethodHandler(grpc.RpcMethodHandler):
    """Method handler exposing exactly one of the four RPC behaviors.

    The behavior attribute matching the requested streaming combination is
    set to the corresponding module-level ``handle_*`` function; the other
    three stay ``None``.
    """

    def __init__(self, request_streaming, response_streaming):
        """Select the behavior for the given streaming combination.

        Args:
            request_streaming: Truthy when the client sends a stream.
            response_streaming: Truthy when the server replies with a stream.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        # No conversion callables are installed for requests or responses.
        self.request_deserializer = None
        self.response_serializer = None
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        if self.request_streaming and self.response_streaming:
            self.stream_stream = handle_stream_stream
        elif self.request_streaming:
            self.stream_unary = handle_stream_unary
        elif self.response_streaming:
            self.unary_stream = handle_unary_stream
        else:
            self.unary_unary = handle_unary_unary


# (request_streaming, response_streaming) for every method _GenericHandler
# serves, keyed by bare method name.
_STREAMING_FLAGS_BY_METHOD = {
    _UNARY_UNARY: (False, False),
    _UNARY_UNARY_FILTERED: (False, False),
    _UNARY_STREAM: (False, True),
    _STREAM_UNARY: (True, False),
    _STREAM_STREAM: (True, True),
}


class _GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that resolves this file's test methods by name."""

    def service(self, handler_call_details):
        """Return the handler for the requested method, or ``None``.

        The method is matched both by its bare name and by the
        fully-qualified name built from ``_SERVICE_NAME``. The client
        helpers in this file always invoke the fully-qualified form, so a
        bare-name-only comparison would never match their calls.

        Args:
            handler_call_details: Call details whose ``method`` attribute
                names the requested RPC.

        Returns:
            A new ``_MethodHandler`` for a known method, otherwise ``None``.
        """
        requested = handler_call_details.method
        for name, flags in _STREAMING_FLAGS_BY_METHOD.items():
            qualified = grpc._common.fully_qualified_method(
                _SERVICE_NAME, name
            )
            if requested in (name, qualified):
                request_streaming, response_streaming = flags
                return _MethodHandler(request_streaming, response_streaming)
        return None


# Handlers installed through a generic handler by start_server.
RPC_METHOD_HANDLERS = {
    _UNARY_UNARY_FILTERED: _MethodHandler(False, False),
    _UNARY_UNARY: _MethodHandler(False, False),
    _UNARY_STREAM: _MethodHandler(False, True),
    _STREAM_UNARY: _MethodHandler(True, False),
    _STREAM_STREAM: _MethodHandler(True, True),
}

# Handlers installed as registered methods when start_server is called with
# register_method=True.
REGISTERED_RPC_METHOD_HANDLERS = {
    _UNARY_UNARY_FILTERED: _MethodHandler(False, False),
    _UNARY_UNARY: _MethodHandler(False, False),
    _UNARY_STREAM: _MethodHandler(False, True),
    _STREAM_UNARY: _MethodHandler(True, False),
    _STREAM_STREAM: _MethodHandler(True, True),
}


def start_server(
    interceptors=None, register_method=True
) -> Tuple[grpc.Server, int]:
    """Start an insecure test server on an OS-assigned port.

    Args:
        interceptors: Optional server interceptors; a falsy value means
            the server is created without any.
        register_method: When truthy, the handlers are additionally
            installed through ``add_registered_method_handlers``.

    Returns:
        A ``(server, port)`` tuple. The caller is responsible for stopping
        the server.
    """
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        # A falsy value is normalised to None, which is also what
        # grpc.server receives when the argument is omitted.
        interceptors=interceptors or None,
    )
    generic_handler = grpc.method_handlers_generic_handler(
        _SERVICE_NAME, RPC_METHOD_HANDLERS
    )
    server.add_generic_rpc_handlers((generic_handler,))
    if register_method:
        server.add_registered_method_handlers(
            _SERVICE_NAME, REGISTERED_RPC_METHOD_HANDLERS
        )
    port = server.add_insecure_port("[::]:0")
    server.start()
    return server, port


def unary_unary_call(port, metadata=None, registered_method=False):
    """Send one blocking unary-unary RPC to ``localhost:port``.

    Args:
        port: Port of the server to call.
        metadata: Optional call metadata; a falsy value sends none.
        registered_method: Forwarded to the channel as
            ``_registered_method``.
    """
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_unary(
            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY),
            _registered_method=registered_method,
        )
        multi_callable.with_call(_REQUEST, metadata=metadata or None)


def intercepted_unary_unary_call(port, interceptors, metadata=None):
    """Send one unary-unary RPC through an intercepted channel.

    Args:
        port: Port of the server to call.
        interceptors: A single client interceptor, or a list/tuple of them.
        metadata: Optional call metadata; a falsy value sends none.
    """
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        # grpc.intercept_channel takes interceptors as separate positional
        # arguments, so a list/tuple has to be unpacked rather than passed
        # as one argument.
        if isinstance(interceptors, (list, tuple)):
            intercept_channel = grpc.intercept_channel(channel, *interceptors)
        else:
            intercept_channel = grpc.intercept_channel(channel, interceptors)
        multi_callable = intercept_channel.unary_unary(
            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY)
        )
        multi_callable.with_call(_REQUEST, metadata=metadata or None)


def unary_unary_filtered_call(port, metadata=None):
    """Send one blocking RPC to the ``UnaryUnaryFiltered`` method.

    Args:
        port: Port of the server to call.
        metadata: Optional call metadata; a falsy value sends none.
    """
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_unary(
            grpc._common.fully_qualified_method(
                _SERVICE_NAME, _UNARY_UNARY_FILTERED
            )
        )
        multi_callable.with_call(_REQUEST, metadata=metadata or None)


def unary_stream_call(port):
    """Send one unary-stream RPC and drain every response."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.unary_stream(
            grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_STREAM)
        )
        responses = multi_callable(_REQUEST)
        for _ in responses:
            pass


def stream_unary_call(port):
    """Send ``STREAM_LENGTH`` requests in one blocking stream-unary RPC."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.stream_unary(
            grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_UNARY)
        )
        requests = iter([_REQUEST] * STREAM_LENGTH)
        multi_callable.with_call(requests)


def stream_stream_call(port):
    """Send ``STREAM_LENGTH`` requests and drain the response stream."""
    with grpc.insecure_channel(f"localhost:{port}") as channel:
        multi_callable = channel.stream_stream(
            grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_STREAM)
        )
        responses = multi_callable(iter([_REQUEST] * STREAM_LENGTH))
        for _ in responses:
            pass