# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests AsyncIO version of grpcio-health-checking."""

import asyncio
import logging
import time
import random
import unittest

import grpc

from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc
from grpc.experimental import aio

from tests.unit.framework.common import test_constants

from tests_aio.unit._test_base import AioTestBase

_SERVING_SERVICE = 'grpc.test.TestServiceServing'
_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown'
_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing'
_WATCH_SERVICE = 'grpc.test.WatchService'

_LARGE_NUMBER_OF_STATUS_CHANGES = 1000


async def _pipe_to_queue(call, queue):
    """Forwards every response read from a streaming call into a queue.

    Args:
      call: An async-iterable streaming call object.
      queue: An asyncio.Queue that receives each response in arrival order.
    """
    async for response in call:
        await queue.put(response)


class HealthServicerTest(AioTestBase):
    """Exercises health.aio.HealthServicer through a real server and channel."""

    async def setUp(self):
        """Starts a health server with three preset services and a client."""
        self._servicer = health.aio.HealthServicer()
        await self._servicer.set(_SERVING_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        await self._servicer.set(_UNKNOWN_SERVICE,
                                 health_pb2.HealthCheckResponse.UNKNOWN)
        await self._servicer.set(_NOT_SERVING_SERVICE,
                                 health_pb2.HealthCheckResponse.NOT_SERVING)
        self._server = aio.server()
        # Bind to port 0 and connect to whichever port the server reports.
        port = self._server.add_insecure_port('[::]:0')
        health_pb2_grpc.add_HealthServicer_to_server(self._servicer,
                                                     self._server)
        await self._server.start()

        self._channel = aio.insecure_channel('localhost:%d' % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    async def tearDown(self):
        """Closes the client channel, then stops the server."""
        await self._channel.close()
        await self._server.stop(None)

    def _start_watch(self, service):
        """Opens a Watch stream for a service and pipes it into a new queue.

        Args:
          service: The service name to put into the HealthCheckRequest.

        Returns:
          A (call, queue, task) tuple: the Watch call, the asyncio.Queue that
          receives its responses, and the task running _pipe_to_queue.
        """
        request = health_pb2.HealthCheckRequest(service=service)
        call = self._stub.Watch(request)
        queue = asyncio.Queue()
        task = self.loop.create_task(_pipe_to_queue(call, queue))
        return call, queue, task

    async def test_check_empty_service(self):
        """A Check with a default (empty) request is expected to be SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_serving_service(self):
        """Check reports SERVING for the service registered as SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_unknown_service(self):
        """Check reports UNKNOWN for the service registered as UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    async def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for the service registered as such."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

    async def test_check_not_found_service(self):
        """Check on a never-registered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service='not-found')
        with self.assertRaises(aio.AioRpcError) as context:
            await self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    async def test_health_service_name(self):
        """The exported SERVICE_NAME matches the fully-qualified name."""
        self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health')

    async def test_watch_empty_service(self):
        """Watch on health.OVERALL_HEALTH yields SERVING, then cancels."""
        call, queue, task = self._start_watch(health.OVERALL_HEALTH)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_new_service(self):
        """A watcher sees SERVICE_UNKNOWN first, then each later update."""
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.NOT_SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         (await queue.get()).status)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_watch_service_isolation(self):
        """Updating a different service must not notify this watcher."""
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        await self._servicer.set('some-other-service',
                                 health_pb2.HealthCheckResponse.SERVING)
        # The change of health status in other service should be isolated.
        # Hence, no additional notification should be observed.
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_two_watchers(self):
        """Two watchers of the same service each receive every update."""
        call1, queue1, task1 = self._start_watch(_WATCH_SERVICE)
        call2, queue2, task2 = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue1.get()).status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue2.get()).status)

        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue1.get()).status)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue2.get()).status)

        call1.cancel()
        call2.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task1

        with self.assertRaises(asyncio.CancelledError):
            await task2

        self.assertTrue(queue1.empty())
        self.assertTrue(queue2.empty())

    async def test_cancelled_watch_removed_from_watch_list(self):
        """A cancelled watcher is dropped from the servicer's watcher list."""
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)

        # Cancel first, then change the status: the update issued after the
        # cancellation must not be delivered to this client.
        call.cancel()
        await self._servicer.set(_WATCH_SERVICE,
                                 health_pb2.HealthCheckResponse.SERVING)

        with self.assertRaises(asyncio.CancelledError):
            await task

        # Wait for the serving coroutine to process client cancellation.
        # Poll until the watcher list empties or TIME_ALLOWANCE has elapsed.
        deadline = time.monotonic() + test_constants.TIME_ALLOWANCE
        while (time.monotonic() < deadline and
               self._servicer._server_watchers):
            await asyncio.sleep(1)
        self.assertFalse(self._servicer._server_watchers,
                         'There should not be any watcher left')
        self.assertTrue(queue.empty())

    async def test_graceful_shutdown(self):
        """After enter_graceful_shutdown the status stays NOT_SERVING."""
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
        call, queue, task = self._start_watch(health.OVERALL_HEALTH)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVING,
                         (await queue.get()).status)

        await self._servicer.enter_graceful_shutdown()
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         (await queue.get()).status)

        # This should be a no-op.
        await self._servicer.set(health.OVERALL_HEALTH,
                                 health_pb2.HealthCheckResponse.SERVING)

        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING,
                         resp.status)

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())

    async def test_no_duplicate_status(self):
        """Setting an unchanged status must not produce a notification."""
        call, queue, task = self._start_watch(_WATCH_SERVICE)

        self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
                         (await queue.get()).status)
        last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN

        for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
            if random.randint(0, 1) == 0:
                status = health_pb2.HealthCheckResponse.SERVING
            else:
                status = health_pb2.HealthCheckResponse.NOT_SERVING

            await self._servicer.set(_WATCH_SERVICE, status)
            # Only a real transition is read back; the final queue.empty()
            # check below catches any notification sent for a repeated status.
            if status != last_status:
                self.assertEqual(status, (await queue.get()).status)
            last_status = status

        call.cancel()

        with self.assertRaises(asyncio.CancelledError):
            await task

        self.assertTrue(queue.empty())


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)