• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import logging
16import unittest
17
18import grpc
19
20from tests.unit import test_common
21from tests.unit.framework.common import test_constants
22from tests.unit.framework.common import test_control
23
def _double_bytes(bytestring):
    """Return ``bytestring`` concatenated with itself (request wire form)."""
    return bytestring * 2


def _second_half(bytestring):
    """Return the trailing half of ``bytestring``.

    Applied to the output of ``_double_bytes`` this recovers the original
    payload. The split point is ``len(bytestring) // 2``.
    """
    return bytestring[len(bytestring) // 2 :]


def _triple_bytes(bytestring):
    """Return ``bytestring`` repeated three times (response wire form)."""
    return bytestring * 3


def _first_third(bytestring):
    """Return the leading third of ``bytestring``.

    Applied to the output of ``_triple_bytes`` this recovers the original
    payload. The cut point is ``len(bytestring) // 3``.
    """
    return bytestring[: len(bytestring) // 3]


# Non-identity (de)serializers: requests are doubled on the wire and
# responses are tripled, and each deserializer undoes its serializer.
_SERIALIZE_REQUEST = _double_bytes
_DESERIALIZE_REQUEST = _second_half
_SERIALIZE_RESPONSE = _triple_bytes
_DESERIALIZE_RESPONSE = _first_third

# Service name and the method names registered under it.
_SERVICE_NAME = "test"
_UNARY_UNARY = "UnaryUnary"
_UNARY_UNARY_NESTED_EXCEPTION = "UnaryUnaryNestedException"
_UNARY_STREAM = "UnaryStream"
_STREAM_UNARY = "StreamUnary"
_STREAM_STREAM = "StreamStream"
_DEFECTIVE_GENERIC_RPC_HANDLER = "DefectiveGenericRpcHandler"
36
37
class _Handler(object):
    """Servicer-side behaviours for the methods of the test service.

    Each ``handle_*`` method echoes the request payload(s) back, calling
    ``control.control()`` at every step so the injected control object can
    intervene. The remaining two methods always raise.
    """

    # Trailing metadata attached by every echoing handler.
    _TRAILING_METADATA = (("testkey", "testvalue"),)

    def __init__(self, control):
        """Store the control object whose ``control()`` hook is invoked."""
        self._control = control

    def _set_trailing_metadata(self, servicer_context):
        """Attach the canned trailing metadata; a ``None`` context is skipped."""
        if servicer_context is not None:
            servicer_context.set_trailing_metadata(self._TRAILING_METADATA)

    def handle_unary_unary(self, request, servicer_context):
        """Echo a single request as the single response."""
        self._control.control()
        self._set_trailing_metadata(servicer_context)
        return request

    def handle_unary_unary_with_nested_exception(
        self, request, servicer_context
    ):
        """Always raise ``test_control.NestedDefect``."""
        raise test_control.NestedDefect()

    def handle_unary_stream(self, request, servicer_context):
        """Yield the request ``test_constants.STREAM_LENGTH`` times.

        Trailing metadata is only set once every response has been yielded.
        """
        for _ in range(test_constants.STREAM_LENGTH):
            self._control.control()
            yield request
        self._control.control()
        self._set_trailing_metadata(servicer_context)

    def handle_stream_unary(self, request_iterator, servicer_context):
        """Drain the request stream and return the concatenated payloads."""
        if servicer_context is not None:
            # Return value is discarded; only the call itself is exercised.
            servicer_context.invocation_metadata()
        self._control.control()
        response_elements = []
        for request in request_iterator:
            self._control.control()
            response_elements.append(request)
        self._control.control()
        self._set_trailing_metadata(servicer_context)
        return b"".join(response_elements)

    def handle_stream_stream(self, request_iterator, servicer_context):
        """Echo every request as a response, in order.

        Unlike the other streaming handlers, trailing metadata is set before
        the first request is consumed.
        """
        self._control.control()
        self._set_trailing_metadata(servicer_context)
        for request in request_iterator:
            self._control.control()
            yield request
        self._control.control()

    def defective_generic_rpc_handler(self):
        """Always raise ``test_control.Defect``."""
        raise test_control.Defect()
113
114
class _MethodHandler(grpc.RpcMethodHandler):
    """Plain-attribute implementation of ``grpc.RpcMethodHandler``.

    Every constructor argument is stored unchanged on the attribute of the
    same name; no validation is performed.
    """

    def __init__(
        self,
        request_streaming,
        response_streaming,
        request_deserializer,
        response_serializer,
        unary_unary,
        unary_stream,
        stream_unary,
        stream_stream,
    ):
        # Behaviour slots, one per RPC cardinality.
        self.unary_unary = unary_unary
        self.unary_stream = unary_stream
        self.stream_unary = stream_unary
        self.stream_stream = stream_stream

        # Message (de)serialization hooks.
        self.request_deserializer = request_deserializer
        self.response_serializer = response_serializer

        # Cardinality flags.
        self.request_streaming = request_streaming
        self.response_streaming = response_streaming
135
136
def get_method_handlers(handler):
    """Map each test method name to a ``_MethodHandler`` bound to ``handler``.

    Only the unary-stream and stream-unary entries install the custom
    request deserializer / response serializer; every other entry passes
    ``None`` for both.

    Args:
        handler: object providing the ``handle_*`` methods,
            ``defective_generic_rpc_handler`` and
            ``handle_unary_unary_with_nested_exception``.

    Returns:
        A dict keyed by method name.
    """
    return {
        _UNARY_UNARY: _MethodHandler(
            request_streaming=False,
            response_streaming=False,
            request_deserializer=None,
            response_serializer=None,
            unary_unary=handler.handle_unary_unary,
            unary_stream=None,
            stream_unary=None,
            stream_stream=None,
        ),
        _UNARY_STREAM: _MethodHandler(
            request_streaming=False,
            response_streaming=True,
            request_deserializer=_DESERIALIZE_REQUEST,
            response_serializer=_SERIALIZE_RESPONSE,
            unary_unary=None,
            unary_stream=handler.handle_unary_stream,
            stream_unary=None,
            stream_stream=None,
        ),
        _STREAM_UNARY: _MethodHandler(
            request_streaming=True,
            response_streaming=False,
            request_deserializer=_DESERIALIZE_REQUEST,
            response_serializer=_SERIALIZE_RESPONSE,
            unary_unary=None,
            unary_stream=None,
            stream_unary=handler.handle_stream_unary,
            stream_stream=None,
        ),
        _STREAM_STREAM: _MethodHandler(
            request_streaming=True,
            response_streaming=True,
            request_deserializer=None,
            response_serializer=None,
            unary_unary=None,
            unary_stream=None,
            stream_unary=None,
            stream_stream=handler.handle_stream_stream,
        ),
        # The two entries below are wired to behaviours that always fail.
        _DEFECTIVE_GENERIC_RPC_HANDLER: _MethodHandler(
            request_streaming=False,
            response_streaming=False,
            request_deserializer=None,
            response_serializer=None,
            unary_unary=handler.defective_generic_rpc_handler,
            unary_stream=None,
            stream_unary=None,
            stream_stream=None,
        ),
        _UNARY_UNARY_NESTED_EXCEPTION: _MethodHandler(
            request_streaming=False,
            response_streaming=False,
            request_deserializer=None,
            response_serializer=None,
            unary_unary=handler.handle_unary_unary_with_nested_exception,
            unary_stream=None,
            stream_unary=None,
            stream_stream=None,
        ),
    }
200
201
class FailAfterFewIterationsCounter(object):
    """Iterator that yields a fixed bytestring, then fails.

    The first ``high`` calls to ``next()`` return ``bytestring``; every call
    after that raises ``test_control.Defect``.
    """

    def __init__(self, high, bytestring):
        self._bytestring = bytestring
        self._high = high
        self._current = 0

    def __iter__(self):
        # The object is its own iterator.
        return self

    def __next__(self):
        if self._current < self._high:
            self._current += 1
            return self._bytestring
        # Budget exhausted: fail instead of signalling StopIteration.
        raise test_control.Defect()

    # Alias for callers that use the Python 2 iterator protocol name.
    next = __next__
219
220
def _unary_unary_multi_callable(channel):
    """Return the channel's unary-unary callable for the UnaryUnary method.

    No custom serializer or deserializer is passed.
    """
    method = grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_UNARY)
    return channel.unary_unary(method, _registered_method=True)
226
227
def _unary_stream_multi_callable(channel):
    """Return the channel's unary-stream callable for the UnaryStream method.

    Uses the module's request serializer and response deserializer.
    """
    method = grpc._common.fully_qualified_method(_SERVICE_NAME, _UNARY_STREAM)
    return channel.unary_stream(
        method,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE,
        _registered_method=True,
    )
235
236
def _stream_unary_multi_callable(channel):
    """Return the channel's stream-unary callable for the StreamUnary method.

    Uses the module's request serializer and response deserializer.
    """
    method = grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_UNARY)
    return channel.stream_unary(
        method,
        request_serializer=_SERIALIZE_REQUEST,
        response_deserializer=_DESERIALIZE_RESPONSE,
        _registered_method=True,
    )
