# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior of the timeout mechanism on client side."""

import asyncio
import logging
import platform
import random
import unittest
import datetime

import grpc
from grpc.experimental import aio

from tests_aio.unit._test_base import AioTestBase
from tests_aio.unit import _common

# Base delay, in seconds, that every "sleepy" handler below waits for.
_SLEEP_TIME_UNIT_S = datetime.timedelta(seconds=1).total_seconds()

# Client deadlines expressed relative to the handler delay: the long one
# outlasts a single handler sleep, the short one expires in the middle of it.
_LONG_TIMEOUT_S = 2 * _SLEEP_TIME_UNIT_S
_SHORT_TIMEOUT_S = 0.5 * _SLEEP_TIME_UNIT_S

_TEST_SLEEPY_UNARY_UNARY = '/test/Test/SleepyUnaryUnary'
_TEST_SLEEPY_UNARY_STREAM = '/test/Test/SleepyUnaryStream'
_TEST_SLEEPY_STREAM_UNARY = '/test/Test/SleepyStreamUnary'
_TEST_SLEEPY_STREAM_STREAM = '/test/Test/SleepyStreamStream'

_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'


async def _test_sleepy_unary_unary(unused_request, unused_context):
    """Sleep for one time unit, then answer with the canned response."""
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    return _RESPONSE


async def _test_sleepy_unary_stream(unused_request, unused_context):
    """Yield two canned responses with a one-unit sleep between them."""
    yield _RESPONSE
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    yield _RESPONSE


async def _test_sleepy_stream_unary(unused_request_iterator, context):
    """Read two requests, sleeping one unit in between, then respond once."""
    first_request = await context.read()
    assert _REQUEST == first_request
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    second_request = await context.read()
    assert _REQUEST == second_request
    return _RESPONSE


async def _test_sleepy_stream_stream(unused_request_iterator, context):
    """Read one request, sleep one unit, then write a single response."""
    incoming = await context.read()
    assert _REQUEST == incoming
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    await context.write(_RESPONSE)


# Maps each fully-qualified method path to the handler that serves it.
_ROUTING_TABLE = {
    _TEST_SLEEPY_UNARY_UNARY:
        grpc.unary_unary_rpc_method_handler(_test_sleepy_unary_unary),
    _TEST_SLEEPY_UNARY_STREAM:
        grpc.unary_stream_rpc_method_handler(_test_sleepy_unary_stream),
    _TEST_SLEEPY_STREAM_UNARY:
        grpc.stream_unary_rpc_method_handler(_test_sleepy_stream_unary),
    _TEST_SLEEPY_STREAM_STREAM:
        grpc.stream_stream_rpc_method_handler(_test_sleepy_stream_stream),
}


class _GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that dispatches through ``_ROUTING_TABLE``."""

    def service(self, handler_call_details):
        """Return the handler registered for the called method, or None."""
        requested_method = handler_call_details.method
        return _ROUTING_TABLE.get(requested_method)


async def _start_test_server():
    """Start an insecure aio server on an ephemeral port.

    Returns:
        A ``(address, server)`` tuple, where ``address`` is the
        ``localhost:<port>`` target a client can dial and ``server`` is the
        started server object.
    """
    server = aio.server()
    port = server.add_insecure_port('[::]:0')
    handlers = (_GenericHandler(),)
    server.add_generic_rpc_handlers(handlers)
    await server.start()
    address = f'localhost:{port}'
    return address, server


class TestTimeout(AioTestBase):
    """Checks client-side deadlines for all four RPC arities.

    Each arity has one test whose deadline is longer than the handler's
    sleep (expects OK) and one whose deadline is shorter (expects
    DEADLINE_EXCEEDED).
    """

    async def setUp(self):
        """Start the server and wait until the client channel is READY."""
        address, self._server = await _start_test_server()
        self._client = aio.insecure_channel(address)
        # A fresh channel starts out IDLE; the ``True`` argument is
        # ``try_to_connect``, which also kicks off the connection attempt.
        initial_state = self._client.get_state(True)
        self.assertEqual(grpc.ChannelConnectivity.IDLE, initial_state)
        await _common.block_until_certain_state(
            self._client, grpc.ChannelConnectivity.READY)

    async def tearDown(self):
        """Close the client channel, then stop the server without grace."""
        await self._client.close()
        await self._server.stop(None)

    async def test_unary_unary_success_with_timeout(self):
        """A unary-unary call completes when the deadline is long enough."""
        stub = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
        rpc = stub(_REQUEST, timeout=_LONG_TIMEOUT_S)

        response = await rpc
        self.assertEqual(_RESPONSE, response)
        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_unary_unary_deadline_exceeded(self):
        """A unary-unary call fails when the deadline is too short."""
        stub = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
        rpc = stub(_REQUEST, timeout=_SHORT_TIMEOUT_S)

        with self.assertRaises(aio.AioRpcError) as caught:
            await rpc

        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
                         caught.exception.code())

    async def test_unary_stream_success_with_timeout(self):
        """Both streamed responses arrive when the deadline is long enough."""
        stub = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
        rpc = stub(_REQUEST, timeout=_LONG_TIMEOUT_S)

        first_response = await rpc.read()
        self.assertEqual(_RESPONSE, first_response)
        second_response = await rpc.read()
        self.assertEqual(_RESPONSE, second_response)
        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_unary_stream_deadline_exceeded(self):
        """The second read fails once the short deadline has passed."""
        stub = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
        rpc = stub(_REQUEST, timeout=_SHORT_TIMEOUT_S)

        # The handler yields the first response before it sleeps.
        first_response = await rpc.read()
        self.assertEqual(_RESPONSE, first_response)

        with self.assertRaises(aio.AioRpcError) as caught:
            await rpc.read()

        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
                         caught.exception.code())

    async def test_stream_unary_success_with_timeout(self):
        """A stream-unary call ends OK when the deadline is long enough."""
        stub = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
        rpc = stub(timeout=_LONG_TIMEOUT_S)

        await rpc.write(_REQUEST)
        await rpc.write(_REQUEST)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_stream_unary_deadline_exceeded(self):
        """A stream-unary call fails when the deadline is too short."""
        stub = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
        rpc = stub(timeout=_SHORT_TIMEOUT_S)

        # Any of the writes or the final await may be the one that raises.
        with self.assertRaises(aio.AioRpcError) as caught:
            await rpc.write(_REQUEST)
            await rpc.write(_REQUEST)
            await rpc

        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
                         caught.exception.code())

    async def test_stream_stream_success_with_timeout(self):
        """A stream-stream exchange ends OK when the deadline is long enough."""
        stub = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
        rpc = stub(timeout=_LONG_TIMEOUT_S)

        await rpc.write(_REQUEST)
        response = await rpc.read()
        self.assertEqual(_RESPONSE, response)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_stream_stream_deadline_exceeded(self):
        """A stream-stream exchange fails when the deadline is too short."""
        stub = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
        rpc = stub(timeout=_SHORT_TIMEOUT_S)

        with self.assertRaises(aio.AioRpcError) as caught:
            await rpc.write(_REQUEST)
            await rpc.read()

        self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED,
                         caught.exception.code())


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)