• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests AsyncIO version of grpcio-health-checking."""
15
16import asyncio
17import logging
18import time
19import random
20import unittest
21
22import grpc
23
24from grpc_health.v1 import health
25from grpc_health.v1 import health_pb2
26from grpc_health.v1 import health_pb2_grpc
27from grpc.experimental import aio
28
29from tests.unit.framework.common import test_constants
30
31from tests_aio.unit._test_base import AioTestBase
32
33_SERVING_SERVICE = 'grpc.test.TestServiceServing'
34_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
35_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
36_WATCH_SERVICE = 'grpc.test.WatchService'
37
38_LARGE_NUMBER_OF_STATUS_CHANGES = 1000
39
40
41async def _pipe_to_queue(call, queue):
42    async for response in call:
43        await queue.put(response)
44
45
46class HealthServicerTest(AioTestBase):
47
48    async def setUp(self):
49        self._servicer = health.aio.HealthServicer()
50        await self._servicer.set(_SERVING_SERVICE,
51                                 health_pb2.HealthCheckResponse.SERVING)
52        await self._servicer.set(_UNKNOWN_SERVICE,
53                                 health_pb2.HealthCheckResponse.UNKNOWN)
54        await self._servicer.set(_NOT_SERVING_SERVICE,
55                                 health_pb2.HealthCheckResponse.NOT_SERVING)
56        self._server = aio.server()
57        port = self._server.add_insecure_port('[::]:0')
58        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
59                                                     self._server)
60        await self._server.start()
61
62        self._channel = aio.insecure_channel('localhost:%d' % port)
63        self._stub = health_pb2_grpc.HealthStub(self._channel)
64
65    async def tearDown(self):
66        await self._channel.close()
67        await self._server.stop(None)
68
69    async def test_check_empty_service(self):
70        request = health_pb2.HealthCheckRequest()
71        resp = await self._stub.Check(request)
72        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
73
74    async def test_check_serving_service(self):
75        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
76        resp = await self._stub.Check(request)
77        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)
78
79    async def test_check_unknown_service(self):
80        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
81        resp = await self._stub.Check(request)
82        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)
83
84    async def test_check_not_serving_service(self):
85        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
86        resp = await self._stub.Check(request)
87        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
88                         resp.status)
89
90    async def test_check_not_found_service(self):
91        request = health_pb2.HealthCheckRequest(service='not-found')
92        with self.assertRaises(aio.AioRpcError) as context:
93            await self._stub.Check(request)
94
95        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())
96
97    async def test_health_service_name(self):
98        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')
99
100    async def test_watch_empty_service(self):
101        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
102
103        call = self._stub.Watch(request)
104        queue = asyncio.Queue()
105        task = self.loop.create_task(_pipe_to_queue(call, queue))
106
107        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
108                         (await queue.get()).status)
109
110        call.cancel()
111
112        with self.assertRaises(asyncio.CancelledError):
113            await task
114
115        self.assertTrue(queue.empty())
116
117    async def test_watch_new_service(self):
118        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
119        call = self._stub.Watch(request)
120        queue = asyncio.Queue()
121        task = self.loop.create_task(_pipe_to_queue(call, queue))
122
123        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
124                         (await queue.get()).status)
125
126        await self._servicer.set(_WATCH_SERVICE,
127                                 health_pb2.HealthCheckResponse.SERVING)
128        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
129                         (await queue.get()).status)
130
131        await self._servicer.set(_WATCH_SERVICE,
132                                 health_pb2.HealthCheckResponse.NOT_SERVING)
133        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
134                         (await queue.get()).status)
135
136        call.cancel()
137
138        with self.assertRaises(asyncio.CancelledError):
139            await task
140
141        self.assertTrue(queue.empty())
142
143    async def test_watch_service_isolation(self):
144        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
145        call = self._stub.Watch(request)
146        queue = asyncio.Queue()
147        task = self.loop.create_task(_pipe_to_queue(call, queue))
148
149        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
150                         (await queue.get()).status)
151
152        await self._servicer.set('some-other-service',
153                                 health_pb2.HealthCheckResponse.SERVING)
154        # The change of health status in other service should be isolated.
155        # Hence, no additional notification should be observed.
156        with self.assertRaises(asyncio.TimeoutError):
157            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)
158
159        call.cancel()
160
161        with self.assertRaises(asyncio.CancelledError):
162            await task
163
164        self.assertTrue(queue.empty())
165
166    async def test_two_watchers(self):
167        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
168        queue1 = asyncio.Queue()
169        queue2 = asyncio.Queue()
170        call1 = self._stub.Watch(request)
171        call2 = self._stub.Watch(request)
172        task1 = self.loop.create_task(_pipe_to_queue(call1, queue1))
173        task2 = self.loop.create_task(_pipe_to_queue(call2, queue2))
174
175        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
176                         (await queue1.get()).status)
177        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
178                         (await queue2.get()).status)
179
180        await self._servicer.set(_WATCH_SERVICE,
181                                 health_pb2.HealthCheckResponse.SERVING)
182        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
183                         (await queue1.get()).status)
184        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
185                         (await queue2.get()).status)
186
187        call1.cancel()
188        call2.cancel()
189
190        with self.assertRaises(asyncio.CancelledError):
191            await task1
192
193        with self.assertRaises(asyncio.CancelledError):
194            await task2
195
196        self.assertTrue(queue1.empty())
197        self.assertTrue(queue2.empty())
198
199    async def test_cancelled_watch_removed_from_watch_list(self):
200        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
201        call = self._stub.Watch(request)
202        queue = asyncio.Queue()
203        task = self.loop.create_task(_pipe_to_queue(call, queue))
204
205        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
206                         (await queue.get()).status)
207
208        call.cancel()
209        await self._servicer.set(_WATCH_SERVICE,
210                                 health_pb2.HealthCheckResponse.SERVING)
211
212        with self.assertRaises(asyncio.CancelledError):
213            await task
214
215        # Wait for the serving coroutine to process client cancellation.
216        timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
217        while (time.monotonic() < timeout and self._servicer._server_watchers):
218            await asyncio.sleep(1)
219        self.assertFalse(self._servicer._server_watchers,
220                         'There should not be any watcher left')
221        self.assertTrue(queue.empty())
222
223    async def test_graceful_shutdown(self):
224        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
225        call = self._stub.Watch(request)
226        queue = asyncio.Queue()
227        task = self.loop.create_task(_pipe_to_queue(call, queue))
228
229        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
230                         (await queue.get()).status)
231
232        await self._servicer.enter_graceful_shutdown()
233        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
234                         (await queue.get()).status)
235
236        # This should be a no-op.
237        await self._servicer.set(health.OVERALL_HEALTH,
238                                 health_pb2.HealthCheckResponse.SERVING)
239
240        resp = await self._stub.Check(request)
241        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
242                         resp.status)
243
244        call.cancel()
245
246        with self.assertRaises(asyncio.CancelledError):
247            await task
248
249        self.assertTrue(queue.empty())
250
251    async def test_no_duplicate_status(self):
252        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
253        call = self._stub.Watch(request)
254        queue = asyncio.Queue()
255        task = self.loop.create_task(_pipe_to_queue(call, queue))
256
257        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
258                         (await queue.get()).status)
259        last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN
260
261        for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
262            if random.randint(0, 1) == 0:
263                status = health_pb2.HealthCheckResponse.SERVING
264            else:
265                status = health_pb2.HealthCheckResponse.NOT_SERVING
266
267            await self._servicer.set(_WATCH_SERVICE, status)
268            if status != last_status:
269                self.assertEqual(status, (await queue.get()).status)
270            last_status = status
271
272        call.cancel()
273
274        with self.assertRaises(asyncio.CancelledError):
275            await task
276
277        self.assertTrue(queue.empty())
278
279
280if __name__ == '__main__':
281    logging.basicConfig(level=logging.DEBUG)
282    unittest.main(verbosity=2)
283