# Copyright 2016 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests of grpc_health.v1.health."""

import logging
import queue
import sys
import threading
import time
import unittest

import grpc
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

from tests.unit import test_common
from tests.unit import thread_pool
from tests.unit.framework.common import test_constants

# Service names used by the tests below. The first three are registered with
# the servicer in start_server(); _WATCH_SERVICE is deliberately left
# unregistered so the Watch tests can observe it being set later.
_SERVING_SERVICE = "grpc.test.TestServiceServing"
_UNKNOWN_SERVICE = "grpc.test.TestServiceUnknown"
_NOT_SERVING_SERVICE = "grpc.test.TestServiceNotServing"
_WATCH_SERVICE = "grpc.test.WatchService"


def _consume_responses(response_iterator, response_queue):
    """Drain `response_iterator`, putting each item on `response_queue`.

    Used as a thread target so a test can read streamed responses from the
    queue with a timeout instead of blocking on the iterator itself.

    Args:
        response_iterator: Any iterable of responses; iterated to exhaustion.
            An exception raised while iterating propagates to the caller.
        response_queue: Object with a ``put(item)`` method (e.g.
            ``queue.Queue``) that receives the items in iteration order.
    """
    for response in response_iterator:
        response_queue.put(response)


class BaseWatchTests(object):
    """Container for the Watch test suite shared by the concrete test classes.

    NOTE(review): the suite is nested inside this plain class presumably so
    that test discovery does not collect ``WatchTests`` on its own; it defines
    no ``setUp`` and relies on subclasses to call ``start_server``.
    """

    @unittest.skipIf(
        sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
    )
    class WatchTests(unittest.TestCase):
        """Tests of the health service's streaming ``Watch`` RPC.

        Subclasses are expected to call ``start_server`` from ``setUp``;
        ``tearDown`` reads the ``_server`` and ``_channel`` it creates.
        """

        def start_server(self, non_blocking=False, thread_pool=None):
            """Start a health server and connect a stub to it.

            Registers the SERVING, UNKNOWN and NOT_SERVING test services,
            starts a test server on an OS-assigned port, and stores the
            servicer, server, channel and stub on ``self``.

            Args:
                non_blocking: Forwarded to ``HealthServicer`` as
                    ``experimental_non_blocking``.
                thread_pool: Forwarded to ``HealthServicer`` as
                    ``experimental_thread_pool`` and kept as
                    ``self._thread_pool``.
            """
            self._thread_pool = thread_pool
            self._servicer = health.HealthServicer(
                experimental_non_blocking=non_blocking,
                experimental_thread_pool=thread_pool,
            )
            self._servicer.set(
                _SERVING_SERVICE, health_pb2.HealthCheckResponse.SERVING
            )
            self._servicer.set(
                _UNKNOWN_SERVICE, health_pb2.HealthCheckResponse.UNKNOWN
            )
            self._servicer.set(
                _NOT_SERVING_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
            )
            self._server = test_common.test_server()
            # Port 0 lets the OS choose a free port; the chosen one is returned.
            port = self._server.add_insecure_port("[::]:0")
            health_pb2_grpc.add_HealthServicer_to_server(
                self._servicer, self._server
            )
            self._server.start()

            self._channel = grpc.insecure_channel("localhost:%d" % port)
            self._stub = health_pb2_grpc.HealthStub(self._channel)

        def tearDown(self):
            """Stop the server (no grace period) and close the channel."""
            self._server.stop(None)
            self._channel.close()

        def _start_watch(self, service):
            """Open a Watch stream for `service` and drain it on a thread.

            Returns:
                A ``(rendezvous, thread, response_queue)`` tuple: the call
                object returned by ``Watch``, the started consumer thread,
                and the queue that thread fills with responses.
            """
            request = health_pb2.HealthCheckRequest(service=service)
            response_queue = queue.Queue()
            rendezvous = self._stub.Watch(request)
            thread = threading.Thread(
                target=_consume_responses, args=(rendezvous, response_queue)
            )
            thread.start()
            return rendezvous, thread, response_queue

        def _assert_next_status(self, response_queue, expected_status):
            """Assert the next queued response carries `expected_status`.

            Waits up to ``test_constants.SHORT_TIMEOUT`` for a response;
            ``queue.Empty`` propagates if none arrives in time.
            """
            response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT)
            self.assertEqual(expected_status, response.status)

        def _finish_watch(self, rendezvous, thread, response_queue):
            """Cancel the stream, join its consumer, and check nothing is left."""
            rendezvous.cancel()
            thread.join()
            self.assertTrue(response_queue.empty())

        def test_watch_empty_service(self):
            """Watching the empty service name yields SERVING first."""
            rendezvous, thread, response_queue = self._start_watch("")
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVING
            )
            self._finish_watch(rendezvous, thread, response_queue)

            if self._thread_pool is not None:
                self.assertTrue(self._thread_pool.was_used())

        def test_watch_new_service(self):
            """A watcher sees SERVICE_UNKNOWN, then each later status change."""
            rendezvous, thread, response_queue = self._start_watch(
                _WATCH_SERVICE
            )
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
            )

            self._servicer.set(
                _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
            )
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVING
            )

            self._servicer.set(
                _WATCH_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
            )
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.NOT_SERVING
            )

            self._finish_watch(rendezvous, thread, response_queue)

        def test_watch_service_isolation(self):
            """Setting a different service sends nothing to this watcher."""
            rendezvous, thread, response_queue = self._start_watch(
                _WATCH_SERVICE
            )
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
            )

            self._servicer.set(
                "some-other-service", health_pb2.HealthCheckResponse.SERVING
            )
            # No update is expected, so the read must time out.
            with self.assertRaises(queue.Empty):
                response_queue.get(timeout=test_constants.SHORT_TIMEOUT)

            self._finish_watch(rendezvous, thread, response_queue)

        def test_two_watchers(self):
            """Two watchers of one service both receive every update."""
            rendezvous1, thread1, response_queue1 = self._start_watch(
                _WATCH_SERVICE
            )
            rendezvous2, thread2, response_queue2 = self._start_watch(
                _WATCH_SERVICE
            )

            self._assert_next_status(
                response_queue1, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
            )
            self._assert_next_status(
                response_queue2, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
            )

            self._servicer.set(
                _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
            )
            self._assert_next_status(
                response_queue1, health_pb2.HealthCheckResponse.SERVING
            )
            self._assert_next_status(
                response_queue2, health_pb2.HealthCheckResponse.SERVING
            )

            self._finish_watch(rendezvous1, thread1, response_queue1)
            self._finish_watch(rendezvous2, thread2, response_queue2)

        @unittest.skip("https://github.com/grpc/grpc/issues/18127")
        def test_cancelled_watch_removed_from_watch_list(self):
            """A cancelled watcher is dropped from the servicer's callbacks."""
            rendezvous, thread, response_queue = self._start_watch(
                _WATCH_SERVICE
            )
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
            )

            rendezvous.cancel()
            self._servicer.set(
                _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
            )
            thread.join()

            # Wait, if necessary, for serving thread to process client cancellation
            deadline = time.time() + test_constants.TIME_ALLOWANCE
            while (
                time.time() < deadline
                and self._servicer._send_response_callbacks[_WATCH_SERVICE]
            ):
                time.sleep(1)
            self.assertFalse(
                self._servicer._send_response_callbacks[_WATCH_SERVICE],
                "watch set should be empty",
            )
            self.assertTrue(response_queue.empty())

        def test_graceful_shutdown(self):
            """Graceful shutdown reports NOT_SERVING and ignores later sets."""
            rendezvous, thread, response_queue = self._start_watch("")
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.SERVING
            )

            self._servicer.enter_graceful_shutdown()
            self._assert_next_status(
                response_queue, health_pb2.HealthCheckResponse.NOT_SERVING
            )

            # This should be a no-op.
            self._servicer.set("", health_pb2.HealthCheckResponse.SERVING)

            # The final emptiness check verifies the set() above sent nothing.
            self._finish_watch(rendezvous, thread, response_queue)


