• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Defines a number of module-scope gRPC scenarios to test clean exit."""
15
16import argparse
17import threading
18import time
19import logging
20
21import grpc
22
23from tests.unit.framework.common import test_constants
24
# Number of seconds passed to time.sleep() by the hanging handlers and by the
# --wait_for_interrupt scenarios.
WAIT_TIME = 1000

# Payload used for every request sent by the client side of a scenario.
REQUEST = b'request'

# Scenario names, as accepted by the positional `scenario` argument.
UNSTARTED_SERVER = 'unstarted_server'
RUNNING_SERVER = 'running_server'
POLL_CONNECTIVITY_NO_SERVER = 'poll_connectivity_no_server'
POLL_CONNECTIVITY = 'poll_connectivity'
IN_FLIGHT_UNARY_UNARY_CALL = 'in_flight_unary_unary_call'
IN_FLIGHT_UNARY_STREAM_CALL = 'in_flight_unary_stream_call'
IN_FLIGHT_STREAM_UNARY_CALL = 'in_flight_stream_unary_call'
IN_FLIGHT_STREAM_STREAM_CALL = 'in_flight_stream_stream_call'
IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL = 'in_flight_partial_unary_stream_call'
IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL = 'in_flight_partial_stream_unary_call'
IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL = 'in_flight_partial_stream_stream_call'

# Method paths that GenericHandler knows how to serve.
UNARY_UNARY = b'/test/UnaryUnary'
UNARY_STREAM = b'/test/UnaryStream'
STREAM_UNARY = b'/test/StreamUnary'
STREAM_STREAM = b'/test/StreamStream'
PARTIAL_UNARY_STREAM = b'/test/PartialUnaryStream'
PARTIAL_STREAM_UNARY = b'/test/PartialStreamUnary'
PARTIAL_STREAM_STREAM = b'/test/PartialStreamStream'

# Method path invoked by each in-flight scenario, as (scenario, method) pairs.
TEST_TO_METHOD = dict((
    (IN_FLIGHT_UNARY_UNARY_CALL, UNARY_UNARY),
    (IN_FLIGHT_UNARY_STREAM_CALL, UNARY_STREAM),
    (IN_FLIGHT_STREAM_UNARY_CALL, STREAM_UNARY),
    (IN_FLIGHT_STREAM_STREAM_CALL, STREAM_STREAM),
    (IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL, PARTIAL_UNARY_STREAM),
    (IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL, PARTIAL_STREAM_UNARY),
    (IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL, PARTIAL_STREAM_STREAM),
))
58
59
def hang_unary_unary(request, servicer_context):
    """Unary-unary handler that blocks for WAIT_TIME seconds.

    Neither argument is inspected; the call simply stays in flight while the
    handler sleeps. If the sleep ever finishes, None is returned.
    """
    time.sleep(WAIT_TIME)
    return None
62
63
def hang_unary_stream(request, servicer_context):
    """Unary-stream handler that blocks for WAIT_TIME seconds.

    This is a plain function, not a generator: no response is ever produced,
    and None is returned once the sleep completes.
    """
    time.sleep(WAIT_TIME)
    return None
66
67
def hang_partial_unary_stream(request, servicer_context):
    """Unary-stream handler that streams part of the responses, then hangs.

    Echoes `request` back STREAM_LENGTH // 2 times and then sleeps for
    WAIT_TIME seconds, leaving the response stream open.
    """
    responses_before_hang = test_constants.STREAM_LENGTH // 2
    for _ in range(responses_before_hang):
        yield request
    time.sleep(WAIT_TIME)
72
73
def hang_stream_unary(request_iterator, servicer_context):
    """Stream-unary handler that blocks for WAIT_TIME seconds.

    The request iterator is never consumed. If the sleep ever finishes, None
    is returned.
    """
    time.sleep(WAIT_TIME)
    return None
76
77
def hang_partial_stream_unary(request_iterator, servicer_context):
    """Stream-unary handler that reads part of the requests, then hangs.

    Pulls STREAM_LENGTH // 2 requests off `request_iterator`, discarding each
    one, and then sleeps for WAIT_TIME seconds without producing a response.
    """
    requests_before_hang = test_constants.STREAM_LENGTH // 2
    for _ in range(requests_before_hang):
        next(request_iterator)
    time.sleep(WAIT_TIME)
    return None
82
83
def hang_stream_stream(request_iterator, servicer_context):
    """Stream-stream handler that blocks for WAIT_TIME seconds.

    This is a plain function, not a generator: no request is read and no
    response is produced. None is returned once the sleep completes.
    """
    time.sleep(WAIT_TIME)
    return None
86
87
def hang_partial_stream_stream(request_iterator, servicer_context):
    """Stream-stream handler that echoes part of the stream, then hangs.

    Reads STREAM_LENGTH // 2 requests, yielding each one back as a response,
    and then sleeps for WAIT_TIME seconds with the stream still open.
    """
    echoes_before_hang = test_constants.STREAM_LENGTH // 2
    for _ in range(echoes_before_hang):
        # next() is called directly on purpose; the lint warning about a
        # StopIteration escaping from a generator is silenced rather than
        # handled.
        incoming = next(request_iterator)  # pylint: disable=stop-iteration-return
        yield incoming
    time.sleep(WAIT_TIME)
92
93
class MethodHandler(grpc.RpcMethodHandler):
    """RPC method handler whose single behavior is one of the hang_* functions.

    Exactly one of `unary_unary`, `unary_stream`, `stream_unary` and
    `stream_stream` is set, chosen by the two streaming flags; the other
    three stay None. No (de)serializers are installed.
    """

    def __init__(self, request_streaming, response_streaming, partial_hang):
        """Select the hanging behavior for the requested call arity.

        Args:
          request_streaming: truthy if the method takes a stream of requests.
          response_streaming: truthy if the method returns a stream of
            responses.
          partial_hang: truthy to pick the hang_partial_* variant instead of
            the one that hangs immediately. Ignored for unary-unary, which
            has no partial variant.
        """
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
        self.request_deserializer = None
        self.response_serializer = None

        # Start with every behavior unset; one is filled in below.
        self.unary_unary = None
        self.unary_stream = None
        self.stream_unary = None
        self.stream_stream = None

        if request_streaming:
            if response_streaming:
                self.stream_stream = (hang_partial_stream_stream
                                      if partial_hang else hang_stream_stream)
            else:
                self.stream_unary = (hang_partial_stream_unary
                                     if partial_hang else hang_stream_unary)
        elif response_streaming:
            self.unary_stream = (hang_partial_unary_stream
                                 if partial_hang else hang_unary_stream)
        else:
            self.unary_unary = hang_unary_unary
122
123
class GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that serves the test method paths with hanging handlers."""

    # Each entry pairs a method path with the
    # (request_streaming, response_streaming, partial_hang) arguments used to
    # build its MethodHandler.
    _ROUTES = (
        (UNARY_UNARY, (False, False, False)),
        (UNARY_STREAM, (False, True, False)),
        (STREAM_UNARY, (True, False, False)),
        (STREAM_STREAM, (True, True, False)),
        (PARTIAL_UNARY_STREAM, (False, True, True)),
        (PARTIAL_STREAM_UNARY, (True, False, True)),
        (PARTIAL_STREAM_STREAM, (True, True, True)),
    )

    def service(self, handler_call_details):
        """Return a fresh MethodHandler for the called method, if it is known.

        Args:
          handler_call_details: object whose `method` attribute is compared
            against the known method paths.

        Returns:
          A new MethodHandler for the first matching path, or None when the
          method matches none of them.
        """
        for method_path, handler_args in self._ROUTES:
            if handler_call_details.method == method_path:
                return MethodHandler(*handler_args)
        return None
