• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2016 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_health.v1.health."""
15
16import logging
17import threading
18import time
19import unittest
20
21import grpc
22
23from grpc_health.v1 import health
24from grpc_health.v1 import health_pb2
25from grpc_health.v1 import health_pb2_grpc
26
27from tests.unit import test_common
28from tests.unit import thread_pool
29from tests.unit.framework.common import test_constants
30
31from six.moves import queue
32
33_SERVING_SERVICE = 'grpc.test.TestServiceServing'
34_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
35_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
36_WATCH_SERVICE = 'grpc.test.WatchService'
37
38
39def _consume_responses(response_iterator, response_queue):
40    for response in response_iterator:
41        response_queue.put(response)
42
43
44class BaseWatchTests(object):
45
46    class WatchTests(unittest.TestCase):
47
48        def start_server(self, non_blocking=False, thread_pool=None):
49            self._thread_pool = thread_pool
50            self._servicer = health.HealthServicer(
51                experimental_non_blocking=non_blocking,
52                experimental_thread_pool=thread_pool)
53            self._servicer.set(_SERVING_SERVICE,
54                               health_pb2.HealthCheckResponse.SERVING)
55            self._servicer.set(_UNKNOWN_SERVICE,
56                               health_pb2.HealthCheckResponse.UNKNOWN)
57            self._servicer.set(_NOT_SERVING_SERVICE,
58                               health_pb2.HealthCheckResponse.NOT_SERVING)
59            self._server = test_common.test_server()
60            port = self._server.add_insecure_port('[::]:0')
61            health_pb2_grpc.add_HealthServicer_to_server(
62                self._servicer, self._server)
63            self._server.start()
64
65            self._channel = grpc.insecure_channel('localhost:%d' % port)
66            self._stub = health_pb2_grpc.HealthStub(self._channel)
67
68        def tearDown(self):
69            self._server.stop(None)
70            self._channel.close()
71
72        def test_watch_empty_service(self):
73            request = health_pb2.HealthCheckRequest(service='')
74            response_queue = queue.Queue()
75            rendezvous = self._stub.Watch(request)
76            thread = threading.Thread(target=_consume_responses,
77                                      args=(rendezvous, response_queue))
78            thread.start()
79
80            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
81            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
82                             response.status)
83
84            rendezvous.cancel()
85            thread.join()
86            self.assertTrue(response_queue.empty())
87
88            if self._thread_pool is not None:
89                self.assertTrue(self._thread_pool.was_used())
90
91        def test_watch_new_service(self):
92            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
93            response_queue = queue.Queue()
94            rendezvous = self._stub.Watch(request)
95            thread = threading.Thread(target=_consume_responses,
96                                      args=(rendezvous, response_queue))
97            thread.start()
98
99            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
100            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
101                             response.status)
102
103            self._servicer.set(_WATCH_SERVICE,
104                               health_pb2.HealthCheckResponse.SERVING)
105            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
106            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
107                             response.status)
108
109            self._servicer.set(_WATCH_SERVICE,
110                               health_pb2.HealthCheckResponse.NOT_SERVING)
111            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
112            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
113                             response.status)
114
115            rendezvous.cancel()
116            thread.join()
117            self.assertTrue(response_queue.empty())
118
119        def test_watch_service_isolation(self):
120            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
121            response_queue = queue.Queue()
122            rendezvous = self._stub.Watch(request)
123            thread = threading.Thread(target=_consume_responses,
124                                      args=(rendezvous, response_queue))
125            thread.start()
126
127            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
128            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
129                             response.status)
130
131            self._servicer.set('some-other-service',
132                               health_pb2.HealthCheckResponse.SERVING)
133            with self.assertRaises(queue.Empty):
134                response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
135
136            rendezvous.cancel()
137            thread.join()
138            self.assertTrue(response_queue.empty())
139
140        def test_two_watchers(self):
141            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
142            response_queue1 = queue.Queue()
143            response_queue2 = queue.Queue()
144            rendezvous1 = self._stub.Watch(request)
145            rendezvous2 = self._stub.Watch(request)
146            thread1 = threading.Thread(target=_consume_responses,
147                                       args=(rendezvous1, response_queue1))
148            thread2 = threading.Thread(target=_consume_responses,
149                                       args=(rendezvous2, response_queue2))
150            thread1.start()
151            thread2.start()
152
153            response1 = response_queue1.get(
154                timeout=test_constants.SHORT_TIMEOUT)
155            response2 = response_queue2.get(
156                timeout=test_constants.SHORT_TIMEOUT)
157            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
158                             response1.status)
159            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
160                             response2.status)
161
162            self._servicer.set(_WATCH_SERVICE,
163                               health_pb2.HealthCheckResponse.SERVING)
164            response1 = response_queue1.get(
165                timeout=test_constants.SHORT_TIMEOUT)
166            response2 = response_queue2.get(
167                timeout=test_constants.SHORT_TIMEOUT)
168            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
169                             response1.status)
170            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
171                             response2.status)
172
173            rendezvous1.cancel()
174            rendezvous2.cancel()
175            thread1.join()
176            thread2.join()
177            self.assertTrue(response_queue1.empty())
178            self.assertTrue(response_queue2.empty())
179
180        @unittest.skip("https://github.com/grpc/grpc/issues/18127")
181        def test_cancelled_watch_removed_from_watch_list(self):
182            request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
183            response_queue = queue.Queue()
184            rendezvous = self._stub.Watch(request)
185            thread = threading.Thread(target=_consume_responses,
186                                      args=(rendezvous, response_queue))
187            thread.start()
188
189            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
190            self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
191                             response.status)
192
193            rendezvous.cancel()
194            self._servicer.set(_WATCH_SERVICE,
195                               health_pb2.HealthCheckResponse.SERVING)
196            thread.join()
197
198            # Wait, if necessary, for serving thread to process client cancellation
199            timeout = time.time() + test_constants.TIME_ALLOWANCE
200            while (time.time() < timeout and
201                   self._servicer._send_response_callbacks[_WATCH_SERVICE]):
202                time.sleep(1)
203            self.assertFalse(
204                self._servicer._send_response_callbacks[_WATCH_SERVICE],
205                'watch set should be empty')
206            self.assertTrue(response_queue.empty())
207
208        def test_graceful_shutdown(self):
209            request = health_pb2.HealthCheckRequest(service='')
210            response_queue = queue.Queue()
211            rendezvous = self._stub.Watch(request)
212            thread = threading.Thread(target=_consume_responses,
213                                      args=(rendezvous, response_queue))
214            thread.start()
215
216            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
217            self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
218                             response.status)
219
220            self._servicer.enter_graceful_shutdown()
221            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
222            self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
223                             response.status)
224
225            # This should be a no-op.
226            self._servicer.set('', health_pb2.HealthCheckResponse.SERVING)
227
228            rendezvous.cancel()
229            thread.join()
230            self.assertTrue(response_queue.empty())
231
232
233class HealthServicerTest(BaseWatchTests.WatchTests):
234
235    def setUp(self):
236        self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None)
237        super(HealthServicerTest,
238              self).start_server(non_blocking=True,
239                                 thread_pool=self._thread_pool)
240
241    def test_check_empty_service(self):
242        request = health_pb2.HealthCheckRequest()
243        resp = self._stub.Check(request)
244        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
245
246    def test_check_serving_service(self):
247        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
248        resp = self._stub.Check(request)
249        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
250
251    def test_check_unknown_service(self):
252        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
253        resp = self._stub.Check(request)
254        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
255
256    def test_check_not_serving_service(self):
257        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
258        resp = self._stub.Check(request)
259        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
260                         resp.status)
261
262    def test_check_not_found_service(self):
263        request = health_pb2.HealthCheckRequest(service='not-found')
264        with self.assertRaises(grpc.RpcError) as context:
265            resp = self._stub.Check(request)
266
267        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
268
269    def test_health_service_name(self):
270        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
271
272
273class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
274
275    def setUp(self):
276        super(HealthServicerBackwardsCompatibleWatchTest,
277              self).start_server(non_blocking=False, thread_pool=None)
278
279
280if __name__ == '__main__':
281    logging.basicConfig()
282    unittest.main(verbosity=2)
283