• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14"""Test of RPCs made against gRPC Python's application-layer API."""
15
16import itertools
17import threading
18import unittest
19import logging
20from concurrent import futures
21
22import grpc
23from grpc.framework.foundation import logging_pool
24
25from tests.unit._rpc_test_helpers import (
26    TIMEOUT_SHORT, Callback, unary_unary_multi_callable,
27    unary_stream_multi_callable, unary_stream_non_blocking_multi_callable,
28    stream_unary_multi_callable, stream_stream_multi_callable,
29    stream_stream_non_blocking_multi_callable, BaseRPCTest)
30from tests.unit.framework.common import test_constants
31
32
class RPCPart2Test(BaseRPCTest, unittest.TestCase):
    """RPC tests covering success, concurrency, cancellation and expiry.

    The fixture attributes used below (``self._channel``, ``self._handler``,
    ``self._control``, ``self._thread_pool``) and the ``_consume_*``,
    ``_cancelled_*`` and ``_expired_*`` helper methods are not defined in
    this class; they are inherited from ``BaseRPCTest``.
    """

    def testDefaultThreadPoolIsUsed(self):
        """Blocking unary-stream call leaves ``self._thread_pool`` unused."""
        self._consume_one_stream_response_unary_request(
            unary_stream_multi_callable(self._channel))
        self.assertFalse(self._thread_pool.was_used())

    def testExperimentalThreadPoolIsUsed(self):
        """Non-blocking unary-stream call marks ``self._thread_pool`` used."""
        self._consume_one_stream_response_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel))
        self.assertTrue(self._thread_pool.was_used())

    def testUnrecognizedMethod(self):
        """Calling an unregistered method fails with UNIMPLEMENTED."""
        with self.assertRaises(grpc.RpcError) as exception_context:
            self._channel.unary_unary('NoSuchMethod')(b'abc')

        self.assertEqual(grpc.StatusCode.UNIMPLEMENTED,
                         exception_context.exception.code())

    def testSuccessfulUnaryRequestBlockingUnaryResponse(self):
        """Blocking unary-unary call returns what the handler computes."""
        request = b'\x07\x08'
        expected_response = self._handler.handle_unary_unary(request, None)
        metadata = (('test', 'SuccessfulUnaryRequestBlockingUnaryResponse'),)

        response = unary_unary_multi_callable(self._channel)(
            request, metadata=metadata)

        self.assertEqual(expected_response, response)

    def testSuccessfulUnaryRequestBlockingUnaryResponseWithCall(self):
        """``with_call`` returns the response plus a call with status OK."""
        request = b'\x07\x08'
        expected_response = self._handler.handle_unary_unary(request, None)
        metadata = (
            ('test', 'SuccessfulUnaryRequestBlockingUnaryResponseWithCall'),)

        response, call = unary_unary_multi_callable(self._channel).with_call(
            request, metadata=metadata)

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())
        self.assertEqual('', call.debug_error_string())

    def testSuccessfulUnaryRequestFutureUnaryResponse(self):
        """Future unary-unary call resolves to the handler's response."""
        request = b'\x07\x08'
        expected_response = self._handler.handle_unary_unary(request, None)
        metadata = (('test', 'SuccessfulUnaryRequestFutureUnaryResponse'),)

        response_future = unary_unary_multi_callable(self._channel).future(
            request, metadata=metadata)
        response = response_future.result()

        # The returned object must satisfy both the Future and Call interfaces.
        self.assertIsInstance(response_future, grpc.Future)
        self.assertIsInstance(response_future, grpc.Call)
        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulUnaryRequestStreamResponse(self):
        """Unary-stream call yields the same sequence as the handler."""
        request = b'\x37\x58'
        expected_responses = tuple(
            self._handler.handle_unary_stream(request, None))
        metadata = (('test', 'SuccessfulUnaryRequestStreamResponse'),)

        response_iterator = unary_stream_multi_callable(self._channel)(
            request, metadata=metadata)
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSuccessfulStreamRequestBlockingUnaryResponse(self):
        """Blocking stream-unary call returns what the handler computes."""
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        metadata = (('test', 'SuccessfulStreamRequestBlockingUnaryResponse'),)

        response = stream_unary_multi_callable(self._channel)(
            iter(requests), metadata=metadata)

        self.assertEqual(expected_response, response)

    def testSuccessfulStreamRequestBlockingUnaryResponseWithCall(self):
        """Stream-unary ``with_call`` returns the response and status OK."""
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        metadata = (
            ('test', 'SuccessfulStreamRequestBlockingUnaryResponseWithCall'),)

        response, call = stream_unary_multi_callable(self._channel).with_call(
            iter(requests), metadata=metadata)

        self.assertEqual(expected_response, response)
        self.assertIs(grpc.StatusCode.OK, call.code())

    def testSuccessfulStreamRequestFutureUnaryResponse(self):
        """Future stream-unary call resolves to the handler's response."""
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        metadata = (('test', 'SuccessfulStreamRequestFutureUnaryResponse'),)

        response_future = stream_unary_multi_callable(self._channel).future(
            iter(requests), metadata=metadata)
        response = response_future.result()

        self.assertEqual(expected_response, response)
        self.assertIsNone(response_future.exception())
        self.assertIsNone(response_future.traceback())

    def testSuccessfulStreamRequestStreamResponse(self):
        """Stream-stream call yields the same sequence as the handler."""
        requests = (b'\x77\x58',) * test_constants.STREAM_LENGTH
        expected_responses = tuple(
            self._handler.handle_stream_stream(iter(requests), None))
        metadata = (('test', 'SuccessfulStreamRequestStreamResponse'),)

        response_iterator = stream_stream_multi_callable(self._channel)(
            iter(requests), metadata=metadata)
        responses = tuple(response_iterator)

        self.assertSequenceEqual(expected_responses, responses)

    def testSequentialInvocations(self):
        """One multi-callable can be invoked twice in a row."""
        first_request = b'\x07\x08'
        # NOTE(review): this literal is three bytes (0x08, '0', '9'); it may
        # have been meant as b'\x08\x09'. Kept as-is: the test only needs a
        # payload to round-trip.
        second_request = b'\x0809'
        expected_first_response = self._handler.handle_unary_unary(
            first_request, None)
        expected_second_response = self._handler.handle_unary_unary(
            second_request, None)
        metadata = (('test', 'SequentialInvocations'),)

        multi_callable = unary_unary_multi_callable(self._channel)
        first_response = multi_callable(first_request, metadata=metadata)
        second_response = multi_callable(second_request, metadata=metadata)

        self.assertEqual(expected_first_response, first_response)
        self.assertEqual(expected_second_response, second_response)

    def testConcurrentBlockingInvocations(self):
        """Blocking stream-unary calls issued from a thread pool all succeed."""
        concurrency = test_constants.THREAD_CONCURRENCY
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        expected_responses = [expected_response] * concurrency
        metadata = (('test', 'ConcurrentBlockingInvocations'),)

        multi_callable = stream_unary_multi_callable(self._channel)
        pool = logging_pool.pool(concurrency)
        try:
            # Each submission gets its own iterator; iterators are single-use.
            response_futures = [
                pool.submit(multi_callable, iter(requests), metadata=metadata)
                for _ in range(concurrency)
            ]
            responses = tuple(
                response_future.result()
                for response_future in response_futures)
        finally:
            # Release the worker threads even when an invocation raised.
            pool.shutdown(wait=True)

        self.assertSequenceEqual(expected_responses, responses)

    def testConcurrentFutureInvocations(self):
        """Several in-flight stream-unary futures all resolve correctly."""
        concurrency = test_constants.THREAD_CONCURRENCY
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        expected_response = self._handler.handle_stream_unary(
            iter(requests), None)
        expected_responses = [expected_response] * concurrency
        metadata = (('test', 'ConcurrentFutureInvocations'),)

        multi_callable = stream_unary_multi_callable(self._channel)
        # Start every RPC before waiting on any of them.
        response_futures = [
            multi_callable.future(iter(requests), metadata=metadata)
            for _ in range(concurrency)
        ]
        responses = tuple(
            response_future.result() for response_future in response_futures)

        self.assertSequenceEqual(expected_responses, responses)

    def testWaitingForSomeButNotAllConcurrentFutureInvocations(self):
        """Only the first half of the completed futures is checked.

        The remaining RPCs are deliberately left unconsumed when the test
        body finishes. Once the "running" flag is cleared, an ``RpcError``
        raised by one of them is swallowed rather than propagated.
        """
        concurrency = test_constants.THREAD_CONCURRENCY
        request = b'\x67\x68'
        expected_response = self._handler.handle_unary_unary(request, None)
        metadata = (
            ('test', 'WaitingForSomeButNotAllConcurrentFutureInvocations'),)
        lock = threading.Lock()
        # Single-element list so the nested closure can see later updates.
        test_is_running_cell = [True]

        def wrap_future(future):
            """Return a zero-argument callable that waits on ``future``."""

            def wrap():
                try:
                    return future.result()
                except grpc.RpcError:
                    with lock:
                        # Failures only matter while the test body is active.
                        if test_is_running_cell[0]:
                            raise
                    return None

            return wrap

        multi_callable = unary_unary_multi_callable(self._channel)
        pool = logging_pool.pool(concurrency)
        try:
            response_futures = [
                pool.submit(
                    wrap_future(
                        multi_callable.future(request, metadata=metadata)))
                for _ in range(concurrency)
            ]

            some_completed_response_futures_iterator = itertools.islice(
                futures.as_completed(response_futures), concurrency // 2)
            for response_future in some_completed_response_futures_iterator:
                self.assertEqual(expected_response, response_future.result())
        finally:
            with lock:
                test_is_running_cell[0] = False
            # Do not wait: the whole point is that some RPCs stay unconsumed.
            # Shutting down still lets idle workers exit instead of leaking.
            pool.shutdown(wait=False)

    def testConsumingOneStreamResponseUnaryRequest(self):
        """Consume one response of a blocking unary-stream call."""
        self._consume_one_stream_response_unary_request(
            unary_stream_multi_callable(self._channel))

    def testConsumingOneStreamResponseUnaryRequestNonBlocking(self):
        """Consume one response of a non-blocking unary-stream call."""
        self._consume_one_stream_response_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel))

    def testConsumingSomeButNotAllStreamResponsesUnaryRequest(self):
        """Consume part of the responses of a blocking unary-stream call."""
        self._consume_some_but_not_all_stream_responses_unary_request(
            unary_stream_multi_callable(self._channel))

    def testConsumingSomeButNotAllStreamResponsesUnaryRequestNonBlocking(self):
        """Consume part of the responses of a non-blocking unary-stream call."""
        self._consume_some_but_not_all_stream_responses_unary_request(
            unary_stream_non_blocking_multi_callable(self._channel))

    def testConsumingSomeButNotAllStreamResponsesStreamRequest(self):
        """Consume part of the responses of a blocking stream-stream call."""
        self._consume_some_but_not_all_stream_responses_stream_request(
            stream_stream_multi_callable(self._channel))

    def testConsumingSomeButNotAllStreamResponsesStreamRequestNonBlocking(self):
        """Consume part of the responses of a non-blocking stream-stream call."""
        self._consume_some_but_not_all_stream_responses_stream_request(
            stream_stream_non_blocking_multi_callable(self._channel))

    def testConsumingTooManyStreamResponsesStreamRequest(self):
        """Over-consume the responses of a blocking stream-stream call."""
        self._consume_too_many_stream_responses_stream_request(
            stream_stream_multi_callable(self._channel))

    def testConsumingTooManyStreamResponsesStreamRequestNonBlocking(self):
        """Over-consume the responses of a non-blocking stream-stream call."""
        self._consume_too_many_stream_responses_stream_request(
            stream_stream_non_blocking_multi_callable(self._channel))

    def testCancelledUnaryRequestUnaryResponse(self):
        """Cancelling a paused unary-unary future reports CANCELLED."""
        request = b'\x07\x17'
        metadata = (('test', 'CancelledUnaryRequestUnaryResponse'),)

        multi_callable = unary_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(request, metadata=metadata)
            response_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()

    def testCancelledUnaryRequestStreamResponse(self):
        """Cancel a blocking unary-stream call."""
        self._cancelled_unary_request_stream_response(
            unary_stream_multi_callable(self._channel))

    def testCancelledUnaryRequestStreamResponseNonBlocking(self):
        """Cancel a non-blocking unary-stream call."""
        self._cancelled_unary_request_stream_response(
            unary_stream_non_blocking_multi_callable(self._channel))

    def testCancelledStreamRequestUnaryResponse(self):
        """Cancelling a paused stream-unary future reports CANCELLED."""
        requests = (b'\x07\x08',) * test_constants.STREAM_LENGTH
        metadata = (('test', 'CancelledStreamRequestUnaryResponse'),)

        multi_callable = stream_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(iter(requests),
                                                    metadata=metadata)
            # Cancel only after the control object reports it is paused.
            self._control.block_until_paused()
            response_future.cancel()

        self.assertIs(grpc.StatusCode.CANCELLED, response_future.code())
        self.assertTrue(response_future.cancelled())
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.result()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.exception()
        with self.assertRaises(grpc.FutureCancelledError):
            response_future.traceback()
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())

    def testCancelledStreamRequestStreamResponse(self):
        """Cancel a blocking stream-stream call."""
        self._cancelled_stream_request_stream_response(
            stream_stream_multi_callable(self._channel))

    def testCancelledStreamRequestStreamResponseNonBlocking(self):
        """Cancel a non-blocking stream-stream call."""
        self._cancelled_stream_request_stream_response(
            stream_stream_non_blocking_multi_callable(self._channel))

    def testExpiredUnaryRequestBlockingUnaryResponse(self):
        """A paused blocking unary-unary call fails with DEADLINE_EXCEEDED."""
        request = b'\x07\x17'
        metadata = (('test', 'ExpiredUnaryRequestBlockingUnaryResponse'),)

        multi_callable = unary_unary_multi_callable(self._channel)
        with self._control.pause():
            with self.assertRaises(grpc.RpcError) as exception_context:
                multi_callable.with_call(request,
                                         timeout=TIMEOUT_SHORT,
                                         metadata=metadata)

        rpc_error = exception_context.exception
        self.assertIsInstance(rpc_error, grpc.Call)
        self.assertIsNotNone(rpc_error.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
        self.assertIsNotNone(rpc_error.details())
        self.assertIsNotNone(rpc_error.trailing_metadata())

    def testExpiredUnaryRequestFutureUnaryResponse(self):
        """A paused unary-unary future fails with DEADLINE_EXCEEDED."""
        request = b'\x07\x17'
        metadata = (('test', 'ExpiredUnaryRequestFutureUnaryResponse'),)
        callback = Callback()

        multi_callable = unary_unary_multi_callable(self._channel)
        with self._control.pause():
            response_future = multi_callable.future(request,
                                                    timeout=TIMEOUT_SHORT,
                                                    metadata=metadata)
            response_future.add_done_callback(callback)
            # Read the callback's value while the control is still paused.
            value_passed_to_callback = callback.value()

        self.assertIs(response_future, value_passed_to_callback)
        self.assertIsNotNone(response_future.initial_metadata())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED, response_future.code())
        self.assertIsNotNone(response_future.details())
        self.assertIsNotNone(response_future.trailing_metadata())
        with self.assertRaises(grpc.RpcError) as exception_context:
            response_future.result()
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      exception_context.exception.code())
        self.assertIsInstance(response_future.exception(), grpc.RpcError)
        self.assertIsNotNone(response_future.traceback())
        self.assertIs(grpc.StatusCode.DEADLINE_EXCEEDED,
                      response_future.exception().code())

    def testExpiredUnaryRequestStreamResponse(self):
        """Let a blocking unary-stream call expire."""
        self._expired_unary_request_stream_response(
            unary_stream_multi_callable(self._channel))

    def testExpiredUnaryRequestStreamResponseNonBlocking(self):
        """Let a non-blocking unary-stream call expire."""
        self._expired_unary_request_stream_response(
            unary_stream_non_blocking_multi_callable(self._channel))
422
423
if __name__ == "__main__":
    # Set up default logging handlers, then hand control to the unittest
    # runner with verbose per-test output.
    logging.basicConfig()
    unittest.main(verbosity=2)
427