# Copyright 2019 The gRPC Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests behavior of the connectivity state."""

import asyncio
import logging
import threading
import time
import unittest

import grpc
from grpc.experimental import aio

from tests.unit.framework.common import test_constants
from tests_aio.unit import _common
from tests_aio.unit._constants import UNREACHABLE_TARGET
from tests_aio.unit._test_base import AioTestBase
from tests_aio.unit._test_server import start_test_server


class TestConnectivityState(AioTestBase):
    """Checks the connectivity states reported by an AsyncIO channel.

    Every test gets a freshly started test server (see ``setUp``) which is
    stopped again in ``tearDown``.
    """

    async def setUp(self):
        # start_test_server() yields an (address, server) pair; keep both so
        # the tests can dial the address and tearDown can stop the server.
        address, server = await start_test_server()
        self._server_address = address
        self._server = server

    async def tearDown(self):
        # None is forwarded as the only argument of stop().
        # NOTE(review): in the gRPC server API this is the grace period, so
        # None presumably means "stop immediately" - confirm.
        server = self._server
        await server.stop(None)

    async def test_unavailable_backend(self):
        # A channel to an unreachable target reports IDLE on both state
        # queries and is then expected to reach TRANSIENT_FAILURE.
        async with aio.insecure_channel(UNREACHABLE_TARGET) as channel:
            # The boolean argument is the channel's "try to connect" flag
            # (per the gRPC AsyncIO Channel.get_state API). Query order
            # matters: first without, then with the flag set.
            for try_to_connect in (False, True):
                self.assertEqual(grpc.ChannelConnectivity.IDLE,
                                 channel.get_state(try_to_connect))

            # Should not time out
            reached_failure = _common.block_until_certain_state(
                channel, grpc.ChannelConnectivity.TRANSIENT_FAILURE)
            await asyncio.wait_for(reached_failure,
                                   test_constants.SHORT_TIMEOUT)

    async def test_normal_backend(self):
        # A channel to the running test server starts out IDLE and must
        # become READY within SHORT_TIMEOUT.
        async with aio.insecure_channel(self._server_address) as channel:
            self.assertEqual(grpc.ChannelConnectivity.IDLE,
                             channel.get_state(True))

            # Should not time out
            reached_ready = _common.block_until_certain_state(
                channel, grpc.ChannelConnectivity.READY)
            await asyncio.wait_for(reached_ready,
                                   test_constants.SHORT_TIMEOUT)

    async def test_timeout(self):
        # Waiting for READY is expected to exceed SHORT_TIMEOUT here.
        # NOTE(review): presumably because the state is only queried with
        # get_state(False), so nothing asks the channel to connect - confirm
        # against _common.block_until_certain_state.
        async with aio.insecure_channel(self._server_address) as channel:
            initial_state = channel.get_state(False)
            self.assertEqual(grpc.ChannelConnectivity.IDLE, initial_state)

            # When the wait exceeds the timeout, asyncio.wait_for raises
            # asyncio.TimeoutError; that is the outcome asserted here.
            with self.assertRaises(asyncio.TimeoutError):
                never_ready = _common.block_until_certain_state(
                    channel, grpc.ChannelConnectivity.READY)
                await asyncio.wait_for(never_ready,
                                       test_constants.SHORT_TIMEOUT)

    async def test_shutdown(self):
        # Closing a channel moves it to SHUTDOWN, lets an in-flight
        # wait_for_state_change() finish without raising, and makes a new
        # wait_for_state_change() call fail with aio.UsageError.
        channel = aio.insecure_channel(self._server_address)

        state_before_close = channel.get_state(False)
        self.assertEqual(grpc.ChannelConnectivity.IDLE, state_before_close)

        # Waiting for changes in a separate coroutine
        waiter_running = asyncio.Event()

        async def wait_for_change_from_idle():
            # Signal the test body first, then block on the state change.
            waiter_running.set()
            await channel.wait_for_state_change(grpc.ChannelConnectivity.IDLE)

        waiter_task = self.loop.create_task(wait_for_change_from_idle())
        await waiter_running.wait()

        await channel.close()

        # Both query flavours must report SHUTDOWN (True first, then False).
        for try_to_connect in (True, False):
            self.assertEqual(grpc.ChannelConnectivity.SHUTDOWN,
                             channel.get_state(try_to_connect))

        # Make sure there isn't any exception in the task
        await waiter_task

        # It can raise exceptions since it is a usage error, but it should
        # not segfault or abort.
        with self.assertRaises(aio.UsageError):
            await channel.wait_for_state_change(
                grpc.ChannelConnectivity.SHUTDOWN)


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)