• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests behavior of the timeout mechanism on client side."""
15
16import asyncio
17import logging
18import platform
19import random
20import unittest
21import datetime
22
23import grpc
24from grpc.experimental import aio
25
26from tests_aio.unit._test_base import AioTestBase
27from tests_aio.unit import _common
28
29_SLEEP_TIME_UNIT_S = datetime.timedelta(seconds=1).total_seconds()
30
31_TEST_SLEEPY_UNARY_UNARY = '/test/Test/SleepyUnaryUnary'
32_TEST_SLEEPY_UNARY_STREAM = '/test/Test/SleepyUnaryStream'
33_TEST_SLEEPY_STREAM_UNARY = '/test/Test/SleepyStreamUnary'
34_TEST_SLEEPY_STREAM_STREAM = '/test/Test/SleepyStreamStream'
35
36_REQUEST = b'\x00\x00\x00'
37_RESPONSE = b'\x01\x01\x01'
38
39
40async def _test_sleepy_unary_unary(unused_request, unused_context):
41    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
42    return _RESPONSE
43
44
45async def _test_sleepy_unary_stream(unused_request, unused_context):
46    yield _RESPONSE
47    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
48    yield _RESPONSE
49
50
51async def _test_sleepy_stream_unary(unused_request_iterator, context):
52    assert _REQUEST == await context.read()
53    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
54    assert _REQUEST == await context.read()
55    return _RESPONSE
56
57
58async def _test_sleepy_stream_stream(unused_request_iterator, context):
59    assert _REQUEST == await context.read()
60    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
61    await context.write(_RESPONSE)
62
63
64_ROUTING_TABLE = {
65    _TEST_SLEEPY_UNARY_UNARY:
66        grpc.unary_unary_rpc_method_handler(_test_sleepy_unary_unary),
67    _TEST_SLEEPY_UNARY_STREAM:
68        grpc.unary_stream_rpc_method_handler(_test_sleepy_unary_stream),
69    _TEST_SLEEPY_STREAM_UNARY:
70        grpc.stream_unary_rpc_method_handler(_test_sleepy_stream_unary),
71    _TEST_SLEEPY_STREAM_STREAM:
72        grpc.stream_stream_rpc_method_handler(_test_sleepy_stream_stream)
73}
74
75
76class _GenericHandler(grpc.GenericRpcHandler):
77
78    def service(self, handler_call_details):
79        return _ROUTING_TABLE.get(handler_call_details.method)
80
81
82async def _start_test_server():
83    server = aio.server()
84    port = server.add_insecure_port('[::]:0')
85    server.add_generic_rpc_handlers((_GenericHandler(),))
86    await server.start()
87    return f'localhost:{port}', server
88
89
90class TestTimeout(AioTestBase):
91
92    async def setUp(self):
93        address, self._server = await _start_test_server()
94        self._client = aio.insecure_channel(address)
95        self.assertEqual(grpc.ChannelConnectivity.IDLE,
96                         self._client.get_state(True))
97        await _common.block_until_certain_state(self._client,
98                                                grpc.ChannelConnectivity.READY)
99
100    async def tearDown(self):
101        await self._client.close()
102        await self._server.stop(None)
103
104    async def test_unary_unary_success_with_timeout(self):
105        multicallable = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
106        call = multicallable(_REQUEST, timeout=2 * _SLEEP_TIME_UNIT_S)
107        self.assertEqual(_RESPONSE, await call)
108        self.assertEqual(grpc.StatusCode.OK, await call.code())
109
110    async def test_unary_unary_deadline_exceeded(self):
111        multicallable = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
112        call = multicallable(_REQUEST, timeout=0.5 * _SLEEP_TIME_UNIT_S)
113
114        with self.assertRaises(aio.AioRpcError) as exception_context:
115            await call
116
117        rpc_error = exception_context.exception
118        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
119
120    async def test_unary_stream_success_with_timeout(self):
121        multicallable = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
122        call = multicallable(_REQUEST, timeout=2 * _SLEEP_TIME_UNIT_S)
123        self.assertEqual(_RESPONSE, await call.read())
124        self.assertEqual(_RESPONSE, await call.read())
125        self.assertEqual(grpc.StatusCode.OK, await call.code())
126
127    async def test_unary_stream_deadline_exceeded(self):
128        multicallable = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
129        call = multicallable(_REQUEST, timeout=0.5 * _SLEEP_TIME_UNIT_S)
130        self.assertEqual(_RESPONSE, await call.read())
131
132        with self.assertRaises(aio.AioRpcError) as exception_context:
133            await call.read()
134
135        rpc_error = exception_context.exception
136        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
137
138    async def test_stream_unary_success_with_timeout(self):
139        multicallable = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
140        call = multicallable(timeout=2 * _SLEEP_TIME_UNIT_S)
141        await call.write(_REQUEST)
142        await call.write(_REQUEST)
143        self.assertEqual(grpc.StatusCode.OK, await call.code())
144
145    async def test_stream_unary_deadline_exceeded(self):
146        multicallable = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
147        call = multicallable(timeout=0.5 * _SLEEP_TIME_UNIT_S)
148
149        with self.assertRaises(aio.AioRpcError) as exception_context:
150            await call.write(_REQUEST)
151            await call.write(_REQUEST)
152            await call
153
154        rpc_error = exception_context.exception
155        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
156
157    async def test_stream_stream_success_with_timeout(self):
158        multicallable = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
159        call = multicallable(timeout=2 * _SLEEP_TIME_UNIT_S)
160        await call.write(_REQUEST)
161        self.assertEqual(_RESPONSE, await call.read())
162        self.assertEqual(grpc.StatusCode.OK, await call.code())
163
164    async def test_stream_stream_deadline_exceeded(self):
165        multicallable = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
166        call = multicallable(timeout=0.5 * _SLEEP_TIME_UNIT_S)
167
168        with self.assertRaises(aio.AioRpcError) as exception_context:
169            await call.write(_REQUEST)
170            await call.read()
171
172        rpc_error = exception_context.exception
173        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())
174
175
176if __name__ == '__main__':
177    logging.basicConfig(level=logging.DEBUG)
178    unittest.main(verbosity=2)
179