• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Test of RPCs made against gRPC Python's application-layer API."""
15
16import itertools
17import threading
18import unittest
19from concurrent import futures
20
21import grpc
22from grpc.framework.foundation import logging_pool
23
24from tests.unit import test_common
25from tests.unit.framework.common import test_constants
26from tests.unit.framework.common import test_control
27
28_SERIALIZE_REQUEST = lambda bytestring: bytestring * 2
29_DESERIALIZE_REQUEST = lambda bytestring: bytestring[len(bytestring) // 2:]
30_SERIALIZE_RESPONSE = lambda bytestring: bytestring * 3
31_DESERIALIZE_RESPONSE = lambda bytestring: bytestring[:len(bytestring) // 3]
32
33_UNARY_UNARY = '/test/UnaryUnary'
34_UNARY_STREAM = '/test/UnaryStream'
35_STREAM_UNARY = '/test/StreamUnary'
36_STREAM_STREAM = '/test/StreamStream'
37
38
39class _Callback(object):
40
41    def __init__(self):
42        self._condition = threading.Condition()
43        self._value = None
44        self._called = False
45
46    def __call__(self, value):
47        with self._condition:
48            self._value = value
49            self._called = True
50            self._condition.notify_all()
51
52    def value(self):
53        with self._condition:
54            while not self._called:
55                self._condition.wait()
56            return self._value
57
58
59class _Handler(object):
60
61    def __init__(self, control):
62        self._control = control
63
64    def handle_unary_unary(self, request, servicer_context):
65        self._control.control()
66        if servicer_context is not None:
67            servicer_context.set_trailing_metadata(((
68                'testkey',
69                'testvalue',
70            ),))
71            # TODO(https://github.com/grpc/grpc/issues/8483): test the values
72            # returned by these methods rather than only "smoke" testing that
73            # the return after having been called.
74            servicer_context.is_active()
75            servicer_context.time_remaining()
76        return request
77
78    def handle_unary_stream(self, request, servicer_context):
79        for _ in range(test_constants.STREAM_LENGTH):
80            self._control.control()
81            yield request
82        self._control.control()
83        if servicer_context is not None:
84            servicer_context.set_trailing_metadata(((
85                'testkey',
86                'testvalue',
87            ),))
88
89    def handle_stream_unary(self, request_iterator, servicer_context):
90        if servicer_context is not None:
91            servicer_context.invocation_metadata()
92        self._control.control()
93        response_elements = []
94        for request in request_iterator:
95            self._control.control()
96            response_elements.append(request)
97        self._control.control()
98        if servicer_context is not None:
99            servicer_context.set_trailing_metadata(((
100                'testkey',
101                'testvalue',
102            ),))
103        return b''.join(response_elements)
104
105    def handle_stream_stream(self, request_iterator, servicer_context):
106        self._control.control()
107        if servicer_context is not None:
108            servicer_context.set_trailing_metadata(((
109                'testkey',
110                'testvalue',
111            ),))
112        for request in request_iterator:
113            self._control.control()
114            yield request
115        self._control.control()
116
117
class _MethodHandler(grpc.RpcMethodHandler):
    """Plain value object carrying the attributes of an RPC method handler.

    Every constructor argument is stored unchanged under the attribute of the
    same name.
    """

    def __init__(self, request_streaming, response_streaming,
                 request_deserializer, response_serializer, unary_unary,
                 unary_stream, stream_unary, stream_stream):
        # Arity flags.
        self.request_streaming, self.response_streaming = (request_streaming,
                                                           response_streaming)
        # Optional message (de)serializers.
        self.request_deserializer, self.response_serializer = (
            request_deserializer, response_serializer)
        # Behaviours, one per arity.
        self.unary_unary, self.unary_stream = unary_unary, unary_stream
        self.stream_unary, self.stream_stream = stream_unary, stream_stream
131
132
class _GenericHandler(grpc.GenericRpcHandler):
    """Maps the four test method names onto a ``_Handler``'s behaviours."""

    def __init__(self, handler):
        self._handler = handler

    def service(self, handler_call_details):
        """Return the ``_MethodHandler`` for the called method, if known.

        The unary-unary and stream-stream methods are served with no
        (de)serializers; the unary-stream and stream-unary methods use the
        module's request deserializer and response serializer.

        Returns:
          A ``_MethodHandler`` for one of the four test methods, or ``None``
          for any other method name.
        """
        method = handler_call_details.method
        if method == _UNARY_UNARY:
            return _MethodHandler(False, False, None, None,
                                  self._handler.handle_unary_unary, None, None,
                                  None)
        if method == _UNARY_STREAM:
            return _MethodHandler(False, True, _DESERIALIZE_REQUEST,
                                  _SERIALIZE_RESPONSE, None,
                                  self._handler.handle_unary_stream, None, None)
        if method == _STREAM_UNARY:
            return _MethodHandler(True, False, _DESERIALIZE_REQUEST,
                                  _SERIALIZE_RESPONSE, None, None,
                                  self._handler.handle_stream_unary, None)
        if method == _STREAM_STREAM:
            return _MethodHandler(True, True, None, None, None, None, None,
                                  self._handler.handle_stream_stream)
        return None
156
157
def _unary_unary_multi_callable(channel):
    """Return ``channel``'s unary-unary multi-callable for ``_UNARY_UNARY``.

    No request serializer or response deserializer is supplied.
    """
    method_name = _UNARY_UNARY
    return channel.unary_unary(method_name)
160
161
def _unary_stream_multi_callable(channel):
    """Return ``channel``'s unary-stream multi-callable for ``_UNARY_STREAM``.

    Requests go through ``_SERIALIZE_REQUEST`` and responses through
    ``_DESERIALIZE_RESPONSE``.
    """
    codecs = {
        'request_serializer': _SERIALIZE_REQUEST,
        'response_deserializer': _DESERIALIZE_RESPONSE,
    }
    return channel.unary_stream(_UNARY_STREAM, **codecs)
167
168
def _stream_unary_multi_callable(channel):
    """Return ``channel``'s stream-unary multi-callable for ``_STREAM_UNARY``.

    Requests go through ``_SERIALIZE_REQUEST`` and responses through
    ``_DESERIALIZE_RESPONSE``.
    """
    codecs = {
        'request_serializer': _SERIALIZE_REQUEST,
        'response_deserializer': _DESERIALIZE_RESPONSE,
    }
    return channel.stream_unary(_STREAM_UNARY, **codecs)
174
175
def _stream_stream_multi_callable(channel):
    """Return ``channel``'s stream-stream multi-callable for ``_STREAM_STREAM``.

    No request serializer or response deserializer is supplied.
    """
    method_name = _STREAM_STREAM
    return channel.stream_stream(method_name)
178
179
180class RPCTest(unittest.TestCase):
181
182    def setUp(self):
183        self._control = test_control.PauseFailControl()
184        self._handler = _Handler(self._control)
185
186        self._server = test_common.test_server()
187        port = self._server.add_insecure_port('[::]:0')
188        self._server.add_generic_rpc_handlers((_GenericHandler(self._handler),))
189        self._server.start()
190
191        self._channel = grpc.insecure_channel('localhost:%d' % port)
192
193    def tearDown(self):
194        self._server.stop(None)
195
196    def testUnrecognizedMethod(self):
197        request = b'abc'
198
199        with self.assertRaises(grpc.RpcError) as exception_context:
200            self._channel.unary_unary('NoSuchMethod')(request)
201
202        self.assertEqual(grpc.StatusCode.UNIMPLEMENTED,
203                         exception_context.exception.code())
204
205    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
206        request = b'\x07\x08'
207        expected_response = self._handler.handle_unary_unary(request, None)
208
209        multi_callable = _unary_unary_multi_callable(self._channel)
210        response = multi_callable(
211            request,
212            metadata=(('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),))
213
214        self.assertEqual(expected_response, response)
215
216    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
217        request = b'\x07\x08'
218        expected_response = self._handler.handle_unary_unary(request, None)
219
220        multi_callable = _unary_unary_multi_callable(self._channel)
221        response, call = multi_callable.with_call(
222            request,
223            metadata=(('test',
224                       'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),))
225
226        self.assertEqual(expected_response, response)
227        self.assertIs(grpc.StatusCode.OK, call.code())
228        self.assertEqual("", call.debug_error_string())
229
230    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
231        request = b'\x07\x08'
232        expected_response = self._handler.handle_unary_unary(request, None)
233
234        multi_callable = _unary_unary_multi_callable(self._channel)
235        response_future = multi_callable.future(
236            request,
237            metadata=(('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),))
238        response = response_future.result()
239
240        self.assertIsInstance(response_future, grpc.Future)
241        self.assertIsInstance(response_future, grpc.Call)
242        self.assertEqual(expected_response, response)
243        self.assertIsNone(response_future.exception())
244        self.assertIsNone(response_future.traceback())
245
246    def testSuccessfulUnaryRequestStreamResponse(self):
247        request = b'\x37\x58'
248        expected_responses = tuple(
249            self._handler.handle_unary_stream(request, None))
250
251        multi_callable = _unary_stream_multi_callable(self._channel)
252        response_iterator = multi_callable(
253            request,
254            metadata=(('test', 'SuccessfulUnaryRequestStreamResponse'),))
255        responses = tuple(response_iterator)
256
257        self.assertSequenceEqual(expected_responses, responses)
258
259    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
260        requests = tuple(
261            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
262        expected_response = self._handler.handle_stream_unary(
263            iter(requests), None)
264        request_iterator = iter(requests)
265
266        multi_callable = _stream_unary_multi_callable(self._channel)
267        response = multi_callable(
268            request_iterator,
269            metadata=(('test',
270                       'SuccessfulStreamRequestBlockingUnaryResponse'),))
271
272        self.assertEqual(expected_response, response)
273
274    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
275        requests = tuple(
276            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
277        expected_response = self._handler.handle_stream_unary(
278            iter(requests), None)
279        request_iterator = iter(requests)
280
281        multi_callable = _stream_unary_multi_callable(self._channel)
282        response, call = multi_callable.with_call(
283            request_iterator,
284            metadata=(
285                ('test',
286                 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),))
287
288        self.assertEqual(expected_response, response)
289        self.assertIs(grpc.StatusCode.OK, call.code())
290
291    def testSuccessfulStreamRequestFutureUnaryResponse(self):
292        requests = tuple(
293            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
294        expected_response = self._handler.handle_stream_unary(
295            iter(requests), None)
296        request_iterator = iter(requests)
297
298        multi_callable = _stream_unary_multi_callable(self._channel)
299        response_future = multi_callable.future(
300            request_iterator,
301            metadata=(('test', 'SuccessfulStreamRequestFutureUnaryResponse'),))
302        response = response_future.result()
303
304        self.assertEqual(expected_response, response)
305        self.assertIsNone(response_future.exception())
306        self.assertIsNone(response_future.traceback())
307
308    def testSuccessfulStreamRequestStreamResponse(self):
309        requests = tuple(
310            b'\x77\x58' for _ in range(test_constants.STREAM_LENGTH))
311        expected_responses = tuple(
312            self._handler.handle_stream_stream(iter(requests), None))
313        request_iterator = iter(requests)
314
315        multi_callable = _stream_stream_multi_callable(self._channel)
316        response_iterator = multi_callable(
317            request_iterator,
318            metadata=(('test', 'SuccessfulStreamRequestStreamResponse'),))
319        responses = tuple(response_iterator)
320
321        self.assertSequenceEqual(expected_responses, responses)
322
323    def testSequentialInvocations(self):
324        first_request = b'\x07\x08'
325        second_request = b'\x0809'
326        expected_first_response = self._handler.handle_unary_unary(
327            first_request, None)
328        expected_second_response = self._handler.handle_unary_unary(
329            second_request, None)
330
331        multi_callable = _unary_unary_multi_callable(self._channel)
332        first_response = multi_callable(
333            first_request, metadata=(('test', 'SequentialInvocations'),))
334        second_response = multi_callable(
335            second_request, metadata=(('test', 'SequentialInvocations'),))
336
337        self.assertEqual(expected_first_response, first_response)
338        self.assertEqual(expected_second_response, second_response)
339
340    def testConcurrentBlockingInvocations(self):
341        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
342        requests = tuple(
343            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
344        expected_response = self._handler.handle_stream_unary(
345            iter(requests), None)
346        expected_responses = [expected_response
347                             ] * test_constants.THREAD_CONCURRENCY
348        response_futures = [None] * test_constants.THREAD_CONCURRENCY
349
350        multi_callable = _stream_unary_multi_callable(self._channel)
351        for index in range(test_constants.THREAD_CONCURRENCY):
352            request_iterator = iter(requests)
353            response_future = pool.submit(
354                multi_callable,
355                request_iterator,
356                metadata=(('test', 'ConcurrentBlockingInvocations'),))
357            response_futures[index] = response_future
358        responses = tuple(
359            response_future.result() for response_future in response_futures)
360
361        pool.shutdown(wait=True)
362        self.assertSequenceEqual(expected_responses, responses)
363
364    def testConcurrentFutureInvocations(self):
365        requests = tuple(
366            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
367        expected_response = self._handler.handle_stream_unary(
368            iter(requests), None)
369        expected_responses = [expected_response
370                             ] * test_constants.THREAD_CONCURRENCY
371        response_futures = [None] * test_constants.THREAD_CONCURRENCY
372
373        multi_callable = _stream_unary_multi_callable(self._channel)
374        for index in range(test_constants.THREAD_CONCURRENCY):
375            request_iterator = iter(requests)
376            response_future = multi_callable.future(
377                request_iterator,
378                metadata=(('test', 'ConcurrentFutureInvocations'),))
379            response_futures[index] = response_future
380        responses = tuple(
381            response_future.result() for response_future in response_futures)
382
383        self.assertSequenceEqual(expected_responses, responses)
384
385    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
386        pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
387        request = b'\x67\x68'
388        expected_response = self._handler.handle_unary_unary(request, None)
389        response_futures = [None] * test_constants.THREAD_CONCURRENCY
390        lock = threading.Lock()
391        test_is_running_cell = [True]
392
393        def wrap_future(future):
394
395            def wrap():
396                try:
397                    return future.result()
398                except grpc.RpcError:
399                    with lock:
400                        if test_is_running_cell[0]:
401                            raise
402                    return None
403
404            return wrap
405
406        multi_callable = _unary_unary_multi_callable(self._channel)
407        for index in range(test_constants.THREAD_CONCURRENCY):
408            inner_response_future = multi_callable.future(
409                request,
410                metadata=(
411                    ('test',
412                     'WaitingForSomeButNotAllConcurrentFutureInvocations'),))
413            outer_response_future = pool.submit(
414                wrap_future(inner_response_future))
415            response_futures[index] = outer_response_future
416
417        some_completed_response_futures_iterator = itertools.islice(
418            futures.as_completed(response_futures),
419            test_constants.THREAD_CONCURRENCY // 2)
420        for response_future in some_completed_response_futures_iterator:
421            self.assertEqual(expected_response, response_future.result())
422        with lock:
423            test_is_running_cell[0] = False
424
425    def testConsumingOneStreamResponseUnaryRequest(self):
426        request = b'\x57\x38'
427
428        multi_callable = _unary_stream_multi_callable(self._channel)
429        response_iterator = multi_callable(
430            request,
431            metadata=(('test', 'ConsumingOneStreamResponseUnaryRequest'),))
432        next(response_iterator)
433
434    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
435        request = b'\x57\x38'
436
437        multi_callable = _unary_stream_multi_callable(self._channel)
438        response_iterator = multi_callable(
439            request,
440            metadata=(('test',
441                       'ConsumingSomeButNotAllStreamResponsesUnaryRequest'),))
442        for _ in range(test_constants.STREAM_LENGTH // 2):
443            next(response_iterator)
444
445    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
446        requests = tuple(
447            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
448        request_iterator = iter(requests)
449
450        multi_callable = _stream_stream_multi_callable(self._channel)
451        response_iterator = multi_callable(
452            request_iterator,
453            metadata=(('test',
454                       'ConsumingSomeButNotAllStreamResponsesStreamRequest'),))
455        for _ in range(test_constants.STREAM_LENGTH // 2):
456            next(response_iterator)
457
458    def testConsumingTooManyStreamResponsesStreamRequest(self):
459        requests = tuple(
460            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
461        request_iterator = iter(requests)
462
463        multi_callable = _stream_stream_multi_callable(self._channel)
464        response_iterator = multi_callable(
465            request_iterator,
466            metadata=(('test',
467                       'ConsumingTooManyStreamResponsesStreamRequest'),))
468        for _ in range(test_constants.STREAM_LENGTH):
469            next(response_iterator)
470        for _ in range(test_constants.STREAM_LENGTH):
471            with self.assertRaises(StopIteration):
472                next(response_iterator)
473
474        self.assertIsNotNone(response_iterator.initial_metadata())
475        self.assertIs(grpc.StatusCode.OK, response_iterator.code())
476        self.assertIsNotNone(response_iterator.details())
477        self.assertIsNotNone(response_iterator.trailing_metadata())
478
479    def testCancelledUnaryRequestUnaryResponse(self):
480        request = b'\x07\x17'
481
482        multi_callable = _unary_unary_multi_callable(self._channel)
483        with self._control.pause():
484            response_future = multi_callable.future(
485                request,
486                metadata=(('test', 'CancelledUnaryRequestUnaryResponse'),))
487            response_future.cancel()
488
489        self.assertTrue(response_future.cancelled())
490        with self.assertRaises(grpc.FutureCancelledError):
491            response_future.result()
492        with self.assertRaises(grpc.FutureCancelledError):
493            response_future.exception()
494        with self.assertRaises(grpc.FutureCancelledError):
495            response_future.traceback()
496        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
497
498    def testCancelledUnaryRequestStreamResponse(self):
499        request = b'\x07\x19'
500
501        multi_callable = _unary_stream_multi_callable(self._channel)
502        with self._control.pause():
503            response_iterator = multi_callable(
504                request,
505                metadata=(('test', 'CancelledUnaryRequestStreamResponse'),))
506            self._control.block_until_paused()
507            response_iterator.cancel()
508
509        with self.assertRaises(grpc.RpcError) as exception_context:
510            next(response_iterator)
511        self.assertIs(grpc.StatusCode.CANCELLED,
512                      exception_context.exception.code())
513        self.assertIsNotNone(response_iterator.initial_metadata())
514        self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
515        self.assertIsNotNone(response_iterator.details())
516        self.assertIsNotNone(response_iterator.trailing_metadata())
517
518    def testCancelledStreamRequestUnaryResponse(self):
519        requests = tuple(
520            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
521        request_iterator = iter(requests)
522
523        multi_callable = _stream_unary_multi_callable(self._channel)
524        with self._control.pause():
525            response_future = multi_callable.future(
526                request_iterator,
527                metadata=(('test', 'CancelledStreamRequestUnaryResponse'),))
528            self._control.block_until_paused()
529            response_future.cancel()
530
531        self.assertTrue(response_future.cancelled())
532        with self.assertRaises(grpc.FutureCancelledError):
533            response_future.result()
534        with self.assertRaises(grpc.FutureCancelledError):
535            response_future.exception()
536        with self.assertRaises(grpc.FutureCancelledError):
537            response_future.traceback()
538        self.assertIsNotNone(response_future.initial_metadata())
539        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
540        self.assertIsNotNone(response_future.details())
541        self.assertIsNotNone(response_future.trailing_metadata())
542
543    def testCancelledStreamRequestStreamResponse(self):
544        requests = tuple(
545            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
546        request_iterator = iter(requests)
547
548        multi_callable = _stream_stream_multi_callable(self._channel)
549        with self._control.pause():
550            response_iterator = multi_callable(
551                request_iterator,
552                metadata=(('test', 'CancelledStreamRequestStreamResponse'),))
553            response_iterator.cancel()
554
555        with self.assertRaises(grpc.RpcError):
556            next(response_iterator)
557        self.assertIsNotNone(response_iterator.initial_metadata())
558        self.assertIs(grpc.StatusCode.CANCELLED, response_iterator.code())
559        self.assertIsNotNone(response_iterator.details())
560        self.assertIsNotNone(response_iterator.trailing_metadata())
561
562    def testExpiredUnaryRequestBlockingUnaryResponse(self):
563        request = b'\x07\x17'
564
565        multi_callable = _unary_unary_multi_callable(self._channel)
566        with self._control.pause():
567            with self.assertRaises(grpc.RpcError) as exception_context:
568                multi_callable.with_call(
569                    request,
570                    timeout=test_constants.SHORT_TIMEOUT,
571                    metadata=(('test',
572                               'ExpiredUnaryRequestBlockingUnaryResponse'),))
573
574        self.assertIsInstance(exception_context.exception, grpc.Call)
575        self.assertIsNotNone(exception_context.exception.initial_metadata())
576        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
577                      exception_context.exception.code())
578        self.assertIsNotNone(exception_context.exception.details())
579        self.assertIsNotNone(exception_context.exception.trailing_metadata())
580
581    def testExpiredUnaryRequestFutureUnaryResponse(self):
582        request = b'\x07\x17'
583        callback = _Callback()
584
585        multi_callable = _unary_unary_multi_callable(self._channel)
586        with self._control.pause():
587            response_future = multi_callable.future(
588                request,
589                timeout=test_constants.SHORT_TIMEOUT,
590                metadata=(('test', 'ExpiredUnaryRequestFutureUnaryResponse'),))
591            response_future.add_done_callback(callback)
592            value_passed_to_callback = callback.value()
593
594        self.assertIs(response_future, value_passed_to_callback)
595        self.assertIsNotNone(response_future.initial_metadata())
596        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
597        self.assertIsNotNone(response_future.details())
598        self.assertIsNotNone(response_future.trailing_metadata())
599        with self.assertRaises(grpc.RpcError) as exception_context:
600            response_future.result()
601        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
602                      exception_context.exception.code())
603        self.assertIsInstance(response_future.exception(), grpc.RpcError)
604        self.assertIsNotNone(response_future.traceback())
605        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
606                      response_future.exception().code())
607
608    def testExpiredUnaryRequestStreamResponse(self):
609        request = b'\x07\x19'
610
611        multi_callable = _unary_stream_multi_callable(self._channel)
612        with self._control.pause():
613            with self.assertRaises(grpc.RpcError) as exception_context:
614                response_iterator = multi_callable(
615                    request,
616                    timeout=test_constants.SHORT_TIMEOUT,
617                    metadata=(('test', 'ExpiredUnaryRequestStreamResponse'),))
618                next(response_iterator)
619
620        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
621                      exception_context.exception.code())
622        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
623                      response_iterator.code())
624
625    def testExpiredStreamRequestBlockingUnaryResponse(self):
626        requests = tuple(
627            b'\x07\x08' for _ in range(test_constants.STREAM_LENGTH))
628        request_iterator = iter(requests)
629
630        multi_callable = _stream_unary_multi_callable(self._channel)
631        with self._control.pause():
632            with self.assertRaises(grpc.RpcError) as exception_context:
633                multi_callable(
634                    request_iterator,
635                    timeout=test_constants.SHORT_TIMEOUT,
636                    metadata=(('test',
637                               'ExpiredStreamRequestBlockingUnaryResponse'),))
638
639        self.assertIsInstance(exception_context.exception, grpc.RpcError)
640        self.assertIsInstance(exception_context.exception, grpc.Call)
641        self.assertIsNotNone(exception_context.exception.initial_metadata())
642        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
643                      exception_context.exception.code())
644        self.assertIsNotNone(exception_context.exception.details())
645        self.assertIsNotNone(exception_context.exception.trailing_metadata())
646
647    def testExpiredStreamRequestFutureUnaryResponse(self):
648        requests = tuple(
649            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
650        request_iterator = iter(requests)
651        callback = _Callback()
652
653        multi_callable = _stream_unary_multi_callable(self._channel)
654        with self._control.pause():
655            response_future = multi_callable.future(
656                request_iterator,
657                timeout=test_constants.SHORT_TIMEOUT,
658                metadata=(('test', 'ExpiredStreamRequestFutureUnaryResponse'),))
659            with self.assertRaises(grpc.FutureTimeoutError):
660                response_future.result(
661                    timeout=test_constants.SHORT_TIMEOUT / 2.0)
662            response_future.add_done_callback(callback)
663            value_passed_to_callback = callback.value()
664
665        with self.assertRaises(grpc.RpcError) as exception_context:
666            response_future.result()
667        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
668        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
669                      exception_context.exception.code())
670        self.assertIsInstance(response_future.exception(), grpc.RpcError)
671        self.assertIsNotNone(response_future.traceback())
672        self.assertIs(response_future, value_passed_to_callback)
673        self.assertIsNotNone(response_future.initial_metadata())
674        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
675        self.assertIsNotNone(response_future.details())
676        self.assertIsNotNone(response_future.trailing_metadata())
677
678    def testExpiredStreamRequestStreamResponse(self):
679        requests = tuple(
680            b'\x67\x18' for _ in range(test_constants.STREAM_LENGTH))
681        request_iterator = iter(requests)
682
683        multi_callable = _stream_stream_multi_callable(self._channel)
684        with self._control.pause():
685            with self.assertRaises(grpc.RpcError) as exception_context:
686                response_iterator = multi_callable(
687                    request_iterator,
688                    timeout=test_constants.SHORT_TIMEOUT,
689                    metadata=(('test', 'ExpiredStreamRequestStreamResponse'),))
690                next(response_iterator)
691
692        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
693                      exception_context.exception.code())
694        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
695                      response_iterator.code())
696
697    def testFailedUnaryRequestBlockingUnaryResponse(self):
698        request = b'\x37\x17'
699
700        multi_callable = _unary_unary_multi_callable(self._channel)
701        with self._control.fail():
702            with self.assertRaises(grpc.RpcError) as exception_context:
703                multi_callable.with_call(
704                    request,
705                    metadata=(('test',
706                               'FailedUnaryRequestBlockingUnaryResponse'),))
707
708        self.assertIs(grpc.StatusCode.UNKNOWN,
709                      exception_context.exception.code())
710        # sanity checks on to make sure returned string contains default members
711        # of the error
712        debug_error_string = exception_context.exception.debug_error_string()
713        self.assertIn("created", debug_error_string)
714        self.assertIn("description", debug_error_string)
715        self.assertIn("file", debug_error_string)
716        self.assertIn("file_line", debug_error_string)
717
718    def testFailedUnaryRequestFutureUnaryResponse(self):
719        request = b'\x37\x17'
720        callback = _Callback()
721
722        multi_callable = _unary_unary_multi_callable(self._channel)
723        with self._control.fail():
724            response_future = multi_callable.future(
725                request,
726                metadata=(('test', 'FailedUnaryRequestFutureUnaryResponse'),))
727            response_future.add_done_callback(callback)
728            value_passed_to_callback = callback.value()
729
730        self.assertIsInstance(response_future, grpc.Future)
731        self.assertIsInstance(response_future, grpc.Call)
732        with self.assertRaises(grpc.RpcError) as exception_context:
733            response_future.result()
734        self.assertIs(grpc.StatusCode.UNKNOWN,
735                      exception_context.exception.code())
736        self.assertIsInstance(response_future.exception(), grpc.RpcError)
737        self.assertIsNotNone(response_future.traceback())
738        self.assertIs(grpc.StatusCode.UNKNOWN,
739                      response_future.exception().code())
740        self.assertIs(response_future, value_passed_to_callback)
741
742    def testFailedUnaryRequestStreamResponse(self):
743        request = b'\x37\x17'
744
745        multi_callable = _unary_stream_multi_callable(self._channel)
746        with self.assertRaises(grpc.RpcError) as exception_context:
747            with self._control.fail():
748                response_iterator = multi_callable(
749                    request,
750                    metadata=(('test', 'FailedUnaryRequestStreamResponse'),))
751                next(response_iterator)
752
753        self.assertIs(grpc.StatusCode.UNKNOWN,
754                      exception_context.exception.code())
755
756    def testFailedStreamRequestBlockingUnaryResponse(self):
757        requests = tuple(
758            b'\x47\x58' for _ in range(test_constants.STREAM_LENGTH))
759        request_iterator = iter(requests)
760
761        multi_callable = _stream_unary_multi_callable(self._channel)
762        with self._control.fail():
763            with self.assertRaises(grpc.RpcError) as exception_context:
764                multi_callable(
765                    request_iterator,
766                    metadata=(('test',
767                               'FailedStreamRequestBlockingUnaryResponse'),))
768
769        self.assertIs(grpc.StatusCode.UNKNOWN,
770                      exception_context.exception.code())
771
772    def testFailedStreamRequestFutureUnaryResponse(self):
773        requests = tuple(
774            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
775        request_iterator = iter(requests)
776        callback = _Callback()
777
778        multi_callable = _stream_unary_multi_callable(self._channel)
779        with self._control.fail():
780            response_future = multi_callable.future(
781                request_iterator,
782                metadata=(('test', 'FailedStreamRequestFutureUnaryResponse'),))
783            response_future.add_done_callback(callback)
784            value_passed_to_callback = callback.value()
785
786        with self.assertRaises(grpc.RpcError) as exception_context:
787            response_future.result()
788        self.assertIs(grpc.StatusCode.UNKNOWN, response_future.code())
789        self.assertIs(grpc.StatusCode.UNKNOWN,
790                      exception_context.exception.code())
791        self.assertIsInstance(response_future.exception(), grpc.RpcError)
792        self.assertIsNotNone(response_future.traceback())
793        self.assertIs(response_future, value_passed_to_callback)
794
795    def testFailedStreamRequestStreamResponse(self):
796        requests = tuple(
797            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
798        request_iterator = iter(requests)
799
800        multi_callable = _stream_stream_multi_callable(self._channel)
801        with self._control.fail():
802            with self.assertRaises(grpc.RpcError) as exception_context:
803                response_iterator = multi_callable(
804                    request_iterator,
805                    metadata=(('test', 'FailedStreamRequestStreamResponse'),))
806                tuple(response_iterator)
807
808        self.assertIs(grpc.StatusCode.UNKNOWN,
809                      exception_context.exception.code())
810        self.assertIs(grpc.StatusCode.UNKNOWN, response_iterator.code())
811
812    def testIgnoredUnaryRequestFutureUnaryResponse(self):
813        request = b'\x37\x17'
814
815        multi_callable = _unary_unary_multi_callable(self._channel)
816        multi_callable.future(
817            request,
818            metadata=(('test', 'IgnoredUnaryRequestFutureUnaryResponse'),))
819
820    def testIgnoredUnaryRequestStreamResponse(self):
821        request = b'\x37\x17'
822
823        multi_callable = _unary_stream_multi_callable(self._channel)
824        multi_callable(
825            request, metadata=(('test', 'IgnoredUnaryRequestStreamResponse'),))
826
827    def testIgnoredStreamRequestFutureUnaryResponse(self):
828        requests = tuple(
829            b'\x07\x18' for _ in range(test_constants.STREAM_LENGTH))
830        request_iterator = iter(requests)
831
832        multi_callable = _stream_unary_multi_callable(self._channel)
833        multi_callable.future(
834            request_iterator,
835            metadata=(('test', 'IgnoredStreamRequestFutureUnaryResponse'),))
836
837    def testIgnoredStreamRequestStreamResponse(self):
838        requests = tuple(
839            b'\x67\x88' for _ in range(test_constants.STREAM_LENGTH))
840        request_iterator = iter(requests)
841
842        multi_callable = _stream_stream_multi_callable(self._channel)
843        multi_callable(
844            request_iterator,
845            metadata=(('test', 'IgnoredStreamRequestStreamResponse'),))
846
847
# When this module is executed directly as a script, run its tests through
# unittest with verbosity level 2.
if __name__ == '__main__':
    unittest.main(verbosity=2)
850