# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc.channel_ready_future."""

import threading
import unittest
import logging

import grpc
from tests.unit.framework.common import test_constants
from tests.unit import thread_pool


class _Callback(object):
    """Records the value handed to a done-callback and lets a test wait for it."""

    def __init__(self):
        self._condition = threading.Condition()
        self._value = None

    def accept_value(self, value):
        """Store ``value`` and wake every thread blocked in block_until_called."""
        with self._condition:
            self._value = value
            self._condition.notify_all()

    def block_until_called(self):
        """Block until a non-None value has been accepted, then return it.

        There is no timeout: the wait only ends once accept_value has stored
        something other than None.
        """
        with self._condition:
            while self._value is None:
                self._condition.wait()
            return self._value


class ChannelReadyFutureTest(unittest.TestCase):
    """Exercises the future returned by grpc.channel_ready_future."""

    def test_lonely_channel_connectivity(self):
        """The future times out, stays running, and can then be cancelled.

        NOTE(review): presumably nothing listens on localhost:12345; the test
        itself starts no server for that address.
        """
        channel = grpc.insecure_channel('localhost:12345')
        # Registered as a cleanup so the channel is closed even when one of
        # the assertions below fails.
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        with self.assertRaises(grpc.FutureTimeoutError):
            ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
        self.assertFalse(ready_future.cancelled())
        self.assertFalse(ready_future.done())
        self.assertTrue(ready_future.running())
        ready_future.cancel()
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertTrue(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())

    def test_immediately_connectable_channel_connectivity(self):
        """The future matures once the channel reaches a locally started server."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None)
        server = grpc.server(recording_thread_pool,
                             options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        # Cleanups run last-in-first-out, so the channel registered below is
        # closed before the server is stopped, as in the original sequence.
        self.addCleanup(server.stop, None)
        channel = grpc.insecure_channel('localhost:{}'.format(port))
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        self.assertIsNone(
            ready_future.result(timeout=test_constants.LONG_TIMEOUT))
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        # Cancellation after maturity has no effect.
        ready_future.cancel()
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        self.assertFalse(recording_thread_pool.was_used())


if __name__ == '__main__':
    logging.basicConfig()
    unittest.main(verbosity=2)