• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc.channel_ready_future."""
15
16import logging
17import threading
18import unittest
19
20import grpc
21
22from tests.unit import thread_pool
23from tests.unit.framework.common import test_constants
24
25
class _Callback(object):
    """Thread-safe holder for the argument passed to a done-callback.

    One thread hands a value over through ``accept_value`` while another
    waits for it in ``block_until_called``. Whether the callback has fired is
    tracked by a dedicated flag rather than by comparing the stored value with
    ``None``, so a callback invoked with ``None`` still releases the waiter.
    """

    def __init__(self):
        self._condition = threading.Condition()
        # True once accept_value has run; guarded by self._condition.
        self._called = False
        self._value = None

    def accept_value(self, value):
        """Store ``value`` and wake every thread waiting on the condition.

        Args:
          value: The object to hand to ``block_until_called``; may be None.
        """
        with self._condition:
            self._value = value
            self._called = True
            self._condition.notify_all()

    def block_until_called(self):
        """Wait until ``accept_value`` has been called.

        Returns:
          The value most recently passed to ``accept_value``.
        """
        with self._condition:
            # Loop to tolerate wake-ups that arrive before the flag is set.
            while not self._called:
                self._condition.wait()
            return self._value
41
42
class ChannelReadyFutureTest(unittest.TestCase):
    """Tests of the future returned by grpc.channel_ready_future.

    Channels and servers are released through ``addCleanup`` so they are torn
    down even when an assertion fails part-way through a test.
    """

    def test_lonely_channel_connectivity(self):
        """The future times out on result() and can then be cancelled."""
        # NOTE(review): presumably nothing listens on localhost:12345, which
        # is why result() is expected to time out - confirm for the test
        # environment.
        channel = grpc.insecure_channel("localhost:12345")
        # Registered right away so a failing assertion cannot leak the channel.
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        with self.assertRaises(grpc.FutureTimeoutError):
            ready_future.result(timeout=test_constants.SHORT_TIMEOUT)
        self.assertFalse(ready_future.cancelled())
        self.assertFalse(ready_future.done())
        self.assertTrue(ready_future.running())
        ready_future.cancel()
        # The done-callback is expected to receive the future itself.
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertTrue(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())

    def test_immediately_connectable_channel_connectivity(self):
        """The future completes with None when a local server is running."""
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None
        )
        server = grpc.server(
            recording_thread_pool, options=(("grpc.so_reuseport", 0),)
        )
        port = server.add_insecure_port("[::]:0")
        server.start()
        # Cleanups run in reverse registration order: the channel (registered
        # below) is closed first, then the server is stopped.
        self.addCleanup(server.stop, None)
        channel = grpc.insecure_channel("localhost:{}".format(port))
        self.addCleanup(channel.close)
        callback = _Callback()

        ready_future = grpc.channel_ready_future(channel)
        ready_future.add_done_callback(callback.accept_value)
        self.assertIsNone(
            ready_future.result(timeout=test_constants.LONG_TIMEOUT)
        )
        value_passed_to_callback = callback.block_until_called()
        self.assertIs(ready_future, value_passed_to_callback)
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        # Cancellation after maturity has no effect.
        ready_future.cancel()
        self.assertFalse(ready_future.cancelled())
        self.assertTrue(ready_future.done())
        self.assertFalse(ready_future.running())
        # The recording pool must report that it was never used.
        self.assertFalse(recording_thread_pool.was_used())
95
96
# Script entry point: only runs when this module is executed directly.
if __name__ == "__main__":
    # Apply logging's default configuration before any test runs.
    logging.basicConfig()
    # verbosity=2 requests unittest's most detailed per-test output.
    unittest.main(verbosity=2)
100