• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 the gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Client for testing responsiveness to signals."""
15
16from __future__ import print_function
17
18import argparse
19import functools
20import logging
21import signal
22import sys
23
24import grpc
25
26SIGTERM_MESSAGE = "Handling sigterm!"
27
28_SERVICE_NAME = "test"
29UNARY_UNARY = "Unary"
30UNARY_STREAM = "ServerStreaming"
31
32_MESSAGE = b"\x00\x00\x00"
33
34_ASSERTION_MESSAGE = "Control flow should never reach here."
35
36# NOTE(gnossen): We use a global variable here so that the signal handler can be
37# installed before the RPC begins. If we do not do this, then we may receive the
38# SIGINT before the signal handler is installed. I'm not happy with per-process
39# global state, but the per-process global state that is signal handlers
40# somewhat forces my hand.
41per_process_rpc_future = None
42
43
44def handle_sigint(unused_signum, unused_frame):
45    print(SIGTERM_MESSAGE)
46    if per_process_rpc_future is not None:
47        per_process_rpc_future.cancel()
48    sys.stderr.flush()
49    # This sys.exit(0) avoids an exception caused by the cancelled RPC.
50    sys.exit(0)
51
52
53def main_unary(server_target):
54    """Initiate a unary RPC to be interrupted by a SIGINT."""
55    global per_process_rpc_future  # pylint: disable=global-statement
56    with grpc.insecure_channel(server_target) as channel:
57        multicallable = channel.unary_unary(
58            grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_UNARY),
59            _registered_method=True,
60        )
61        signal.signal(signal.SIGINT, handle_sigint)
62        per_process_rpc_future = multicallable.future(
63            _MESSAGE, wait_for_ready=True
64        )
65        result = per_process_rpc_future.result()
66        assert False, _ASSERTION_MESSAGE
67
68
69def main_streaming(server_target):
70    """Initiate a streaming RPC to be interrupted by a SIGINT."""
71    global per_process_rpc_future  # pylint: disable=global-statement
72    with grpc.insecure_channel(server_target) as channel:
73        signal.signal(signal.SIGINT, handle_sigint)
74        per_process_rpc_future = channel.unary_stream(
75            grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_STREAM),
76            _registered_method=True,
77        )(_MESSAGE, wait_for_ready=True)
78        for result in per_process_rpc_future:
79            pass
80        assert False, _ASSERTION_MESSAGE
81
82
83def main_unary_with_exception(server_target):
84    """Initiate a unary RPC with a signal handler that will raise."""
85    channel = grpc.insecure_channel(server_target)
86    try:
87        channel.unary_unary(
88            grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_UNARY),
89            _registered_method=True,
90        )(_MESSAGE, wait_for_ready=True)
91    except KeyboardInterrupt:
92        sys.stderr.write("Running signal handler.\n")
93        sys.stderr.flush()
94
95    # This call should not freeze.
96    channel.close()
97
98
99def main_streaming_with_exception(server_target):
100    """Initiate a streaming RPC with a signal handler that will raise."""
101    channel = grpc.insecure_channel(server_target)
102    try:
103        for _ in channel.unary_stream(
104            grpc._common.fully_qualified_method(_SERVICE_NAME, UNARY_STREAM),
105            _registered_method=True,
106        )(_MESSAGE, wait_for_ready=True):
107            pass
108    except KeyboardInterrupt:
109        sys.stderr.write("Running signal handler.\n")
110        sys.stderr.flush()
111
112    # This call should not freeze.
113    channel.close()
114
115
116if __name__ == "__main__":
117    parser = argparse.ArgumentParser(description="Signal test client.")
118    parser.add_argument("server", help="Server target")
119    parser.add_argument("arity", help="Arity", choices=("unary", "streaming"))
120    parser.add_argument(
121        "--exception",
122        help="Whether the signal throws an exception",
123        action="store_true",
124    )
125    parser.add_argument(
126        "--gevent", help="Whether to run under gevent.", action="store_true"
127    )
128    args = parser.parse_args()
129    if args.gevent:
130        from gevent import monkey
131        import gevent.util
132
133        monkey.patch_all()
134
135        import grpc.experimental.gevent
136
137        grpc.experimental.gevent.init_gevent()
138
139    if args.arity == "unary" and not args.exception:
140        main_unary(args.server)
141    elif args.arity == "streaming" and not args.exception:
142        main_streaming(args.server)
143    elif args.arity == "unary" and args.exception:
144        main_unary_with_exception(args.server)
145    else:
146        main_streaming_with_exception(args.server)
147