# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines a number of module-scope gRPC scenarios to test clean exit."""

import argparse
import threading
import time

import grpc

from tests.unit.framework.common import test_constants

# Seconds passed to time.sleep() wherever a scenario or handler has to hang.
WAIT_TIME = 1000

REQUEST = b'request'

# Scenario names accepted as the positional command-line argument.
UNSTARTED_SERVER = 'unstarted_server'
RUNNING_SERVER = 'running_server'
POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
POLL_CONNECTIVITY = 'poll_connectivity'
IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'

# RPC method names served by GenericHandler.
UNARY_UNARY = b'/test/UnaryUnary'
UNARY_STREAM = b'/test/UnaryStream'
STREAM_UNARY = b'/test/StreamUnary'
STREAM_STREAM = b'/test/StreamStream'
PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'

# Maps each in-flight scenario name to the RPC method it invokes.
TEST_TO_METHOD = {
    IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY,
    IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM,
    IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY,
    IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM,
    IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM,
    IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY,
    IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM,
}


def hang_unary_unary(request, servicer_context):
    """Unary-unary behavior that only sleeps for WAIT_TIME seconds."""
    time.sleep(WAIT_TIME)


def hang_unary_stream(request, servicer_context):
    """Unary-stream behavior that only sleeps for WAIT_TIME seconds."""
    time.sleep(WAIT_TIME)


def hang_partial_unary_stream(request, servicer_context):
    """Yield the request STREAM_LENGTH // 2 times, then sleep for WAIT_TIME."""
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        yield request
        remaining -= 1
    time.sleep(WAIT_TIME)


def hang_stream_unary(request_iterator, servicer_context):
    """Stream-unary behavior that only sleeps for WAIT_TIME seconds."""
    time.sleep(WAIT_TIME)


def hang_partial_stream_unary(request_iterator, servicer_context):
    """Consume STREAM_LENGTH // 2 requests, then sleep for WAIT_TIME."""
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        next(request_iterator)
        remaining -= 1
    time.sleep(WAIT_TIME)


def hang_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that only sleeps for WAIT_TIME seconds."""
    time.sleep(WAIT_TIME)


def hang_partial_stream_stream(request_iterator, servicer_context):
    """Echo STREAM_LENGTH // 2 requests back, then sleep for WAIT_TIME."""
    remaining = test_constants.STREAM_LENGTH // 2
    while remaining > 0:
        yield next(request_iterator)
        remaining -= 1
    time.sleep(WAIT_TIME)


class MethodHandler(grpc.RpcMethodHandler):
    """Method handler whose single populated behavior is a hanging one.

    Exactly one of unary_unary, unary_stream, stream_unary and stream_stream
    is set, chosen by the two streaming flags; the other three stay None.
    """

    def __init__(self, request_streaming, response_streaming, partial_hang):
        """Select the hanging behavior matching the requested arity.

        Args:
          request_streaming: Whether the method takes a stream of requests.
          response_streaming: Whether the method returns a stream of responses.
          partial_hang: Whether to pick the "partial" variant of the behavior.
            Ignored for unary-unary methods, which have no partial variant.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        # No (de)serializers: messages are handed through as-is.
        self.request_deserializer = None
        self.response_serializer = None
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None
        if request_streaming and response_streaming:
            self.stream_stream = (hang_partial_stream_stream
                                  if partial_hang else hang_stream_stream)
        elif request_streaming:
            self.stream_unary = (hang_partial_stream_unary
                                 if partial_hang else hang_stream_unary)
        elif response_streaming:
            self.unary_stream = (hang_partial_unary_stream
                                 if partial_hang else hang_unary_stream)
        else:
            self.unary_unary = hang_unary_unary


class GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that serves the hanging test methods."""

    # (method name, (request_streaming, response_streaming, partial_hang)),
    # checked in order against the incoming method name.
    _METHOD_FLAGS = (
        (UNARY_UNARY, (False, False, False)),
        (UNARY_STREAM, (False, True, False)),
        (STREAM_UNARY, (True, False, False)),
        (STREAM_STREAM, (True, True, False)),
        (PARTIAL_UNARY_STREAM, (False, True, True)),
        (PARTIAL_STREAM_UNARY, (True, False, True)),
        (PARTIAL_STREAM_STREAM, (True, True, True)),
    )

    def service(self, handler_call_details):
        """Return a fresh MethodHandler for a known method, else None."""
        for method_name, flags in self._METHOD_FLAGS:
            if handler_call_details.method == method_name:
                return MethodHandler(*flags)
        return None


# Traditional executors will not exit until all their
# current jobs complete.  Because we submit jobs that will
# never finish, we don't want to block exit on these jobs.
class DaemonPool(object):
    """Minimal executor stand-in that runs every job on a daemon thread."""

    def submit(self, fn, *args, **kwargs):
        """Start fn(*args, **kwargs) on a new daemon thread; returns None."""
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        worker.daemon = True
        worker.start()

    def shutdown(self, wait=True):
        """Do nothing; submitted threads are never joined."""
        pass


def infinite_request_iterator():
    """Yield REQUEST without end."""
    while True:
        yield REQUEST


if __name__ == '__main__':
    # Everything below is bound at module scope on purpose: the scenarios
    # exist to check exit behavior while these objects are still referenced.
    parser = argparse.ArgumentParser()
    parser.add_argument('scenario', type=str)
    parser.add_argument(
        '--wait_for_interrupt', dest='wait_for_interrupt', action='store_true')
    args = parser.parse_args()

    if args.scenario == UNSTARTED_SERVER:
        # Server object is created but never started.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == RUNNING_SERVER:
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
        # No server is started by this scenario for the channel to reach.
        channel = grpc.insecure_channel('localhost:12345')

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY:
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)

    else:
        # In-flight RPC scenarios: start a server with the hanging handlers
        # and issue a call against it.
        handler = GenericHandler()
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.add_generic_rpc_handlers((handler,))
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        method = TEST_TO_METHOD[args.scenario]

        if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
            multi_callable = channel.unary_unary(method)
            future = multi_callable.future(REQUEST)
            result, call = multi_callable.with_call(REQUEST)
        elif args.scenario in (IN_FLIGHT_UNARY_STREAM_CALL,
                               IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
            multi_callable = channel.unary_stream(method)
            response_iterator = multi_callable(REQUEST)
            for response in response_iterator:
                pass
        elif args.scenario in (IN_FLIGHT_STREAM_UNARY_CALL,
                               IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
            multi_callable = channel.stream_unary(method)
            future = multi_callable.future(infinite_request_iterator())
            result, call = multi_callable.with_call(
                iter([REQUEST] * test_constants.STREAM_LENGTH))
        elif args.scenario in (IN_FLIGHT_STREAM_STREAM_CALL,
                               IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
            multi_callable = channel.stream_stream(method)
            response_iterator = multi_callable(infinite_request_iterator())
            for response in response_iterator:
                pass