1# Copyright 2019 the gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Client for testing responsiveness to signals.""" 15 16from __future__ import print_function 17 18import argparse 19import functools 20import logging 21import signal 22import sys 23 24import grpc 25 26SIGTERM_MESSAGE = "Handling sigterm!" 27 28UNARY_UNARY = "/test/Unary" 29UNARY_STREAM = "/test/ServerStreaming" 30 31_MESSAGE = b'\x00\x00\x00' 32 33_ASSERTION_MESSAGE = "Control flow should never reach here." 34 35# NOTE(gnossen): We use a global variable here so that the signal handler can be 36# installed before the RPC begins. If we do not do this, then we may receive the 37# SIGINT before the signal handler is installed. I'm not happy with per-process 38# global state, but the per-process global state that is signal handlers 39# somewhat forces my hand. 40per_process_rpc_future = None 41 42 43def handle_sigint(unused_signum, unused_frame): 44 print(SIGTERM_MESSAGE) 45 if per_process_rpc_future is not None: 46 per_process_rpc_future.cancel() 47 sys.stderr.flush() 48 # This sys.exit(0) avoids an exception caused by the cancelled RPC. 49 sys.exit(0) 50 51 52def main_unary(server_target): 53 """Initiate a unary RPC to be interrupted by a SIGINT.""" 54 global per_process_rpc_future # pylint: disable=global-statement 55 with grpc.insecure_channel(server_target) as channel: 56 multicallable = channel.unary_unary(UNARY_UNARY) 57 signal.signal(signal.SIGINT, handle_sigint) 58 per_process_rpc_future = multicallable.future(_MESSAGE, 59 wait_for_ready=True) 60 result = per_process_rpc_future.result() 61 assert False, _ASSERTION_MESSAGE 62 63 64def main_streaming(server_target): 65 """Initiate a streaming RPC to be interrupted by a SIGINT.""" 66 global per_process_rpc_future # pylint: disable=global-statement 67 with grpc.insecure_channel(server_target) as channel: 68 signal.signal(signal.SIGINT, handle_sigint) 69 per_process_rpc_future = channel.unary_stream(UNARY_STREAM)( 70 _MESSAGE, wait_for_ready=True) 71 for result in per_process_rpc_future: 72 pass 73 assert False, _ASSERTION_MESSAGE 74 75 76def main_unary_with_exception(server_target): 77 """Initiate a unary RPC with a signal handler that will raise.""" 78 channel = grpc.insecure_channel(server_target) 79 try: 80 channel.unary_unary(UNARY_UNARY)(_MESSAGE, wait_for_ready=True) 81 except KeyboardInterrupt: 82 sys.stderr.write("Running signal handler.\n") 83 sys.stderr.flush() 84 85 # This call should not hang. 86 channel.close() 87 88 89def main_streaming_with_exception(server_target): 90 """Initiate a streaming RPC with a signal handler that will raise.""" 91 channel = grpc.insecure_channel(server_target) 92 try: 93 for _ in channel.unary_stream(UNARY_STREAM)(_MESSAGE, 94 wait_for_ready=True): 95 pass 96 except KeyboardInterrupt: 97 sys.stderr.write("Running signal handler.\n") 98 sys.stderr.flush() 99 100 # This call should not hang. 101 channel.close() 102 103 104if __name__ == '__main__': 105 parser = argparse.ArgumentParser(description='Signal test client.') 106 parser.add_argument('server', help='Server target') 107 parser.add_argument('arity', help='Arity', choices=('unary', 'streaming')) 108 parser.add_argument('--exception', 109 help='Whether the signal throws an exception', 110 action='store_true') 111 args = parser.parse_args() 112 if args.arity == 'unary' and not args.exception: 113 main_unary(args.server) 114 elif args.arity == 'streaming' and not args.exception: 115 main_streaming(args.server) 116 elif args.arity == 'unary' and args.exception: 117 main_unary_with_exception(args.server) 118 else: 119 main_streaming_with_exception(args.server) 120