• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc._channel.Channel connectivity."""
15
16import logging
17import threading
18import time
19import unittest
20
21import grpc
22from tests.unit.framework.common import test_constants
23from tests.unit import thread_pool
24
25
def _ready_in_connectivities(connectivities):
    """Return whether READY appears anywhere in connectivities.

    Args:
      connectivities: A container of recorded connectivity values.

    Returns:
      The result of a membership test for grpc.ChannelConnectivity.READY.
    """
    ready = grpc.ChannelConnectivity.READY
    return ready in connectivities
28
29
def _last_connectivity_is_not_ready(connectivities):
    """Return whether the most recent recorded connectivity is not READY.

    The last element is indexed directly, so connectivities must be a
    non-empty sequence.

    Args:
      connectivities: A sequence of recorded connectivity values, oldest first.

    Returns:
      True if the final element is not grpc.ChannelConnectivity.READY.
    """
    latest = connectivities[-1]
    return latest is not grpc.ChannelConnectivity.READY
32
33
class _Callback(object):
    """Records every connectivity value passed to update().

    All access to the recorded history goes through one threading.Condition,
    so update() may be called from a different thread than the one reading
    or waiting on the history.
    """

    def __init__(self):
        # Guards _connectivities and lets waiters sleep until the next update.
        self._condition = threading.Condition()
        # Every value received so far, in arrival order.
        self._connectivities = []

    def update(self, connectivity):
        """Append connectivity to the history and wake all blocked waiters.

        Args:
          connectivity: The value to record.
        """
        with self._condition:
            self._connectivities.append(connectivity)
            # Waiters may be blocked on different predicates. Waking only one
            # could pick a waiter that is still unsatisfied and leave the
            # satisfied one asleep, so wake them all and let each re-check.
            self._condition.notify_all()

    def connectivities(self):
        """Return a tuple snapshot of every value recorded so far."""
        with self._condition:
            return tuple(self._connectivities)

    def block_until_connectivities_satisfy(self, predicate):
        """Block until predicate accepts a snapshot of the recorded history.

        The predicate is evaluated while the internal condition is held. There
        is no timeout: if the predicate is never satisfied this never returns.

        Args:
          predicate: A callable taking a tuple of the recorded values; a truthy
            result ends the wait.

        Returns:
          The tuple snapshot for which predicate returned a truthy value.
        """
        with self._condition:
            while True:
                connectivities = tuple(self._connectivities)
                if predicate(connectivities):
                    return connectivities
                # Releases the lock while sleeping; re-checked after update().
                self._condition.wait()
57
58
class ChannelConnectivityTest(unittest.TestCase):
    """Checks the connectivity values a channel reports to its subscribers."""

    def test_lonely_channel_connectivity(self):
        """A channel aimed at localhost:12345 must never report READY.

        The same callback is subscribed twice (first without, then with
        try_to_connect) and unsubscribed twice; a snapshot of the recorded
        history is taken after each step.
        """
        observer = _Callback()

        channel = grpc.insecure_channel('localhost:12345')
        channel.subscribe(observer.update, try_to_connect=False)
        after_passive_subscribe = observer.block_until_connectivities_satisfy(
            bool)
        channel.subscribe(observer.update, try_to_connect=True)
        after_active_subscribe = observer.block_until_connectivities_satisfy(
            lambda seen: len(seen) >= 2)
        # Wait for a connection that will never happen.
        time.sleep(test_constants.SHORT_TIMEOUT)
        after_waiting = observer.connectivities()
        channel.unsubscribe(observer.update)
        after_first_unsubscribe = observer.connectivities()
        channel.unsubscribe(observer.update)
        after_second_unsubscribe = observer.connectivities()

        channel.close()

        self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
                                 after_passive_subscribe)
        later_snapshots = (
            after_active_subscribe,
            after_waiting,
            after_first_unsubscribe,
            after_second_unsubscribe,
        )
        for snapshot in later_snapshots:
            self.assertNotIn(grpc.ChannelConnectivity.READY, snapshot)

    def test_immediately_connectable_channel_connectivity(self):
        """A channel to a started local server reaches READY once asked to.

        The first subscriber does not request a connection, so its history
        must stay at IDLE until a second subscriber passes
        try_to_connect=True. Neither subscriber may see TRANSIENT_FAILURE or
        SHUTDOWN in its early history, and the server's thread pool must not
        have been used.
        """
        pool = thread_pool.RecordingThreadPool(max_workers=None)
        server = grpc.server(pool, options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        passive_observer = _Callback()
        active_observer = _Callback()

        channel = grpc.insecure_channel('localhost:{}'.format(port))
        channel.subscribe(passive_observer.update, try_to_connect=False)
        initial = passive_observer.block_until_connectivities_satisfy(bool)
        # Wait for a connection that will never happen because
        # try_to_connect=True has not yet been passed.
        time.sleep(test_constants.SHORT_TIMEOUT)
        after_idle_wait = passive_observer.connectivities()
        channel.subscribe(active_observer.update, try_to_connect=True)
        passive_progress = passive_observer.block_until_connectivities_satisfy(
            lambda seen: len(seen) >= 2)
        active_progress = active_observer.block_until_connectivities_satisfy(
            bool)
        # Wait for a connection that will happen (or may already have
        # happened).
        for observer in (passive_observer, active_observer):
            observer.block_until_connectivities_satisfy(
                _ready_in_connectivities)
        channel.close()
        server.stop(None)

        for idle_snapshot in (initial, after_idle_wait):
            self.assertSequenceEqual((grpc.ChannelConnectivity.IDLE,),
                                     idle_snapshot)
        unexpected_states = (
            grpc.ChannelConnectivity.TRANSIENT_FAILURE,
            grpc.ChannelConnectivity.SHUTDOWN,
        )
        for snapshot in (passive_progress, active_progress):
            for state in unexpected_states:
                self.assertNotIn(state, snapshot)
        self.assertFalse(pool.was_used())

    def test_reachable_then_unreachable_channel_connectivity(self):
        """Stopping the server makes a READY channel report a non-READY value.

        The server's thread pool must not have been used along the way.
        """
        pool = thread_pool.RecordingThreadPool(max_workers=None)
        server = grpc.server(pool, options=(('grpc.so_reuseport', 0),))
        port = server.add_insecure_port('[::]:0')
        server.start()
        observer = _Callback()

        channel = grpc.insecure_channel('localhost:{}'.format(port))
        channel.subscribe(observer.update, try_to_connect=True)
        observer.block_until_connectivities_satisfy(_ready_in_connectivities)
        # Now take down the server and confirm that channel readiness is
        # repudiated.
        server.stop(None)
        observer.block_until_connectivities_satisfy(
            _last_connectivity_is_not_ready)
        channel.unsubscribe(observer.update)
        channel.close()
        self.assertFalse(pool.was_used())
151
152
if __name__ == '__main__':
    # Configure the root logger with its defaults before handing control to
    # unittest's command-line runner.
    logging.basicConfig()
    unittest.main(verbosity=2)
156