• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2019 The gRPC Authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests behavior of the connectivity state."""
15
16import asyncio
17import logging
18import platform
19import threading
20import time
21import unittest
22
23import grpc
24from grpc.experimental import aio
25
26from tests.unit.framework.common import test_constants
27from tests_aio.unit import _common
28from tests_aio.unit._constants import UNREACHABLE_TARGET
29from tests_aio.unit._test_base import AioTestBase
30from tests_aio.unit._test_server import start_test_server
31
32
class TestConnectivityState(AioTestBase):
    """Checks the connectivity states reported by an aio channel.

    A test server is started before every test and stopped afterwards; the
    individual tests open insecure channels and inspect ``get_state`` /
    ``wait_for_state_change`` on them.
    """

    async def setUp(self):
        """Start a test server and remember its address and handle."""
        address, server = await start_test_server()
        self._server_address = address
        self._server = server

    async def tearDown(self):
        """Stop the server started in ``setUp``."""
        await self._server.stop(None)

    @unittest.skipIf(
        "aarch64" in platform.machine(),
        "The transient failure propagation is slower on aarch64",
    )
    async def test_unavailable_backend(self):
        # A channel to an unreachable target starts out IDLE and is expected
        # to reach TRANSIENT_FAILURE within twice the short timeout.
        idle = grpc.ChannelConnectivity.IDLE
        transient_failure = grpc.ChannelConnectivity.TRANSIENT_FAILURE
        time_limit = test_constants.SHORT_TIMEOUT * 2

        async with aio.insecure_channel(UNREACHABLE_TARGET) as unreachable:
            # Query first with the flag off, then with it on; both calls
            # must still report IDLE.
            for try_to_connect in (False, True):
                self.assertEqual(
                    idle, unreachable.get_state(try_to_connect)
                )

            # The wait below must finish before the time limit expires.
            reached_failure = _common.block_until_certain_state(
                unreachable, transient_failure
            )
            await asyncio.wait_for(reached_failure, timeout=time_limit)

    async def test_normal_backend(self):
        # Against the running test server the channel reports IDLE on the
        # first query and must then become READY within the short timeout.
        async with aio.insecure_channel(self._server_address) as live:
            self.assertEqual(
                grpc.ChannelConnectivity.IDLE, live.get_state(True)
            )

            # The wait below must finish before the timeout expires.
            became_ready = _common.block_until_certain_state(
                live, grpc.ChannelConnectivity.READY
            )
            await asyncio.wait_for(
                became_ready, timeout=test_constants.SHORT_TIMEOUT
            )

    async def test_timeout(self):
        ready = grpc.ChannelConnectivity.READY

        async with aio.insecure_channel(self._server_address) as passive:
            observed = passive.get_state(False)
            self.assertEqual(grpc.ChannelConnectivity.IDLE, observed)

            # get_state was only called with False here, so the channel is
            # presumably never asked to connect and READY is not reached.
            # The expected outcome is that asyncio.wait_for gives up and
            # raises asyncio.TimeoutError.
            with self.assertRaises(asyncio.TimeoutError):
                never_ready = _common.block_until_certain_state(
                    passive, ready
                )
                await asyncio.wait_for(
                    never_ready, timeout=test_constants.SHORT_TIMEOUT
                )

    async def test_shutdown(self):
        idle = grpc.ChannelConnectivity.IDLE
        shutdown = grpc.ChannelConnectivity.SHUTDOWN

        # Not used as a context manager: the test closes the channel itself.
        doomed = aio.insecure_channel(self._server_address)
        self.assertEqual(idle, doomed.get_state(False))

        # Park a wait_for_state_change call in its own task; the event tells
        # us the task has started running before the channel gets closed.
        watcher_started = asyncio.Event()

        async def watch_for_change():
            watcher_started.set()
            await doomed.wait_for_state_change(idle)

        watcher = self.loop.create_task(watch_for_change())
        await watcher_started.wait()

        await doomed.close()

        # After close() the state is SHUTDOWN whichever flag value is used.
        for try_to_connect in (True, False):
            self.assertEqual(shutdown, doomed.get_state(try_to_connect))

        # Awaiting the task re-raises anything that went wrong inside it.
        await watcher

        # Waiting on a closed channel is a usage error: raising is fine,
        # crashing the process (segfault / abort) is not.
        with self.assertRaises(aio.UsageError):
            await doomed.wait_for_state_change(shutdown)
126
if __name__ == "__main__":
    # Runs only when this file is executed as a script: configure logging at
    # DEBUG level, then run this module's tests with verbosity 2.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
130