• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests AsyncIO version of grpcio-health-checking."""
15
16import asyncio
17import logging
18import random
19import time
20import unittest
21
22import grpc
23from grpc.experimental import aio
24from grpc_health.v1 import health
25from grpc_health.v1 import health_pb2
26from grpc_health.v1 import health_pb2_grpc
27
28from tests.unit.framework.common import test_constants
29from tests_aio.unit._test_base import AioTestBase
30
# Service names used throughout the tests. setUp() registers the first three
# with the status named in the constant (SERVING, UNKNOWN, NOT_SERVING).
_SERVING_SERVICE = "grpc.test.TestServiceServing"
_UNKNOWN_SERVICE = "grpc.test.TestServiceUnknown"
_NOT_SERVING_SERVICE = "grpc.test.TestServiceNotServing"
# Not registered in setUp(); the Watch tests on this name first observe
# SERVICE_UNKNOWN and then drive its status themselves.
_WATCH_SERVICE = "grpc.test.WatchService"

# Number of status updates issued by test_no_duplicate_status.
_LARGE_NUMBER_OF_STATUS_CHANGES = 1000
37
38
async def _pipe_to_queue(call, queue):
    """Forward every message produced by `call` into `queue`.

    Args:
      call: An async-iterable. The tests in this file pass the call object
        returned by `HealthStub.Watch`.
      queue: An object with an awaitable `put`. The tests pass an
        `asyncio.Queue`.

    The coroutine returns once `call` is exhausted. Any exception raised
    while iterating is not handled here and propagates to whoever awaits
    this coroutine; the tests rely on that to observe
    `asyncio.CancelledError` after cancelling the call.
    """
    async for response in call:
        await queue.put(response)
42
43
class HealthServicerTest(AioTestBase):
    """Tests `health.aio.HealthServicer` through an in-process aio server.

    setUp() starts a server hosting the servicer and opens an insecure
    channel to it; every test issues Check/Watch RPCs through `self._stub`.
    """

    async def setUp(self):
        """Register three service statuses, start the server, open a stub."""
        self._servicer = health.aio.HealthServicer()
        await self._servicer.set(
            _SERVING_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        await self._servicer.set(
            _UNKNOWN_SERVICE, health_pb2.HealthCheckResponse.UNKNOWN
        )
        await self._servicer.set(
            _NOT_SERVING_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
        )
        self._server = aio.server()
        # Port 0 asks for any free port; the chosen one is returned.
        port = self._server.add_insecure_port("[::]:0")
        health_pb2_grpc.add_HealthServicer_to_server(
            self._servicer, self._server
        )
        await self._server.start()

        self._channel = aio.insecure_channel("localhost:%d" % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    async def tearDown(self):
        """Close the client channel, then stop the server."""
        await self._channel.close()
        await self._server.stop(None)

    def _start_watch(self, service):
        """Open a Watch stream for `service` and pipe it into a new queue.

        Deliberately a plain (non-async) method: it only creates objects and
        schedules a task, so it can be called from any test without awaiting.

        Args:
          service: Service name placed in the `HealthCheckRequest`.

        Returns:
          A `(call, queue, task)` tuple: the Watch call, the `asyncio.Queue`
          receiving each streamed response, and the task running
          `_pipe_to_queue` that feeds the queue.
        """
        request = health_pb2.HealthCheckRequest(service=service)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))
        return call, queue, task

    async def test_check_empty_service(self):
        # A request with no service name set is expected to report SERVING.
        request = health_pb2.HealthCheckRequest()
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_serving_service(self):
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_unknown_service(self):
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    async def test_check_not_serving_service(self):
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING, resp.status
        )

    async def test_check_not_found_service(self):
        # Check on a name that was never registered must fail with NOT_FOUND.
        request = health_pb2.HealthCheckRequest(service="not-found")
        with self.assertRaises(aio.AioRpcError) as context:
            await self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    async def test_health_service_name(self):
        # Expected value first, matching every other assertion in this class.
        self.assertEqual("grpc.health.v1.Health", health.SERVICE_NAME)

    async def test_watch_empty_service(self):
        # Watching health.OVERALL_HEALTH yields SERVING and nothing else.
        call, queue, task = self._start_watch(health.OVERALL_HEALTH)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_new_service(self):
        # A watcher on an unregistered service first sees SERVICE_UNKNOWN and
        # then one notification per subsequent status change.
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING,
            (await queue.get()).status,
        )

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_service_isolation(self):
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        await self._servicer.set(
            "some-other-service", health_pb2.HealthCheckResponse.SERVING
        )
        # The change of health status in other service should be isolated.
        # Hence, no additional notification should be observed.
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_two_watchers(self):
        # Two independent Watch streams on the same service must both receive
        # the initial status and every later change.
        call1, queue1, task1 = self._start_watch(_WATCH_SERVICE)
        call2, queue2, task2 = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue1.get()).status,
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue2.get()).status,
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue1.get()).status
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue2.get()).status
        )

        call1.cancel()
        call2.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task1

        with self.assertRaises(asyncio.CancelledError):
            await task2

        self.assertTrue(queue1.empty())
        self.assertTrue(queue2.empty())

    async def test_cancelled_watch_removed_from_watch_list(self):
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        # Cancel first, then change the status: the cancelled watcher must
        # not receive this update (the queue is checked to be empty below).
        call.cancel()
        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )

        with self.assertRaises(asyncio.CancelledError):
            await task

        # Wait for the serving coroutine to process client cancellation.
        # Poll until the servicer's watcher registry is empty or the
        # allowance elapses, then assert on the final state.
        timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
        while time.monotonic() < timeout and self._servicer._server_watchers:
            await asyncio.sleep(1)
        self.assertFalse(
            self._servicer._server_watchers,
            "There should not be any watcher left",
        )
        self.assertTrue(queue.empty())

    async def test_graceful_shutdown(self):
        call, queue, task = self._start_watch(health.OVERALL_HEALTH)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        await self._servicer.enter_graceful_shutdown()
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING,
            (await queue.get()).status,
        )

        # This should be a no-op.
        await self._servicer.set(
            health.OVERALL_HEALTH, health_pb2.HealthCheckResponse.SERVING
        )

        # Check must still report NOT_SERVING after the attempted reset.
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
        resp = await self._stub.Check(request)
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING, resp.status
        )

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        # An empty queue confirms the no-op set() produced no notification.
        self.assertTrue(queue.empty())

    async def test_no_duplicate_status(self):
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )
        last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN

        candidate_statuses = (
            health_pb2.HealthCheckResponse.SERVING,
            health_pb2.HealthCheckResponse.NOT_SERVING,
        )
        for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
            status = random.choice(candidate_statuses)

            await self._servicer.set(_WATCH_SERVICE, status)
            # A notification is expected only when the status really changed;
            # repeated identical statuses must be suppressed, which the final
            # queue.empty() check verifies.
            if status != last_status:
                self.assertEqual(status, (await queue.get()).status)
            last_status = status

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())
310
311
if __name__ == "__main__":
    # Executed only when the file is run directly: turn on DEBUG logging and
    # run the tests with verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
315