# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc._channel.Channel connectivity."""
15
16import logging
17import threading
18import time
19import unittest
20
21import grpc
22
23from tests.unit import thread_pool
24from tests.unit.framework.common import test_constants
25
26
def _ready_in_connectivities(connectivities):
    """Report whether READY occurs anywhere in the observed connectivities.

    Args:
        connectivities: The sequence of connectivity values recorded so far.

    Returns:
        True if at least one recorded value is READY, False otherwise
        (including when nothing has been recorded yet).
    """
    ready = grpc.ChannelConnectivity.READY
    return any(observed is ready for observed in connectivities)
29
30
def _last_connectivity_is_not_ready(connectivities):
    """Report whether the most recently recorded connectivity is not READY.

    Args:
        connectivities: A non-empty sequence of recorded connectivity values.

    Returns:
        False if the final element is READY, True for any other value.

    Raises:
        IndexError: If connectivities is empty.
    """
    most_recent = connectivities[-1]
    if most_recent is grpc.ChannelConnectivity.READY:
        return False
    return True
33
34
class _Callback(object):
    """Thread-safe recorder of connectivity updates.

    The bound ``update`` method is what the tests pass to
    ``channel.subscribe``; the other methods let the test thread read, or
    block on, the sequence of values recorded so far.
    """

    def __init__(self):
        # The condition's lock guards _connectivities, and the condition is
        # signalled each time a value is appended.
        self._condition = threading.Condition()
        self._connectivities = []

    def update(self, connectivity):
        """Record a connectivity value and wake every blocked waiter.

        Args:
            connectivity: The value to append to the recorded sequence.
        """
        with self._condition:
            self._connectivities.append(connectivity)
            # notify_all rather than notify: several threads may be blocked
            # with different predicates, and waking only one of them could
            # leave another asleep even though its predicate now holds.
            self._condition.notify_all()

    def connectivities(self):
        """Return a tuple snapshot of every value recorded so far."""
        with self._condition:
            return tuple(self._connectivities)

    def block_until_connectivities_satisfy(self, predicate):
        """Block until predicate accepts the recorded connectivities.

        Args:
            predicate: A callable taking a tuple of the recorded values and
                returning a truthy value once the wait should end. It is
                called with the internal lock held.

        Returns:
            The tuple snapshot for which predicate returned a truthy value.
            There is no timeout: the call waits indefinitely if the
            predicate is never satisfied.
        """
        with self._condition:
            while True:
                snapshot = tuple(self._connectivities)
                if predicate(snapshot):
                    return snapshot
                # Releases the lock while sleeping so update() can append.
                self._condition.wait()
