1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines a number of module-scope gRPC scenarios to test clean exit.""" 15 16import argparse 17import threading 18import time 19import logging 20 21import grpc 22 23from tests.unit.framework.common import test_constants 24 25WAIT_TIME = 1000 26 27REQUEST = b'request' 28 29UNSTARTED_SERVER = 'unstarted_server' 30RUNNING_SERVER = 'running_server' 31POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server' 32POLL_CONNECTIVITY = 'poll_connectivity' 33IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call' 34IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call' 35IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call' 36IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call' 37IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call' 38IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call' 39IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call' 40 41UNARY_UNARY = b'/test/UnaryUnary' 42UNARY_STREAM = b'/test/UnaryStream' 43STREAM_UNARY = b'/test/StreamUnary' 44STREAM_STREAM = b'/test/StreamStream' 45PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream' 46PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary' 47PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream' 48 49TEST_TO_METHOD = { 50 IN_FLIGHT_UNARY_UNARY_CALL: UNARY_UNARY, 51 IN_FLIGHT_UNARY_STREAM_CALL: UNARY_STREAM, 52 IN_FLIGHT_STREAM_UNARY_CALL: STREAM_UNARY, 53 IN_FLIGHT_STREAM_STREAM_CALL: STREAM_STREAM, 54 IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL: PARTIAL_UNARY_STREAM, 55 IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL: PARTIAL_STREAM_UNARY, 56 IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL: PARTIAL_STREAM_STREAM, 57} 58 59 60def hang_unary_unary(request, servicer_context): 61 time.sleep(WAIT_TIME) 62 63 64def hang_unary_stream(request, servicer_context): 65 time.sleep(WAIT_TIME) 66 67 68def hang_partial_unary_stream(request, servicer_context): 69 for _ in range(test_constants.STREAM_LENGTH // 2): 70 yield request 71 time.sleep(WAIT_TIME) 72 73 74def hang_stream_unary(request_iterator, servicer_context): 75 time.sleep(WAIT_TIME) 76 77 78def hang_partial_stream_unary(request_iterator, servicer_context): 79 for _ in range(test_constants.STREAM_LENGTH // 2): 80 next(request_iterator) 81 time.sleep(WAIT_TIME) 82 83 84def hang_stream_stream(request_iterator, servicer_context): 85 time.sleep(WAIT_TIME) 86 87 88def hang_partial_stream_stream(request_iterator, servicer_context): 89 for _ in range(test_constants.STREAM_LENGTH // 2): 90 yield next(request_iterator) #pylint: disable=stop-iteration-return 91 time.sleep(WAIT_TIME) 92 93 94class MethodHandler(grpc.RpcMethodHandler): 95 96 def __init__(self, request_streaming, response_streaming, partial_hang): 97 self.request_streaming = request_streaming 98 self.response_streaming = response_streaming 99 self.request_deserializer = None 100 self.response_serializer = None 101 self.unary_unary = None 102 self.unary_stream = None 103 self.stream_unary = None 104 self.stream_stream = None 105 if self.request_streaming and self.response_streaming: 106 if partial_hang: 107 self.stream_stream = hang_partial_stream_stream 108 else: 109 self.stream_stream = hang_stream_stream 110 elif self.request_streaming: 111 if partial_hang: 112 self.stream_unary = hang_partial_stream_unary 113 else: 114 self.stream_unary = hang_stream_unary 115 elif self.response_streaming: 116 if partial_hang: 117 self.unary_stream = hang_partial_unary_stream 118 else: 119 self.unary_stream = hang_unary_stream 120 else: 121 self.unary_unary = hang_unary_unary 122 123 124class GenericHandler(grpc.GenericRpcHandler): 125 126 def service(self, handler_call_details): 127 if handler_call_details.method == UNARY_UNARY: 128 return MethodHandler(False, False, False) 129 elif handler_call_details.method == UNARY_STREAM: 130 return MethodHandler(False, True, False) 131 elif handler_call_details.method == STREAM_UNARY: 132 return MethodHandler(True, False, False) 133 elif handler_call_details.method == STREAM_STREAM: 134 return MethodHandler(True, True, False) 135 elif handler_call_details.method == PARTIAL_UNARY_STREAM: 136 return MethodHandler(False, True, True) 137 elif handler_call_details.method == PARTIAL_STREAM_UNARY: 138 return MethodHandler(True, False, True) 139 elif handler_call_details.method == PARTIAL_STREAM_STREAM: 140 return MethodHandler(True, True, True) 141 else: 142 return None 143 144 145# Traditional executors will not exit until all their 146# current jobs complete. Because we submit jobs that will 147# never finish, we don't want to block exit on these jobs. 148class DaemonPool(object): 149 150 def submit(self, fn, *args, **kwargs): 151 thread = threading.Thread(target=fn, args=args, kwargs=kwargs) 152 thread.daemon = True 153 thread.start() 154 155 def shutdown(self, wait=True): 156 pass 157 158 159def infinite_request_iterator(): 160 while True: 161 yield REQUEST 162 163 164if __name__ == '__main__': 165 logging.basicConfig() 166 parser = argparse.ArgumentParser() 167 parser.add_argument('scenario', type=str) 168 parser.add_argument('--wait_for_interrupt', 169 dest='wait_for_interrupt', 170 action='store_true') 171 args = parser.parse_args() 172 173 if args.scenario == UNSTARTED_SERVER: 174 server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) 175 if args.wait_for_interrupt: 176 time.sleep(WAIT_TIME) 177 elif args.scenario == RUNNING_SERVER: 178 server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) 179 port = server.add_insecure_port('[::]:0') 180 server.start() 181 if args.wait_for_interrupt: 182 time.sleep(WAIT_TIME) 183 elif args.scenario == POLL_CONNECTIVITY_NO_SERVER: 184 channel = grpc.insecure_channel('localhost:12345') 185 186 def connectivity_callback(connectivity): 187 pass 188 189 channel.subscribe(connectivity_callback, try_to_connect=True) 190 if args.wait_for_interrupt: 191 time.sleep(WAIT_TIME) 192 elif args.scenario == POLL_CONNECTIVITY: 193 server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) 194 port = server.add_insecure_port('[::]:0') 195 server.start() 196 channel = grpc.insecure_channel('localhost:%d' % port) 197 198 def connectivity_callback(connectivity): 199 pass 200 201 channel.subscribe(connectivity_callback, try_to_connect=True) 202 if args.wait_for_interrupt: 203 time.sleep(WAIT_TIME) 204 205 else: 206 handler = GenericHandler() 207 server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),)) 208 port = server.add_insecure_port('[::]:0') 209 server.add_generic_rpc_handlers((handler,)) 210 server.start() 211 channel = grpc.insecure_channel('localhost:%d' % port) 212 213 method = TEST_TO_METHOD[args.scenario] 214 215 if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL: 216 multi_callable = channel.unary_unary(method) 217 future = multi_callable.future(REQUEST) 218 result, call = multi_callable.with_call(REQUEST) 219 elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or 220 args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL): 221 multi_callable = channel.unary_stream(method) 222 response_iterator = multi_callable(REQUEST) 223 for response in response_iterator: 224 pass 225 elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or 226 args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL): 227 multi_callable = channel.stream_unary(method) 228 future = multi_callable.future(infinite_request_iterator()) 229 result, call = multi_callable.with_call( 230 iter([REQUEST] * test_constants.STREAM_LENGTH)) 231 elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or 232 args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL): 233 multi_callable = channel.stream_stream(method) 234 response_iterator = multi_callable(infinite_request_iterator()) 235 for response in response_iterator: 236 pass 237