244
245
def _stream_stream_multi_callable(channel):
    """Return the channel's stream-stream callable for the StreamStream method.

    No custom serializer or deserializer is passed.
    """
    method = grpc._common.fully_qualified_method(_SERVICE_NAME, _STREAM_STREAM)
    return channel.stream_stream(method, _registered_method=True)
251
252
def _defective_handler_multi_callable(channel):
    """Return a unary-unary callable for the DefectiveGenericRpcHandler method."""
    method = grpc._common.fully_qualified_method(
        _SERVICE_NAME, _DEFECTIVE_GENERIC_RPC_HANDLER
    )
    return channel.unary_unary(method, _registered_method=True)
260
261
def _defective_nested_exception_handler_multi_callable(channel):
    """Return a unary-unary callable for the UnaryUnaryNestedException method."""
    method = grpc._common.fully_qualified_method(
        _SERVICE_NAME, _UNARY_UNARY_NESTED_EXCEPTION
    )
    return channel.unary_unary(method, _registered_method=True)
269
270
class InvocationDefectsTest(unittest.TestCase):
    """Checks that defective user code is reported to the caller as UNKNOWN.

    Four tests pass a broken request iterable or iterator to a streaming
    call; two invoke methods whose registered handlers fail. In every case
    the RPC must raise ``grpc.RpcError`` with ``StatusCode.UNKNOWN``.
    """

    def setUp(self):
        """Start a server on an ephemeral port and open a channel to it."""
        self._control = test_control.PauseFailControl()
        self._handler = _Handler(self._control)

        self._server = test_common.test_server()
        # Port 0 requests any free port; the bound port is returned.
        bound_port = self._server.add_insecure_port("[::]:0")
        method_handlers = get_method_handlers(self._handler)
        self._server.add_registered_method_handlers(
            _SERVICE_NAME, method_handlers
        )
        self._server.start()

        self._channel = grpc.insecure_channel("localhost:%d" % bound_port)

    def tearDown(self):
        """Stop the server with zero grace, then close the channel."""
        self._server.stop(0)
        self._channel.close()

    def _assert_unknown_status(self, raised):
        """Assert the exception captured by ``assertRaises`` is UNKNOWN."""
        self.assertIs(grpc.StatusCode.UNKNOWN, raised.exception.code())

    def testIterableStreamRequestBlockingUnaryResponse(self):
        """A non-iterable request object fails a blocking stream-unary call."""
        stream_unary = _stream_unary_multi_callable(self._channel)
        not_iterable = object()
        call_metadata = (
            ("test", "IterableStreamRequestBlockingUnaryResponse"),
        )

        with self.assertRaises(grpc.RpcError) as raised:
            stream_unary(not_iterable, metadata=call_metadata)

        self._assert_unknown_status(raised)

    def testIterableStreamRequestFutureUnaryResponse(self):
        """A non-iterable request object fails a future stream-unary call."""
        stream_unary = _stream_unary_multi_callable(self._channel)
        not_iterable = object()
        call_metadata = (("test", "IterableStreamRequestFutureUnaryResponse"),)
        pending = stream_unary.future(not_iterable, metadata=call_metadata)

        with self.assertRaises(grpc.RpcError) as raised:
            pending.result()

        self._assert_unknown_status(raised)

    def testIterableStreamRequestStreamResponse(self):
        """A non-iterable request object fails a stream-stream call."""
        stream_stream = _stream_stream_multi_callable(self._channel)
        not_iterable = object()
        call_metadata = (("test", "IterableStreamRequestStreamResponse"),)
        responses = stream_stream(not_iterable, metadata=call_metadata)

        with self.assertRaises(grpc.RpcError) as raised:
            next(responses)

        self._assert_unknown_status(raised)

    def testIteratorStreamRequestStreamResponse(self):
        """A request iterator that raises midway fails a stream-stream call."""
        good_requests = test_constants.STREAM_LENGTH // 2
        failing_requests = FailAfterFewIterationsCounter(
            good_requests, b"\x07\x08"
        )
        stream_stream = _stream_stream_multi_callable(self._channel)
        call_metadata = (("test", "IteratorStreamRequestStreamResponse"),)
        responses = stream_stream(failing_requests, metadata=call_metadata)

        with self.assertRaises(grpc.RpcError) as raised:
            # Ask for one response more than the iterator can supply.
            for _ in range(test_constants.STREAM_LENGTH // 2 + 1):
                next(responses)

        self._assert_unknown_status(raised)

    def testDefectiveGenericRpcHandlerUnaryResponse(self):
        """Calling the defective handler's method fails the RPC."""
        payload = b"\x07\x08"
        unary_unary = _defective_handler_multi_callable(self._channel)
        call_metadata = (("test", "DefectiveGenericRpcHandlerUnary"),)

        with self.assertRaises(grpc.RpcError) as raised:
            unary_unary(payload, metadata=call_metadata)

        self._assert_unknown_status(raised)

    def testNestedExceptionGenericRpcHandlerUnaryResponse(self):
        """Calling the nested-exception handler's method fails the RPC."""
        payload = b"\x07\x08"
        unary_unary = _defective_nested_exception_handler_multi_callable(
            self._channel
        )
        # NOTE(review): this label is the same one used by
        # testDefectiveGenericRpcHandlerUnaryResponse; confirm whether the
        # reuse is intentional.
        call_metadata = (("test", "DefectiveGenericRpcHandlerUnary"),)

        with self.assertRaises(grpc.RpcError) as raised:
            unary_unary(payload, metadata=call_metadata)

        self._assert_unknown_status(raised)
382
383
if __name__ == "__main__":
    # When run as a script: set up default logging configuration, then run
    # this module's tests with verbose (level 2) output.
    logging.basicConfig()
    unittest.main(verbosity=2)
387