57
58
class ChannelConnectivityTest(unittest.TestCase):
    """Checks the connectivity values a channel reports to its subscribers.

    Each test registers its channel (and its server, where one is started)
    with ``addCleanup`` so the resources are released even if a call raises
    before the explicit ``close``/``stop`` is reached. The explicit calls are
    kept in the test bodies because their ordering is part of each scenario;
    the cleanups are only a safety net and rely on ``Channel.close`` and
    ``Server.stop`` being safe to call a second time.
    """

    def test_lonely_channel_connectivity(self):
        # Scenario: this test starts no server for the target address, so the
        # channel must never be reported READY, whether or not a connection
        # attempt was requested and across repeated (un)subscription.
        callback = _Callback()

        channel = grpc.insecure_channel("localhost:12345")
        # Safety net: release the channel if anything below raises.
        self.addCleanup(channel.close)
        channel.subscribe(callback.update, try_to_connect=False)
        first_connectivities = callback.block_until_connectivities_satisfy(bool)
        channel.subscribe(callback.update, try_to_connect=True)
        second_connectivities = callback.block_until_connectivities_satisfy(
            lambda connectivities: 2 <= len(connectivities)
        )
        # Wait for a connection that will never happen.
        time.sleep(test_constants.SHORT_TIMEOUT)
        third_connectivities = callback.connectivities()
        channel.unsubscribe(callback.update)
        fourth_connectivities = callback.connectivities()
        # The same callback was subscribed twice above, so it is
        # unsubscribed twice as well.
        channel.unsubscribe(callback.update)
        fifth_connectivities = callback.connectivities()

        channel.close()

        self.assertSequenceEqual(
            (grpc.ChannelConnectivity.IDLE,), first_connectivities
        )
        self.assertNotIn(grpc.ChannelConnectivity.READY, second_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, third_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, fourth_connectivities)
        self.assertNotIn(grpc.ChannelConnectivity.READY, fifth_connectivities)

    def test_immediately_connectable_channel_connectivity(self):
        # Scenario: a server is listening before the channel is created. The
        # channel must stay IDLE until a subscriber asks it to connect, then
        # reach READY for both subscribers without ever reporting
        # TRANSIENT_FAILURE or SHUTDOWN in the snapshots taken along the way.
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None
        )
        server = grpc.server(
            recording_thread_pool, options=(("grpc.so_reuseport", 0),)
        )
        port = server.add_insecure_port("[::]:0")
        server.start()
        # Safety net: stop the server if anything below raises.
        self.addCleanup(server.stop, None)
        first_callback = _Callback()
        second_callback = _Callback()

        channel = grpc.insecure_channel("localhost:{}".format(port))
        # Registered after the server cleanup, so it runs first (cleanups are
        # LIFO), matching the close-then-stop order used below.
        self.addCleanup(channel.close)
        channel.subscribe(first_callback.update, try_to_connect=False)
        first_connectivities = (
            first_callback.block_until_connectivities_satisfy(bool)
        )
        # Wait for a connection that will never happen because
        # try_to_connect=True has not yet been passed.
        time.sleep(test_constants.SHORT_TIMEOUT)
        second_connectivities = first_callback.connectivities()
        channel.subscribe(second_callback.update, try_to_connect=True)
        third_connectivities = (
            first_callback.block_until_connectivities_satisfy(
                lambda connectivities: 2 <= len(connectivities)
            )
        )
        fourth_connectivities = (
            second_callback.block_until_connectivities_satisfy(bool)
        )
        # Wait for a connection that will happen (or may already have
        # happened).
        first_callback.block_until_connectivities_satisfy(
            _ready_in_connectivities
        )
        second_callback.block_until_connectivities_satisfy(
            _ready_in_connectivities
        )
        channel.close()
        server.stop(None)

        self.assertSequenceEqual(
            (grpc.ChannelConnectivity.IDLE,), first_connectivities
        )
        self.assertSequenceEqual(
            (grpc.ChannelConnectivity.IDLE,), second_connectivities
        )
        self.assertNotIn(
            grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities
        )
        self.assertNotIn(
            grpc.ChannelConnectivity.SHUTDOWN, third_connectivities
        )
        self.assertNotIn(
            grpc.ChannelConnectivity.TRANSIENT_FAILURE, fourth_connectivities
        )
        self.assertNotIn(
            grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities
        )
        # Connectivity watching alone must not submit work to the server's
        # thread pool.
        self.assertFalse(recording_thread_pool.was_used())

    def test_reachable_then_unreachable_channel_connectivity(self):
        # Scenario: the channel reaches READY against a live server, the
        # server is then stopped, and the channel must report some state
        # other than READY afterwards.
        recording_thread_pool = thread_pool.RecordingThreadPool(
            max_workers=None
        )
        server = grpc.server(
            recording_thread_pool, options=(("grpc.so_reuseport", 0),)
        )
        port = server.add_insecure_port("[::]:0")
        server.start()
        # Safety net: stop the server if the READY wait below raises.
        self.addCleanup(server.stop, None)
        callback = _Callback()

        channel = grpc.insecure_channel("localhost:{}".format(port))
        # Safety net: release the channel if anything below raises.
        self.addCleanup(channel.close)
        channel.subscribe(callback.update, try_to_connect=True)
        callback.block_until_connectivities_satisfy(_ready_in_connectivities)
        # Now take down the server and confirm that channel readiness is
        # repudiated.
        server.stop(None)
        callback.block_until_connectivities_satisfy(
            _last_connectivity_is_not_ready
        )
        channel.unsubscribe(callback.update)
        channel.close()
        self.assertFalse(recording_thread_pool.was_used())
170
171
if __name__ == "__main__":
    # Script entry point: set up default logging first, because
    # unittest.main() runs the tests and then exits the interpreter.
    logging.basicConfig()
    unittest.main(verbosity=2)
175