1# Copyright 2019 The gRPC Authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests behavior of the connectivity state.""" 15 16import asyncio 17import logging 18import platform 19import threading 20import time 21import unittest 22 23import grpc 24from grpc.experimental import aio 25 26from tests.unit.framework.common import test_constants 27from tests_aio.unit import _common 28from tests_aio.unit._constants import UNREACHABLE_TARGET 29from tests_aio.unit._test_base import AioTestBase 30from tests_aio.unit._test_server import start_test_server 31 32 33class TestConnectivityState(AioTestBase): 34 async def setUp(self): 35 self._server_address, self._server = await start_test_server() 36 37 async def tearDown(self): 38 await self._server.stop(None) 39 40 @unittest.skipIf( 41 "aarch64" in platform.machine(), 42 "The transient failure propagation is slower on aarch64", 43 ) 44 async def test_unavailable_backend(self): 45 async with aio.insecure_channel(UNREACHABLE_TARGET) as channel: 46 self.assertEqual( 47 grpc.ChannelConnectivity.IDLE, channel.get_state(False) 48 ) 49 self.assertEqual( 50 grpc.ChannelConnectivity.IDLE, channel.get_state(True) 51 ) 52 53 # Should not time out 54 await asyncio.wait_for( 55 _common.block_until_certain_state( 56 channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE 57 ), 58 test_constants.SHORT_TIMEOUT * 2, 59 ) 60 61 async def test_normal_backend(self): 62 async with aio.insecure_channel(self._server_address) as channel: 63 current_state = channel.get_state(True) 64 self.assertEqual(grpc.ChannelConnectivity.IDLE, current_state) 65 66 # Should not time out 67 await asyncio.wait_for( 68 _common.block_until_certain_state( 69 channel, grpc.ChannelConnectivity.READY 70 ), 71 test_constants.SHORT_TIMEOUT, 72 ) 73 74 async def test_timeout(self): 75 async with aio.insecure_channel(self._server_address) as channel: 76 self.assertEqual( 77 grpc.ChannelConnectivity.IDLE, channel.get_state(False) 78 ) 79 80 # If timed out, the function should return None. 81 with self.assertRaises(asyncio.TimeoutError): 82 await asyncio.wait_for( 83 _common.block_until_certain_state( 84 channel, grpc.ChannelConnectivity.READY 85 ), 86 test_constants.SHORT_TIMEOUT, 87 ) 88 89 async def test_shutdown(self): 90 channel = aio.insecure_channel(self._server_address) 91 92 self.assertEqual( 93 grpc.ChannelConnectivity.IDLE, channel.get_state(False) 94 ) 95 96 # Waiting for changes in a separate coroutine 97 wait_started = asyncio.Event() 98 99 async def a_pending_wait(): 100 wait_started.set() 101 await channel.wait_for_state_change(grpc.ChannelConnectivity.IDLE) 102 103 pending_task = self.loop.create_task(a_pending_wait()) 104 await wait_started.wait() 105 106 await channel.close() 107 108 self.assertEqual( 109 grpc.ChannelConnectivity.SHUTDOWN, channel.get_state(True) 110 ) 111 112 self.assertEqual( 113 grpc.ChannelConnectivity.SHUTDOWN, channel.get_state(False) 114 ) 115 116 # Make sure there isn't any exception in the task 117 await pending_task 118 119 # It can raise exceptions since it is a usage error, but it should not 120 # segfault or abort. 121 with self.assertRaises(aio.UsageError): 122 await channel.wait_for_state_change( 123 grpc.ChannelConnectivity.SHUTDOWN 124 ) 125 126 127if __name__ == "__main__": 128 logging.basicConfig(level=logging.DEBUG) 129 unittest.main(verbosity=2) 130