# Copyright 2018 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines a number of module-scope gRPC scenarios to test server shutdown."""

import argparse
from concurrent import futures
import logging
import os
import queue
import threading
import time

import grpc

from tests.unit import test_common

# Number of seconds the background server thread sleeps after starting.
WAIT_TIME = 1000

REQUEST = b"request"
RESPONSE = b"response"

# Scenario names accepted as the positional command-line argument.
SERVER_RAISES_EXCEPTION = "server_raises_exception"
SERVER_DEALLOCATED = "server_deallocated"
SERVER_FORK_CAN_EXIT = "server_fork_can_exit"

_SERVICE_NAME = "test"
FORK_EXIT = "ForkExit"


def fork_and_exit(request, servicer_context):
    """Unary-unary handler that forks and has the child exit immediately.

    Args:
        request: The incoming request payload (not inspected).
        servicer_context: The RPC context (not inspected).

    Returns:
        ``RESPONSE``, returned by the parent process only.
    """
    pid = os.fork()
    if pid == 0:
        # Child process: terminate right away without running normal
        # interpreter shutdown.
        os._exit(0)
    return RESPONSE


class GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that routes ``FORK_EXIT`` to ``fork_and_exit``."""

    def service(self, handler_call_details):
        """Return the handler for ``FORK_EXIT``, or ``None`` otherwise."""
        if handler_call_details.method == FORK_EXIT:
            return grpc.unary_unary_rpc_method_handler(fork_and_exit)
        return None


# Method name -> handler mapping registered under ``_SERVICE_NAME``.
_METHOD_HANDLERS = {
    FORK_EXIT: grpc.unary_unary_rpc_method_handler(fork_and_exit)
}


def run_server(port_queue):
    """Start a test server and then sleep for ``WAIT_TIME`` seconds.

    Args:
        port_queue: Queue on which the bound port number is published
            (before the server is started) so the caller can connect.
    """
    server = test_common.test_server()
    port = server.add_insecure_port("[::]:0")
    port_queue.put(port)
    server.add_registered_method_handlers(_SERVICE_NAME, _METHOD_HANDLERS)
    server.start()
    # threading.Event.wait() does not exhibit the bug identified in
    # https://github.com/grpc/grpc/issues/17093, sleep instead
    time.sleep(WAIT_TIME)


def run_test(args):
    """Run the scenario selected by ``args.scenario``.

    Args:
        args: Parsed command-line arguments; ``args.scenario`` must be one
            of ``SERVER_RAISES_EXCEPTION``, ``SERVER_DEALLOCATED`` or
            ``SERVER_FORK_CAN_EXIT``.

    Raises:
        Exception: Always, in the ``SERVER_RAISES_EXCEPTION`` scenario,
            after the server has been started.
        ValueError: If ``args.scenario`` is not a known scenario.
    """
    if args.scenario == SERVER_RAISES_EXCEPTION:
        # Start a server and then fail with an uncaught exception.
        server = test_common.test_server()
        server.start()
        raise Exception()
    elif args.scenario == SERVER_DEALLOCATED:
        # Invoke the finalizer explicitly, then spin until the server
        # reports the STOPPED stage.
        server = test_common.test_server()
        server.start()
        server.__del__()
        while server._state.stage != grpc._server._ServerStage.STOPPED:
            pass
    elif args.scenario == SERVER_FORK_CAN_EXIT:
        # Run the server on a daemon thread so it does not keep the
        # process alive once this function returns.
        port_queue = queue.Queue()
        thread = threading.Thread(target=run_server, args=(port_queue,))
        thread.daemon = True
        thread.start()
        port = port_queue.get()
        channel = grpc.insecure_channel("localhost:%d" % port)
        multi_callable = channel.unary_unary(
            grpc._common.fully_qualified_method(_SERVICE_NAME, FORK_EXIT),
            _registered_method=True,
        )
        # The response and call objects are not inspected afterwards.
        result, call = multi_callable.with_call(REQUEST, wait_for_ready=True)
        # Block until a child process (the one forked by the handler)
        # has terminated.
        os.wait()
    else:
        raise ValueError("unknown test scenario")


if __name__ == "__main__":
    logging.basicConfig()
    parser = argparse.ArgumentParser()
    parser.add_argument("scenario", type=str)
    args = parser.parse_args()
    run_test(args)