# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior of the timeout mechanism on client side."""

import asyncio
import datetime
import logging
import platform
import random
import unittest

import grpc
from grpc.experimental import aio

from tests_aio.unit import _common
from tests_aio.unit._test_base import AioTestBase

# Base delay (one second, as a float) that every sleepy handler waits for.
_SLEEP_TIME_UNIT_S = datetime.timedelta(seconds=1).total_seconds()

# Client-side timeouts expressed relative to the handler delay: the generous
# one is twice the delay, the tight one is half of it.
_GENEROUS_TIMEOUT_S = 2 * _SLEEP_TIME_UNIT_S
_TIGHT_TIMEOUT_S = 0.5 * _SLEEP_TIME_UNIT_S

# Fully-qualified method names, one per RPC arity.
_TEST_SLEEPY_UNARY_UNARY = "/test/Test/SleepyUnaryUnary"
_TEST_SLEEPY_UNARY_STREAM = "/test/Test/SleepyUnaryStream"
_TEST_SLEEPY_STREAM_UNARY = "/test/Test/SleepyStreamUnary"
_TEST_SLEEPY_STREAM_STREAM = "/test/Test/SleepyStreamStream"

# Raw payloads exchanged between the test client and the handlers.
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x01\x01\x01"


async def _test_sleepy_unary_unary(unused_request, unused_context):
    """Unary-unary handler: wait one delay unit, then answer `_RESPONSE`."""
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    return _RESPONSE


async def _test_sleepy_unary_stream(unused_request, unused_context):
    """Unary-stream handler: one response now, a second after the delay."""
    # The first message is produced before sleeping.
    yield _RESPONSE
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    yield _RESPONSE


async def _test_sleepy_stream_unary(unused_request_iterator, context):
    """Stream-unary handler: read, wait one delay unit, read, then answer.

    Each message is pulled via `context.read()` and checked against
    `_REQUEST`; the request iterator argument is not used.
    """
    assert _REQUEST == await context.read()
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    assert _REQUEST == await context.read()
    return _RESPONSE


async def _test_sleepy_stream_stream(unused_request_iterator, context):
    """Stream-stream handler: read one request, wait, write one response."""
    assert _REQUEST == await context.read()
    await asyncio.sleep(_SLEEP_TIME_UNIT_S)
    await context.write(_RESPONSE)


# Maps each test method name to the RPC method handler that serves it.
_ROUTING_TABLE = {
    _TEST_SLEEPY_UNARY_UNARY: grpc.unary_unary_rpc_method_handler(
        _test_sleepy_unary_unary
    ),
    _TEST_SLEEPY_UNARY_STREAM: grpc.unary_stream_rpc_method_handler(
        _test_sleepy_unary_stream
    ),
    _TEST_SLEEPY_STREAM_UNARY: grpc.stream_unary_rpc_method_handler(
        _test_sleepy_stream_unary
    ),
    _TEST_SLEEPY_STREAM_STREAM: grpc.stream_stream_rpc_method_handler(
        _test_sleepy_stream_stream
    ),
}


class _GenericHandler(grpc.GenericRpcHandler):
    """Generic handler that dispatches through `_ROUTING_TABLE`."""

    def service(self, handler_call_details):
        """Return the handler registered for the called method, else None."""
        method_name = handler_call_details.method
        return _ROUTING_TABLE.get(method_name)


async def _start_test_server():
    """Start an insecure aio server exposing the sleepy handlers.

    Returns:
      A `(address, server)` tuple, where `address` is `localhost:<port>` for
      the port returned by `add_insecure_port("[::]:0")`.
    """
    test_server = aio.server()
    bound_port = test_server.add_insecure_port("[::]:0")
    test_server.add_generic_rpc_handlers((_GenericHandler(),))
    await test_server.start()
    address = f"localhost:{bound_port}"
    return address, test_server


