• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 the gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Client for testing responsiveness to signals."""
15
16from __future__ import print_function
17
18import argparse
19import functools
20import logging
21import signal
22import sys
23
24import grpc
25
26SIGTERM_MESSAGE = "Handling sigterm!"
27
28UNARY_UNARY = "/test/Unary"
29UNARY_STREAM = "/test/ServerStreaming"
30
31_MESSAGE = b"\x00\x00\x00"
32
33_ASSERTION_MESSAGE = "Control flow should never reach here."
34
35# NOTE(gnossen): We use a global variable here so that the signal handler can be
36# installed before the RPC begins. If we do not do this, then we may receive the
37# SIGINT before the signal handler is installed. I'm not happy with per-process
38# global state, but the per-process global state that is signal handlers
39# somewhat forces my hand.
40per_process_rpc_future = None
41
42
43def handle_sigint(unused_signum, unused_frame):
44    print(SIGTERM_MESSAGE)
45    if per_process_rpc_future is not None:
46        per_process_rpc_future.cancel()
47    sys.stderr.flush()
48    # This sys.exit(0) avoids an exception caused by the cancelled RPC.
49    sys.exit(0)
50
51
52def main_unary(server_target):
53    """Initiate a unary RPC to be interrupted by a SIGINT."""
54    global per_process_rpc_future  # pylint: disable=global-statement
55    with grpc.insecure_channel(server_target) as channel:
56        multicallable = channel.unary_unary(
57            UNARY_UNARY,
58            _registered_method=True,
59        )
60        signal.signal(signal.SIGINT, handle_sigint)
61        per_process_rpc_future = multicallable.future(
62            _MESSAGE, wait_for_ready=True
63        )
64        result = per_process_rpc_future.result()
65        assert False, _ASSERTION_MESSAGE
66
67
68def main_streaming(server_target):
69    """Initiate a streaming RPC to be interrupted by a SIGINT."""
70    global per_process_rpc_future  # pylint: disable=global-statement
71    with grpc.insecure_channel(server_target) as channel:
72        signal.signal(signal.SIGINT, handle_sigint)
73        per_process_rpc_future = channel.unary_stream(
74            UNARY_STREAM,
75            _registered_method=True,
76        )(_MESSAGE, wait_for_ready=True)
77        for result in per_process_rpc_future:
78            pass
79        assert False, _ASSERTION_MESSAGE
80
81
82def main_unary_with_exception(server_target):
83    """Initiate a unary RPC with a signal handler that will raise."""
84    channel = grpc.insecure_channel(server_target)
85    try:
86        channel.unary_unary(
87            UNARY_UNARY,
88            _registered_method=True,
89        )(_MESSAGE, wait_for_ready=True)
90    except KeyboardInterrupt:
91        sys.stderr.write("Running signal handler.\n")
92        sys.stderr.flush()
93
94    # This call should not freeze.
95    channel.close()
96
97
98def main_streaming_with_exception(server_target):
99    """Initiate a streaming RPC with a signal handler that will raise."""
100    channel = grpc.insecure_channel(server_target)
101    try:
102        for _ in channel.unary_stream(
103            UNARY_STREAM,
104            _registered_method=True,
105        )(_MESSAGE, wait_for_ready=True):
106            pass
107    except KeyboardInterrupt:
108        sys.stderr.write("Running signal handler.\n")
109        sys.stderr.flush()
110
111    # This call should not freeze.
112    channel.close()
113
114
115if __name__ == "__main__":
116    parser = argparse.ArgumentParser(description="Signal test client.")
117    parser.add_argument("server", help="Server target")
118    parser.add_argument("arity", help="Arity", choices=("unary", "streaming"))
119    parser.add_argument(
120        "--exception",
121        help="Whether the signal throws an exception",
122        action="store_true",
123    )
124    parser.add_argument(
125        "--gevent", help="Whether to run under gevent.", action="store_true"
126    )
127    args = parser.parse_args()
128    if args.gevent:
129        from gevent import monkey
130        import gevent.util
131
132        monkey.patch_all()
133
134        import grpc.experimental.gevent
135
136        grpc.experimental.gevent.init_gevent()
137
138    if args.arity == "unary" and not args.exception:
139        main_unary(args.server)
140    elif args.arity == "streaming" and not args.exception:
141        main_streaming(args.server)
142    elif args.arity == "unary" and args.exception:
143        main_unary_with_exception(args.server)
144    else:
145        main_streaming_with_exception(args.server)
146