1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc.channel_ready_future.""" 15 16import logging 17import threading 18import unittest 19 20import grpc 21 22from tests.unit import thread_pool 23from tests.unit.framework.common import test_constants 24 25 26class _Callback(object): 27 def __init__(self): 28 self._condition = threading.Condition() 29 self._value = None 30 31 def accept_value(self, value): 32 with self._condition: 33 self._value = value 34 self._condition.notify_all() 35 36 def block_until_called(self): 37 with self._condition: 38 while self._value is None: 39 self._condition.wait() 40 return self._value 41 42 43class ChannelReadyFutureTest(unittest.TestCase): 44 def test_lonely_channel_connectivity(self): 45 channel = grpc.insecure_channel("localhost:12345") 46 callback = _Callback() 47 48 ready_future = grpc.channel_ready_future(channel) 49 ready_future.add_done_callback(callback.accept_value) 50 with self.assertRaises(grpc.FutureTimeoutError): 51 ready_future.result(timeout=test_constants.SHORT_TIMEOUT) 52 self.assertFalse(ready_future.cancelled()) 53 self.assertFalse(ready_future.done()) 54 self.assertTrue(ready_future.running()) 55 ready_future.cancel() 56 value_passed_to_callback = callback.block_until_called() 57 self.assertIs(ready_future, value_passed_to_callback) 58 self.assertTrue(ready_future.cancelled()) 59 self.assertTrue(ready_future.done()) 60 self.assertFalse(ready_future.running()) 61 62 channel.close() 63 64 def test_immediately_connectable_channel_connectivity(self): 65 recording_thread_pool = thread_pool.RecordingThreadPool( 66 max_workers=None 67 ) 68 server = grpc.server( 69 recording_thread_pool, options=(("grpc.so_reuseport", 0),) 70 ) 71 port = server.add_insecure_port("[::]:0") 72 server.start() 73 channel = grpc.insecure_channel("localhost:{}".format(port)) 74 callback = _Callback() 75 76 ready_future = grpc.channel_ready_future(channel) 77 ready_future.add_done_callback(callback.accept_value) 78 self.assertIsNone( 79 ready_future.result(timeout=test_constants.LONG_TIMEOUT) 80 ) 81 value_passed_to_callback = callback.block_until_called() 82 self.assertIs(ready_future, value_passed_to_callback) 83 self.assertFalse(ready_future.cancelled()) 84 self.assertTrue(ready_future.done()) 85 self.assertFalse(ready_future.running()) 86 # Cancellation after maturity has no effect. 87 ready_future.cancel() 88 self.assertFalse(ready_future.cancelled()) 89 self.assertTrue(ready_future.done()) 90 self.assertFalse(ready_future.running()) 91 self.assertFalse(recording_thread_pool.was_used()) 92 93 channel.close() 94 server.stop(None) 95 96 97if __name__ == "__main__": 98 logging.basicConfig() 99 unittest.main(verbosity=2) 100