# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc._channel.Channel connectivity."""

import logging
import threading
import time
import unittest

import grpc

from tests.unit import thread_pool
from tests.unit.framework.common import test_constants


def _ready_in_connectivities(connectivities):
    """Return True when READY appears anywhere in ``connectivities``."""
    ready = grpc.ChannelConnectivity.READY
    return ready in connectivities


def _last_connectivity_is_not_ready(connectivities):
    """Return True when the final entry of ``connectivities`` is not READY.

    Indexes the last element directly, so an empty sequence raises
    IndexError.
    """
    latest = connectivities[-1]
    return latest is not grpc.ChannelConnectivity.READY


class _Callback(object):
    """Records every connectivity value handed to ``update``.

    All access to the recorded list happens while holding a
    ``threading.Condition``, which is also used to wake a caller blocked
    in ``block_until_connectivities_satisfy``.
    """

    def __init__(self):
        self._condition = threading.Condition()
        self._connectivities = []

    def update(self, connectivity):
        """Append ``connectivity`` to the record and notify one waiter."""
        with self._condition:
            self._connectivities.append(connectivity)
            self._condition.notify()

    def connectivities(self):
        """Return a tuple snapshot of everything recorded so far."""
        with self._condition:
            return tuple(self._connectivities)

    def block_until_connectivities_satisfy(self, predicate):
        """Block until ``predicate(snapshot)`` is truthy; return the snapshot.

        The predicate is evaluated on a fresh tuple snapshot once up front
        and again after every wake-up of the condition. There is no
        timeout.
        """
        with self._condition:
            observed = tuple(self._connectivities)
            while not predicate(observed):
                self._condition.wait()
                observed = tuple(self._connectivities)
            return observed


class ChannelConnectivityTest(unittest.TestCase):
    """Exercises Channel.subscribe / Channel.unsubscribe notifications."""

    def test_lonely_channel_connectivity(self):
        """READY is never observed on a channel this test starts no server for."""
        observer = _Callback()

        channel = grpc.insecure_channel("localhost:12345")
        channel.subscribe(observer.update, try_to_connect=False)
        after_passive_subscribe = observer.block_until_connectivities_satisfy(
            bool
        )
        channel.subscribe(observer.update, try_to_connect=True)
        after_active_subscribe = observer.block_until_connectivities_satisfy(
            lambda seen: len(seen) >= 2
        )
        # Wait for a connection that will never happen.
        time.sleep(test_constants.SHORT_TIMEOUT)
        after_waiting = observer.connectivities()
        channel.unsubscribe(observer.update)
        after_first_unsubscribe = observer.connectivities()
        # The same callback is unsubscribed a second time on purpose.
        channel.unsubscribe(observer.update)
        after_second_unsubscribe = observer.connectivities()

        channel.close()

        self.assertSequenceEqual(
            (grpc.ChannelConnectivity.IDLE,), after_passive_subscribe
        )
        later_snapshots = (
            after_active_subscribe,
            after_waiting,
            after_first_unsubscribe,
            after_second_unsubscribe,
        )
        for snapshot in later_snapshots:
            self.assertNotIn(grpc.ChannelConnectivity.READY, snapshot)

    def test_immediately_connectable_channel_connectivity(self):
        """Both subscribers observe READY once a connection is requested."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None
        )
        server = grpc.server(
            recording_thread_pool, options=(("grpc.so_reuseport", 0),)
        )
        port = server.add_insecure_port("[::]:0")
        server.start()
        passive_observer = _Callback()
        active_observer = _Callback()

        channel = grpc.insecure_channel("localhost:{}".format(port))
        channel.subscribe(passive_observer.update, try_to_connect=False)
        initial_snapshot = passive_observer.block_until_connectivities_satisfy(
            bool
        )
        # Wait for a connection that will never happen because
        # try_to_connect=True has not yet been passed.
        time.sleep(test_constants.SHORT_TIMEOUT)
        snapshot_after_wait = passive_observer.connectivities()
        channel.subscribe(active_observer.update, try_to_connect=True)
        passive_snapshot = passive_observer.block_until_connectivities_satisfy(
            lambda seen: len(seen) >= 2
        )
        active_snapshot = active_observer.block_until_connectivities_satisfy(
            bool
        )
        # Wait for a connection that will happen (or may already have
        # happened).
        for observer in (passive_observer, active_observer):
            observer.block_until_connectivities_satisfy(
                _ready_in_connectivities
            )
        channel.close()
        server.stop(None)

        idle_only = (grpc.ChannelConnectivity.IDLE,)
        self.assertSequenceEqual(idle_only, initial_snapshot)
        self.assertSequenceEqual(idle_only, snapshot_after_wait)
        unexpected_states = (
            grpc.ChannelConnectivity.TRANSIENT_FAILURE,
            grpc.ChannelConnectivity.SHUTDOWN,
        )
        for snapshot in (passive_snapshot, active_snapshot):
            for unexpected in unexpected_states:
                self.assertNotIn(unexpected, snapshot)
        self.assertFalse(recording_thread_pool.was_used())

    def test_reachable_then_unreachable_channel_connectivity(self):
        """The channel leaves READY after the server it reached is stopped."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None
        )
        server = grpc.server(
            recording_thread_pool, options=(("grpc.so_reuseport", 0),)
        )
        port = server.add_insecure_port("[::]:0")
        server.start()
        observer = _Callback()

        channel = grpc.insecure_channel("localhost:{}".format(port))
        channel.subscribe(observer.update, try_to_connect=True)
        observer.block_until_connectivities_satisfy(_ready_in_connectivities)
        # Now take down the server and confirm that channel readiness is
        # repudiated.
        server.stop(None)
        observer.block_until_connectivities_satisfy(
            _last_connectivity_is_not_ready
        )
        channel.unsubscribe(observer.update)
        channel.close()
        self.assertFalse(recording_thread_pool.was_used())


if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)