class TestTimeout(AioTestBase):
    """Checks client timeouts against the sleepy handlers, per RPC arity.

    Every arity has a success case (generous timeout, status OK expected) and
    a failure case (tight timeout, DEADLINE_EXCEEDED expected).

    Test methods are annotated with `#` comments instead of docstrings so
    that verbose unittest output keeps showing the method names only.
    """

    async def setUp(self):
        """Start the server and wait until the client channel is READY."""
        server_address, server = await _start_test_server()
        self._server = server
        self._client = aio.insecure_channel(server_address)

        # Passing True to get_state is what kicks off the connection attempt
        # on the freshly created channel (assumed from the grpc API; confirm).
        initial_state = self._client.get_state(True)
        self.assertEqual(grpc.ChannelConnectivity.IDLE, initial_state)

        await _common.block_until_certain_state(
            self._client, grpc.ChannelConnectivity.READY
        )

    async def tearDown(self):
        """Close the client channel, then stop the server with no grace."""
        await self._client.close()
        await self._server.stop(None)

    async def test_unary_unary_success_with_timeout(self):
        # Generous timeout: the response arrives and the status is OK.
        stub = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
        rpc = stub(_REQUEST, timeout=_GENEROUS_TIMEOUT_S)

        response = await rpc
        self.assertEqual(_RESPONSE, response)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_unary_unary_deadline_exceeded(self):
        # Tight timeout: awaiting the call must raise DEADLINE_EXCEEDED.
        stub = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)
        rpc = stub(_REQUEST, timeout=_TIGHT_TIMEOUT_S)

        with self.assertRaises(aio.AioRpcError) as raised:
            await rpc

        self.assertEqual(
            grpc.StatusCode.DEADLINE_EXCEEDED, raised.exception.code()
        )

    async def test_unary_stream_success_with_timeout(self):
        # Generous timeout: both streamed responses arrive, status is OK.
        stub = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
        rpc = stub(_REQUEST, timeout=_GENEROUS_TIMEOUT_S)

        first_response = await rpc.read()
        self.assertEqual(_RESPONSE, first_response)
        second_response = await rpc.read()
        self.assertEqual(_RESPONSE, second_response)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_unary_stream_deadline_exceeded(self):
        # Tight timeout: the handler yields its first response before
        # sleeping, so the first read succeeds and only the second fails.
        stub = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)
        rpc = stub(_REQUEST, timeout=_TIGHT_TIMEOUT_S)

        first_response = await rpc.read()
        self.assertEqual(_RESPONSE, first_response)

        with self.assertRaises(aio.AioRpcError) as raised:
            await rpc.read()

        self.assertEqual(
            grpc.StatusCode.DEADLINE_EXCEEDED, raised.exception.code()
        )

    async def test_stream_unary_success_with_timeout(self):
        # Generous timeout: two requests are written and the status is OK.
        stub = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
        rpc = stub(timeout=_GENEROUS_TIMEOUT_S)

        await rpc.write(_REQUEST)
        await rpc.write(_REQUEST)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_stream_unary_deadline_exceeded(self):
        # Tight timeout: the writes and the final await all sit inside the
        # assertRaises block, so the error may surface from any of them.
        stub = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)
        rpc = stub(timeout=_TIGHT_TIMEOUT_S)

        with self.assertRaises(aio.AioRpcError) as raised:
            await rpc.write(_REQUEST)
            await rpc.write(_REQUEST)
            await rpc

        self.assertEqual(
            grpc.StatusCode.DEADLINE_EXCEEDED, raised.exception.code()
        )

    async def test_stream_stream_success_with_timeout(self):
        # Generous timeout: one request out, one response back, status OK.
        stub = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
        rpc = stub(timeout=_GENEROUS_TIMEOUT_S)

        await rpc.write(_REQUEST)
        response = await rpc.read()
        self.assertEqual(_RESPONSE, response)

        status = await rpc.code()
        self.assertEqual(grpc.StatusCode.OK, status)

    async def test_stream_stream_deadline_exceeded(self):
        # Tight timeout: the write or the read must raise DEADLINE_EXCEEDED.
        stub = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)
        rpc = stub(timeout=_TIGHT_TIMEOUT_S)

        with self.assertRaises(aio.AioRpcError) as raised:
            await rpc.write(_REQUEST)
            await rpc.read()

        self.assertEqual(
            grpc.StatusCode.DEADLINE_EXCEEDED, raised.exception.code()
        )


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)