143
144
# A regular executor does not let the process exit until the jobs it is
# running have completed. The jobs submitted here never finish, so they are
# run on daemon threads instead, which do not hold up exit.
class DaemonPool(object):
    """Executor-like pool that runs every submitted job on a daemon thread."""

    def submit(self, fn, *args, **kwargs):
        """Start `fn(*args, **kwargs)` on a new daemon thread.

        Nothing is returned, so the caller gets no handle on the job or on
        its result.
        """
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        worker.daemon = True
        worker.start()

    def shutdown(self, wait=True):
        """Do nothing; `wait` is accepted but ignored."""
157
158
def infinite_request_iterator():
    """Yield REQUEST endlessly, for use as a request stream that never ends."""
    while True:
        yield REQUEST
162
163
if __name__ == '__main__':
    # NOTE(review): the module docstring describes these as module-scope
    # scenarios, so every object below is kept as a module-level name.
    # Moving this body into a function would presumably change which gRPC
    # objects are still referenced when the interpreter exits.
    logging.basicConfig()
    parser = argparse.ArgumentParser()
    # Reject unknown scenario names up front. Without `choices`, a typo falls
    # through to the final `else` branch, starts a server and a channel, and
    # only then fails with a KeyError on the TEST_TO_METHOD lookup.
    parser.add_argument('scenario',
                        type=str,
                        choices=(UNSTARTED_SERVER, RUNNING_SERVER,
                                 POLL_CONNECTIVITY_NO_SERVER,
                                 POLL_CONNECTIVITY) +
                        tuple(sorted(TEST_TO_METHOD)))
    parser.add_argument('--wait_for_interrupt',
                        dest='wait_for_interrupt',
                        action='store_true')
    args = parser.parse_args()

    if args.scenario == UNSTARTED_SERVER:
        # A server that is created but never started.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == RUNNING_SERVER:
        # A started server with no handlers and no client.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY_NO_SERVER:
        # A channel subscribed to connectivity updates for an address on
        # which this script starts no server.
        channel = grpc.insecure_channel('localhost:12345')

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)
    elif args.scenario == POLL_CONNECTIVITY:
        # Same as above, but the channel targets a server started here.
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        def connectivity_callback(connectivity):
            pass

        channel.subscribe(connectivity_callback, try_to_connect=True)
        if args.wait_for_interrupt:
            time.sleep(WAIT_TIME)

    else:
        # In-flight call scenarios: start a server whose handlers all hang,
        # then issue calls against it from the same process.
        handler = GenericHandler()
        server = grpc.server(DaemonPool(), options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.add_generic_rpc_handlers((handler,))
        server.start()
        channel = grpc.insecure_channel('localhost:%d' % port)

        method = TEST_TO_METHOD[args.scenario]

        if args.scenario == IN_FLIGHT_UNARY_UNARY_CALL:
            multi_callable = channel.unary_unary(method)
            # `future` is never read again, but stays bound as a module-level
            # name while the second call is made.
            future = multi_callable.future(REQUEST)
            result, call = multi_callable.with_call(REQUEST)
        elif (args.scenario == IN_FLIGHT_UNARY_STREAM_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_UNARY_STREAM_CALL):
            multi_callable = channel.unary_stream(method)
            response_iterator = multi_callable(REQUEST)
            # Drain responses; the handler sleeps after sending any it sends.
            for response in response_iterator:
                pass
        elif (args.scenario == IN_FLIGHT_STREAM_UNARY_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_STREAM_UNARY_CALL):
            multi_callable = channel.stream_unary(method)
            # First call is fed an endless request stream; the second a
            # finite one of STREAM_LENGTH requests.
            future = multi_callable.future(infinite_request_iterator())
            result, call = multi_callable.with_call(
                iter([REQUEST] * test_constants.STREAM_LENGTH))
        elif (args.scenario == IN_FLIGHT_STREAM_STREAM_CALL or
              args.scenario == IN_FLIGHT_PARTIAL_STREAM_STREAM_CALL):
            multi_callable = channel.stream_stream(method)
            response_iterator = multi_callable(infinite_request_iterator())
            for response in response_iterator:
                pass
237