# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc._channel.Channel connectivity."""

import logging
import threading
import time
import unittest

import grpc
from tests.unit.framework.common import test_constants
from tests.unit import thread_pool


def _ready_in_connectivities(connectivities):
    """Return True if READY appears anywhere in the given connectivities."""
    return grpc.ChannelConnectivity.READY in connectivities


def _last_connectivity_is_not_ready(connectivities):
    """Return True if the final element of connectivities is not READY.

    The sequence is indexed at -1, so it must be non-empty.
    """
    latest = connectivities[-1]
    return latest is not grpc.ChannelConnectivity.READY


def _at_least_two_connectivities(connectivities):
    """Return True once two or more connectivities have been recorded."""
    return len(connectivities) >= 2


class _Callback(object):
    """Records every connectivity value passed to update().

    All reads and writes of the recorded list happen while holding a
    threading.Condition, which is also used to wake a thread blocked in
    block_until_connectivities_satisfy().
    """

    def __init__(self):
        self._connectivities = []
        self._condition = threading.Condition()

    def update(self, connectivity):
        """Append connectivity to the record and notify one waiting thread."""
        with self._condition:
            self._connectivities.append(connectivity)
            self._condition.notify()

    def connectivities(self):
        """Return a tuple snapshot of everything recorded so far."""
        with self._condition:
            snapshot = tuple(self._connectivities)
        return snapshot

    def block_until_connectivities_satisfy(self, predicate):
        """Block until predicate(snapshot) is truthy, then return the snapshot.

        Args:
          predicate: A callable taking a tuple of the recorded connectivities.

        Returns:
          The tuple of recorded connectivities for which predicate was truthy.

        The wait has no timeout; it only ends when an update() makes the
        predicate hold.
        """
        with self._condition:
            snapshot = tuple(self._connectivities)
            while not predicate(snapshot):
                self._condition.wait()
                # Re-read after waking: the record may have grown.
                snapshot = tuple(self._connectivities)
            return snapshot


class ChannelConnectivityTest(unittest.TestCase):
    """Exercises Channel.subscribe/unsubscribe connectivity callbacks."""

    def test_lonely_channel_connectivity(self):
        # A channel aimed at 'localhost:12345' with no server started by this
        # test: READY must never be observed.
        ready = grpc.ChannelConnectivity.READY
        recorder = _Callback()

        channel = grpc.insecure_channel('localhost:12345')
        channel.subscribe(recorder.update, try_to_connect=False)
        initial = recorder.block_until_connectivities_satisfy(bool)
        channel.subscribe(recorder.update, try_to_connect=True)
        after_connect_request = recorder.block_until_connectivities_satisfy(
            _at_least_two_connectivities)
        # Wait for a connection that will never happen.
        time.sleep(test_constants.SHORT_TIMEOUT)
        after_waiting = recorder.connectivities()
        channel.unsubscribe(recorder.update)
        after_first_unsubscribe = recorder.connectivities()
        channel.unsubscribe(recorder.update)
        after_second_unsubscribe = recorder.connectivities()

        channel.close()

        self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,), initial)
        later_snapshots = (
            after_connect_request,
            after_waiting,
            after_first_unsubscribe,
            after_second_unsubscribe,
        )
        for snapshot in later_snapshots:
            self.assertNotIn(ready, snapshot)

    def test_immediately_connectable_channel_connectivity(self):
        # A channel to a running local server stays IDLE until some subscriber
        # asks to connect, and then becomes READY for every subscriber.
        pool = thread_pool.RecordingThreadPool(max_workers=None)
        server = grpc.server(pool, options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        passive_recorder = _Callback()
        connecting_recorder = _Callback()

        target = 'localhost:{}'.format(port)
        channel = grpc.insecure_channel(target)
        channel.subscribe(passive_recorder.update, try_to_connect=False)
        initial = passive_recorder.block_until_connectivities_satisfy(bool)
        # Wait for a connection that will never happen because
        # try_to_connect=True has not yet been passed.
        time.sleep(test_constants.SHORT_TIMEOUT)
        still_idle = passive_recorder.connectivities()
        channel.subscribe(connecting_recorder.update, try_to_connect=True)
        passive_progress = passive_recorder.block_until_connectivities_satisfy(
            _at_least_two_connectivities)
        connecting_progress = (
            connecting_recorder.block_until_connectivities_satisfy(bool))
        # Wait for a connection that will happen (or may already have
        # happened).
        for recorder in (passive_recorder, connecting_recorder):
            recorder.block_until_connectivities_satisfy(
                _ready_in_connectivities)
        channel.close()
        server.stop(None)

        idle_only = (grpc.ChannelConnectivity.IDLE,)
        self.assertSequenceEqual(idle_only, initial)
        self.assertSequenceEqual(idle_only, still_idle)
        unexpected_states = (
            grpc.ChannelConnectivity.TRANSIENT_FAILURE,
            grpc.ChannelConnectivity.SHUTDOWN,
        )
        for snapshot in (passive_progress, connecting_progress):
            for state in unexpected_states:
                self.assertNotIn(state, snapshot)
        self.assertFalse(pool.was_used())

    def test_reachable_then_unreachable_channel_connectivity(self):
        # Once the server is stopped, the channel must report something other
        # than READY as its latest connectivity.
        pool = thread_pool.RecordingThreadPool(max_workers=None)
        server = grpc.server(pool, options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        recorder = _Callback()

        target = 'localhost:{}'.format(port)
        channel = grpc.insecure_channel(target)
        channel.subscribe(recorder.update, try_to_connect=True)
        recorder.block_until_connectivities_satisfy(_ready_in_connectivities)
        # Now take down the server and confirm that channel readiness is
        # repudiated.
        server.stop(None)
        recorder.block_until_connectivities_satisfy(
            _last_connectivity_is_not_ready)
        channel.unsubscribe(recorder.update)
        channel.close()
        self.assertFalse(pool.was_used())


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)