1# Copyright 2019 the gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Client for testing responsiveness to signals.""" 15 16from __future__ import print_function 17 18import argparse 19import functools 20import logging 21import signal 22import sys 23 24import grpc 25 26SIGTERM_MESSAGE = "Handling sigterm!" 27 28UNARY_UNARY = "/test/Unary" 29UNARY_STREAM = "/test/ServerStreaming" 30 31_MESSAGE = b"\x00\x00\x00" 32 33_ASSERTION_MESSAGE = "Control flow should never reach here." 34 35# NOTE(gnossen): We use a global variable here so that the signal handler can be 36# installed before the RPC begins. If we do not do this, then we may receive the 37# SIGINT before the signal handler is installed. I'm not happy with per-process 38# global state, but the per-process global state that is signal handlers 39# somewhat forces my hand. 40per_process_rpc_future = None 41 42 43def handle_sigint(unused_signum, unused_frame): 44 print(SIGTERM_MESSAGE) 45 if per_process_rpc_future is not None: 46 per_process_rpc_future.cancel() 47 sys.stderr.flush() 48 # This sys.exit(0) avoids an exception caused by the cancelled RPC. 49 sys.exit(0) 50 51 52def main_unary(server_target): 53 """Initiate a unary RPC to be interrupted by a SIGINT.""" 54 global per_process_rpc_future # pylint: disable=global-statement 55 with grpc.insecure_channel(server_target) as channel: 56 multicallable = channel.unary_unary( 57 UNARY_UNARY, 58 _registered_method=True, 59 ) 60 signal.signal(signal.SIGINT, handle_sigint) 61 per_process_rpc_future = multicallable.future( 62 _MESSAGE, wait_for_ready=True 63 ) 64 result = per_process_rpc_future.result() 65 assert False, _ASSERTION_MESSAGE 66 67 68def main_streaming(server_target): 69 """Initiate a streaming RPC to be interrupted by a SIGINT.""" 70 global per_process_rpc_future # pylint: disable=global-statement 71 with grpc.insecure_channel(server_target) as channel: 72 signal.signal(signal.SIGINT, handle_sigint) 73 per_process_rpc_future = channel.unary_stream( 74 UNARY_STREAM, 75 _registered_method=True, 76 )(_MESSAGE, wait_for_ready=True) 77 for result in per_process_rpc_future: 78 pass 79 assert False, _ASSERTION_MESSAGE 80 81 82def main_unary_with_exception(server_target): 83 """Initiate a unary RPC with a signal handler that will raise.""" 84 channel = grpc.insecure_channel(server_target) 85 try: 86 channel.unary_unary( 87 UNARY_UNARY, 88 _registered_method=True, 89 )(_MESSAGE, wait_for_ready=True) 90 except KeyboardInterrupt: 91 sys.stderr.write("Running signal handler.\n") 92 sys.stderr.flush() 93 94 # This call should not freeze. 95 channel.close() 96 97 98def main_streaming_with_exception(server_target): 99 """Initiate a streaming RPC with a signal handler that will raise.""" 100 channel = grpc.insecure_channel(server_target) 101 try: 102 for _ in channel.unary_stream( 103 UNARY_STREAM, 104 _registered_method=True, 105 )(_MESSAGE, wait_for_ready=True): 106 pass 107 except KeyboardInterrupt: 108 sys.stderr.write("Running signal handler.\n") 109 sys.stderr.flush() 110 111 # This call should not freeze. 112 channel.close() 113 114 115if __name__ == "__main__": 116 parser = argparse.ArgumentParser(description="Signal test client.") 117 parser.add_argument("server", help="Server target") 118 parser.add_argument("arity", help="Arity", choices=("unary", "streaming")) 119 parser.add_argument( 120 "--exception", 121 help="Whether the signal throws an exception", 122 action="store_true", 123 ) 124 parser.add_argument( 125 "--gevent", help="Whether to run under gevent.", action="store_true" 126 ) 127 args = parser.parse_args() 128 if args.gevent: 129 from gevent import monkey 130 import gevent.util 131 132 monkey.patch_all() 133 134 import grpc.experimental.gevent 135 136 grpc.experimental.gevent.init_gevent() 137 138 if args.arity == "unary" and not args.exception: 139 main_unary(args.server) 140 elif args.arity == "streaming" and not args.exception: 141 main_streaming(args.server) 142 elif args.arity == "unary" and args.exception: 143 main_unary_with_exception(args.server) 144 else: 145 main_streaming_with_exception(args.server) 146