1# Copyright 2018 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Defines a number of module-scope gRPC scenarios to test server shutdown.""" 15 16import argparse 17import os 18import threading 19import time 20import logging 21 22import grpc 23from tests.unit import test_common 24 25from concurrent import futures 26from six.moves import queue 27 28WAIT_TIME = 1000 29 30REQUEST = b'request' 31RESPONSE = b'response' 32 33SERVER_RAISES_EXCEPTION = 'server_raises_exception' 34SERVER_DEALLOCATED = 'server_deallocated' 35SERVER_FORK_CAN_EXIT = 'server_fork_can_exit' 36 37FORK_EXIT = '/test/ForkExit' 38 39 40def fork_and_exit(request, servicer_context): 41 pid = os.fork() 42 if pid == 0: 43 os._exit(0) 44 return RESPONSE 45 46 47class GenericHandler(grpc.GenericRpcHandler): 48 49 def service(self, handler_call_details): 50 if handler_call_details.method == FORK_EXIT: 51 return grpc.unary_unary_rpc_method_handler(fork_and_exit) 52 else: 53 return None 54 55 56def run_server(port_queue): 57 server = test_common.test_server() 58 port = server.add_insecure_port('[::]:0') 59 port_queue.put(port) 60 server.add_generic_rpc_handlers((GenericHandler(),)) 61 server.start() 62 # threading.Event.wait() does not exhibit the bug identified in 63 # https://github.com/grpc/grpc/issues/17093, sleep instead 64 time.sleep(WAIT_TIME) 65 66 67def run_test(args): 68 if args.scenario == SERVER_RAISES_EXCEPTION: 69 server = test_common.test_server() 70 server.start() 71 raise Exception() 72 elif args.scenario == SERVER_DEALLOCATED: 73 server = test_common.test_server() 74 server.start() 75 server.__del__() 76 while server._state.stage != grpc._server._ServerStage.STOPPED: 77 pass 78 elif args.scenario == SERVER_FORK_CAN_EXIT: 79 port_queue = queue.Queue() 80 thread = threading.Thread(target=run_server, args=(port_queue,)) 81 thread.daemon = True 82 thread.start() 83 port = port_queue.get() 84 channel = grpc.insecure_channel('localhost:%d' % port) 85 multi_callable = channel.unary_unary(FORK_EXIT) 86 result, call = multi_callable.with_call(REQUEST, wait_for_ready=True) 87 os.wait() 88 else: 89 raise ValueError('unknown test scenario') 90 91 92if __name__ == '__main__': 93 logging.basicConfig() 94 parser = argparse.ArgumentParser() 95 parser.add_argument('scenario', type=str) 96 args = parser.parse_args() 97 run_test(args) 98