@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class HealthServicerTest(BaseWatchTests.WatchTests):
    """Watch suite plus unary Check tests against a non-blocking servicer.

    The servicer is started with ``non_blocking=True`` and a
    ``RecordingThreadPool``, which the inherited Watch tests can inspect
    through ``self._thread_pool``.
    """

    def setUp(self):
        """Start the server in non-blocking mode with a recording pool."""
        self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None)
        super().start_server(
            non_blocking=True, thread_pool=self._thread_pool
        )

    def test_check_empty_service(self):
        """Check with no service name set reports SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_serving_service(self):
        """Check reports SERVING for the service registered as SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    def test_check_unknown_service(self):
        """Check reports UNKNOWN for the service registered as UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = self._stub.Check(request)
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING, resp.status
        )

    def test_check_not_found_service(self):
        """Check on a never-registered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service="not-found")
        # The call is expected to raise, so its return value is not captured.
        with self.assertRaises(grpc.RpcError) as context:
            self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    def test_health_service_name(self):
        """The exported SERVICE_NAME is the fully qualified service name."""
        self.assertEqual(health.SERVICE_NAME, "grpc.health.v1.Health")


@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests):
    """Runs the Watch suite against a blocking servicer with no thread pool."""

    def setUp(self):
        """Start the server with non_blocking=False and no thread pool."""
        super().start_server(non_blocking=False, thread_pool=None)


# Allow running this module directly as a script.
if __name__ == "__main__":
    logging.basicConfig()
    unittest.main(verbosity=2)