1# Copyright 2023 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from concurrent import futures 16from typing import Tuple 17 18import grpc 19 20_REQUEST = b"\x00\x00\x00" 21_RESPONSE = b"\x00\x00\x00" 22 23_UNARY_UNARY = "/test/UnaryUnary" 24_UNARY_UNARY_FILTERED = "/test/UnaryUnaryFiltered" 25_UNARY_STREAM = "/test/UnaryStream" 26_STREAM_UNARY = "/test/StreamUnary" 27_STREAM_STREAM = "/test/StreamStream" 28STREAM_LENGTH = 5 29TRIGGER_RPC_METADATA = ("control", "trigger_rpc") 30TRIGGER_RPC_TO_NEW_SERVER_METADATA = ("to_new_server", "") 31 32 33def handle_unary_unary(request, servicer_context): 34 if TRIGGER_RPC_METADATA in servicer_context.invocation_metadata(): 35 for k, v in servicer_context.invocation_metadata(): 36 if "port" in k: 37 unary_unary_call(port=int(v)) 38 if "to_new_server" in k: 39 second_server = grpc.server( 40 futures.ThreadPoolExecutor(max_workers=10) 41 ) 42 second_server.add_generic_rpc_handlers((_GenericHandler(),)) 43 second_server_port = second_server.add_insecure_port("[::]:0") 44 second_server.start() 45 unary_unary_call(port=second_server_port) 46 second_server.stop(0) 47 return _RESPONSE 48 49 50def handle_unary_stream(request, servicer_context): 51 for _ in range(STREAM_LENGTH): 52 yield _RESPONSE 53 54 55def handle_stream_unary(request_iterator, servicer_context): 56 return _RESPONSE 57 58 59def handle_stream_stream(request_iterator, servicer_context): 60 for request in request_iterator: 61 yield _RESPONSE 62 63 64class _MethodHandler(grpc.RpcMethodHandler): 65 def __init__(self, request_streaming, response_streaming): 66 self.request_streaming = request_streaming 67 self.response_streaming = response_streaming 68 self.request_deserializer = None 69 self.response_serializer = None 70 self.unary_unary = None 71 self.unary_stream = None 72 self.stream_unary = None 73 self.stream_stream = None 74 if self.request_streaming and self.response_streaming: 75 self.stream_stream = handle_stream_stream 76 elif self.request_streaming: 77 self.stream_unary = handle_stream_unary 78 elif self.response_streaming: 79 self.unary_stream = handle_unary_stream 80 else: 81 self.unary_unary = handle_unary_unary 82 83 84class _GenericHandler(grpc.GenericRpcHandler): 85 def service(self, handler_call_details): 86 if handler_call_details.method == _UNARY_UNARY: 87 return _MethodHandler(False, False) 88 if handler_call_details.method == _UNARY_UNARY_FILTERED: 89 return _MethodHandler(False, False) 90 elif handler_call_details.method == _UNARY_STREAM: 91 return _MethodHandler(False, True) 92 elif handler_call_details.method == _STREAM_UNARY: 93 return _MethodHandler(True, False) 94 elif handler_call_details.method == _STREAM_STREAM: 95 return _MethodHandler(True, True) 96 else: 97 return None 98 99 100def start_server(interceptors=None) -> Tuple[grpc.Server, int]: 101 if interceptors: 102 server = grpc.server( 103 futures.ThreadPoolExecutor(max_workers=10), 104 interceptors=interceptors, 105 ) 106 else: 107 server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) 108 server.add_generic_rpc_handlers((_GenericHandler(),)) 109 port = server.add_insecure_port("[::]:0") 110 server.start() 111 return server, port 112 113 114def unary_unary_call(port, metadata=None): 115 with grpc.insecure_channel(f"localhost:{port}") as channel: 116 multi_callable = channel.unary_unary(_UNARY_UNARY) 117 if metadata: 118 unused_response, call = multi_callable.with_call( 119 _REQUEST, metadata=metadata 120 ) 121 else: 122 unused_response, call = multi_callable.with_call(_REQUEST) 123 124 125def intercepted_unary_unary_call(port, interceptors, metadata=None): 126 with grpc.insecure_channel(f"localhost:{port}") as channel: 127 intercept_channel = grpc.intercept_channel(channel, interceptors) 128 multi_callable = intercept_channel.unary_unary(_UNARY_UNARY) 129 if metadata: 130 unused_response, call = multi_callable.with_call( 131 _REQUEST, metadata=metadata 132 ) 133 else: 134 unused_response, call = multi_callable.with_call(_REQUEST) 135 136 137def unary_unary_filtered_call(port, metadata=None): 138 with grpc.insecure_channel(f"localhost:{port}") as channel: 139 multi_callable = channel.unary_unary(_UNARY_UNARY_FILTERED) 140 if metadata: 141 unused_response, call = multi_callable.with_call( 142 _REQUEST, metadata=metadata 143 ) 144 else: 145 unused_response, call = multi_callable.with_call(_REQUEST) 146 147 148def unary_stream_call(port): 149 with grpc.insecure_channel(f"localhost:{port}") as channel: 150 multi_callable = channel.unary_stream(_UNARY_STREAM) 151 call = multi_callable(_REQUEST) 152 for _ in call: 153 pass 154 155 156def stream_unary_call(port): 157 with grpc.insecure_channel(f"localhost:{port}") as channel: 158 multi_callable = channel.stream_unary(_STREAM_UNARY) 159 unused_response, call = multi_callable.with_call( 160 iter([_REQUEST] * STREAM_LENGTH) 161 ) 162 163 164def stream_stream_call(port): 165 with grpc.insecure_channel(f"localhost:{port}") as channel: 166 multi_callable = channel.stream_stream(_STREAM_STREAM) 167 call = multi_callable(iter([_REQUEST] * STREAM_LENGTH)) 168 for _ in call: 169 pass 170