# Copyright 2020 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests AsyncIO version of grpcio-health-checking."""

import asyncio
import logging
import random
import time
import unittest

import grpc
from grpc.experimental import aio
from grpc_health.v1 import health
from grpc_health.v1 import health_pb2
from grpc_health.v1 import health_pb2_grpc

from tests.unit.framework.common import test_constants
from tests_aio.unit._test_base import AioTestBase

_SERVING_SERVICE = "grpc.test.TestServiceServing"
_UNKNOWN_SERVICE = "grpc.test.TestServiceUnknown"
_NOT_SERVING_SERVICE = "grpc.test.TestServiceNotServing"
_WATCH_SERVICE = "grpc.test.WatchService"

_LARGE_NUMBER_OF_STATUS_CHANGES = 1000


async def _pipe_to_queue(call, queue):
    """Forward every response read from `call` into `queue`.

    Args:
        call: An async-iterable streaming call.
        queue: The asyncio.Queue that receives each response.
    """
    async for response in call:
        await queue.put(response)


def _start_watch(stub, loop, request):
    """Open a Watch stream and pump its responses into a fresh queue.

    Args:
        stub: The HealthStub used to issue the Watch RPC.
        loop: The event loop on which the pumping task is scheduled.
        request: The HealthCheckRequest naming the service to watch.

    Returns:
        A `(call, queue, task)` tuple: the Watch call, the asyncio.Queue
        receiving its responses, and the task running `_pipe_to_queue`.
    """
    call = stub.Watch(request)
    queue = asyncio.Queue()
    task = loop.create_task(_pipe_to_queue(call, queue))
    return call, queue, task


async def _cancel_watch(test_case, call, task, queue):
    """Cancel a watch started by `_start_watch` and verify it ended cleanly.

    Asserts that awaiting the pumping task raises asyncio.CancelledError and
    that no unread response is left in the queue.

    Args:
        test_case: The unittest.TestCase providing the assertion methods.
        call: The Watch call to cancel.
        task: The task pumping `call` into `queue`.
        queue: The queue that must be empty after cancellation.
    """
    call.cancel()

    with test_case.assertRaises(asyncio.CancelledError):
        await task

    test_case.assertTrue(queue.empty())


