1# Copyright 2019 the gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Client for testing responsiveness to signals.""" 15 16from __future__ import print_function 17 18import argparse 19import functools 20import logging 21import signal 22import sys 23 24import grpc 25 26SIGTERM_MESSAGE = "Handling sigterm!" 27 28_SERVICE_NAME = "test" 29UNARY_UNARY = "Unary" 30UNARY_STREAM = "ServerStreaming" 31 32_MESSAGE = b"\x00\x00\x00" 33 34_ASSERTION_MESSAGE = "Control flow should never reach here." 35 36# NOTE(gnossen): We use a global variable here so that the signal handler can be 37# installed before the RPC begins. If we do not do this, then we may receive the 38# SIGINT before the signal handler is installed. I'm not happy with per-process 39# global state, but the per-process global state that is signal handlers 40# somewhat forces my hand. 41per_process_rpc_future = None 42 43 44def handle_sigint(unused_signum, unused_frame): 45 print(SIGTERM_MESSAGE) 46 if per_process_rpc_future is not None: 47 per_process_rpc_future.cancel() 48 sys.stderr.flush() 49 # This sys.exit(0) avoids an exception caused by the cancelled RPC. 50 sys.exit(0) 51 52 53def main_unary(server_target): 54 """Initiate a unary RPC to be interrupted by a SIGINT.""" 55 global per_process_rpc_future # pylint: disable=global-statement 56 with grpc.insecure_channel(server_target) as channel: 57 multicallable = channel.unary_unary( 58 grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_UNARY), 59 _registered_method=True, 60 ) 61 signal.signal(signal.SIGINT, handle_sigint) 62 per_process_rpc_future = multicallable.future( 63 _MESSAGE, wait_for_ready=True 64 ) 65 result = per_process_rpc_future.result() 66 assert False, _ASSERTION_MESSAGE 67 68 69def main_streaming(server_target): 70 """Initiate a streaming RPC to be interrupted by a SIGINT.""" 71 global per_process_rpc_future # pylint: disable=global-statement 72 with grpc.insecure_channel(server_target) as channel: 73 signal.signal(signal.SIGINT, handle_sigint) 74 per_process_rpc_future = channel.unary_stream( 75 grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_STREAM), 76 _registered_method=True, 77 )(_MESSAGE, wait_for_ready=True) 78 for result in per_process_rpc_future: 79 pass 80 assert False, _ASSERTION_MESSAGE 81 82 83def main_unary_with_exception(server_target): 84 """Initiate a unary RPC with a signal handler that will raise.""" 85 channel = grpc.insecure_channel(server_target) 86 try: 87 channel.unary_unary( 88 grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_UNARY), 89 _registered_method=True, 90 )(_MESSAGE, wait_for_ready=True) 91 except KeyboardInterrupt: 92 sys.stderr.write("Running signal handler.\n") 93 sys.stderr.flush() 94 95 # This call should not freeze. 96 channel.close() 97 98 99def main_streaming_with_exception(server_target): 100 """Initiate a streaming RPC with a signal handler that will raise.""" 101 channel = grpc.insecure_channel(server_target) 102 try: 103 for _ in channel.unary_stream( 104 grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_STREAM), 105 _registered_method=True, 106 )(_MESSAGE, wait_for_ready=True): 107 pass 108 except KeyboardInterrupt: 109 sys.stderr.write("Running signal handler.\n") 110 sys.stderr.flush() 111 112 # This call should not freeze. 113 channel.close() 114 115 116if __name__ == "__main__": 117 parser = argparse.ArgumentParser(description="Signal test client.") 118 parser.add_argument("server", help="Server target") 119 parser.add_argument("arity", help="Arity", choices=("unary", "streaming")) 120 parser.add_argument( 121 "--exception", 122 help="Whether the signal throws an exception", 123 action="store_true", 124 ) 125 parser.add_argument( 126 "--gevent", help="Whether to run under gevent.", action="store_true" 127 ) 128 args = parser.parse_args() 129 if args.gevent: 130 from gevent import monkey 131 import gevent.util 132 133 monkey.patch_all() 134 135 import grpc.experimental.gevent 136 137 grpc.experimental.gevent.init_gevent() 138 139 if args.arity == "unary" and not args.exception: 140 main_unary(args.server) 141 elif args.arity == "streaming" and not args.exception: 142 main_streaming(args.server) 143 elif args.arity == "unary" and args.exception: 144 main_unary_with_exception(args.server) 145 else: 146 main_streaming_with_exception(args.server) 147