class HealthServicerTest(AioTestBase):
    """Exercises health.aio.HealthServicer through a real aio server/channel."""

    async def setUp(self):
        """Start a server with three pre-set service statuses and connect."""
        self._servicer = health.aio.HealthServicer()
        await self._servicer.set(
            _SERVING_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        await self._servicer.set(
            _UNKNOWN_SERVICE, health_pb2.HealthCheckResponse.UNKNOWN
        )
        await self._servicer.set(
            _NOT_SERVING_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
        )
        self._server = aio.server()
        port = self._server.add_insecure_port("[::]:0")
        health_pb2_grpc.add_HealthServicer_to_server(
            self._servicer, self._server
        )
        await self._server.start()

        self._channel = aio.insecure_channel("localhost:%d" % port)
        self._stub = health_pb2_grpc.HealthStub(self._channel)

    async def tearDown(self):
        """Close the client channel, then stop the server without grace."""
        await self._channel.close()
        await self._server.stop(None)

    async def test_check_empty_service(self):
        """Check with a default (empty) request reports SERVING."""
        request = health_pb2.HealthCheckRequest()
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_serving_service(self):
        """Check reports SERVING for the service set to SERVING."""
        request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status)

    async def test_check_unknown_service(self):
        """Check reports UNKNOWN for the service set to UNKNOWN."""
        request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status)

    async def test_check_not_serving_service(self):
        """Check reports NOT_SERVING for the service set to NOT_SERVING."""
        request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE)
        resp = await self._stub.Check(request)
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING, resp.status
        )

    async def test_check_not_found_service(self):
        """Check on a never-registered service fails with NOT_FOUND."""
        request = health_pb2.HealthCheckRequest(service="not-found")
        with self.assertRaises(aio.AioRpcError) as context:
            await self._stub.Check(request)

        self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code())

    async def test_health_service_name(self):
        """The exported SERVICE_NAME constant has the expected value."""
        self.assertEqual(health.SERVICE_NAME, "grpc.health.v1.Health")

    async def test_watch_empty_service(self):
        """Watch on health.OVERALL_HEALTH first yields SERVING."""
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        await _cancel_watch(self, call, task, queue)

    async def test_watch_new_service(self):
        """Watch yields SERVICE_UNKNOWN first, then each status that is set."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING,
            (await queue.get()).status,
        )

        await _cancel_watch(self, call, task, queue)

    async def test_watch_service_isolation(self):
        """A status change on another service is not sent to this watcher."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        await self._servicer.set(
            "some-other-service", health_pb2.HealthCheckResponse.SERVING
        )
        # The change of health status in other service should be isolated.
        # Hence, no additional notification should be observed.
        with self.assertRaises(asyncio.TimeoutError):
            await asyncio.wait_for(queue.get(), test_constants.SHORT_TIMEOUT)

        await _cancel_watch(self, call, task, queue)

    async def test_two_watchers(self):
        """Two watchers of the same service both receive every update."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call1, queue1, task1 = _start_watch(self._stub, self.loop, request)
        call2, queue2, task2 = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue1.get()).status,
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue2.get()).status,
        )

        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue1.get()).status
        )
        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue2.get()).status
        )

        # No status is set between the two cancellations, so tearing the
        # watchers down one after the other leaves both queues empty.
        await _cancel_watch(self, call1, task1, queue1)
        await _cancel_watch(self, call2, task2, queue2)

    async def test_cancelled_watch_removed_from_watch_list(self):
        """A cancelled watcher is eventually dropped by the servicer."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )

        # The teardown is spelled out here instead of using _cancel_watch:
        # a status is set right after the cancellation, and the queue is only
        # checked once the servicer has dropped the watcher.
        call.cancel()
        await self._servicer.set(
            _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING
        )

        with self.assertRaises(asyncio.CancelledError):
            await task

        # Wait for the serving coroutine to process client cancellation.
        timeout = time.monotonic() + test_constants.TIME_ALLOWANCE
        while time.monotonic() < timeout and self._servicer._server_watchers:
            await asyncio.sleep(1)
        self.assertFalse(
            self._servicer._server_watchers,
            "There should not be any watcher left",
        )
        self.assertTrue(queue.empty())

    async def test_graceful_shutdown(self):
        """After enter_graceful_shutdown, status stays NOT_SERVING."""
        request = health_pb2.HealthCheckRequest(service=health.OVERALL_HEALTH)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVING, (await queue.get()).status
        )

        await self._servicer.enter_graceful_shutdown()
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING,
            (await queue.get()).status,
        )

        # This should be a no-op.
        await self._servicer.set(
            health.OVERALL_HEALTH, health_pb2.HealthCheckResponse.SERVING
        )

        resp = await self._stub.Check(request)
        self.assertEqual(
            health_pb2.HealthCheckResponse.NOT_SERVING, resp.status
        )

        await _cancel_watch(self, call, task, queue)

    async def test_no_duplicate_status(self):
        """Setting an unchanged status does not produce a notification."""
        request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE)
        call, queue, task = _start_watch(self._stub, self.loop, request)

        self.assertEqual(
            health_pb2.HealthCheckResponse.SERVICE_UNKNOWN,
            (await queue.get()).status,
        )
        last_status = health_pb2.HealthCheckResponse.SERVICE_UNKNOWN

        for _ in range(_LARGE_NUMBER_OF_STATUS_CHANGES):
            if random.randint(0, 1) == 0:
                status = health_pb2.HealthCheckResponse.SERVING
            else:
                status = health_pb2.HealthCheckResponse.NOT_SERVING

            await self._servicer.set(_WATCH_SERVICE, status)
            # Only a real transition is expected on the stream; any extra
            # message would be caught by the final queue-empty assertion.
            if status != last_status:
                self.assertEqual(status, (await queue.get()).status)
                last_status = status

        await _cancel_watch(self, call, task, queue)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)