1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc_health.v1.health.""" 15 16import logging 17import queue 18import sys 19import threading 20import time 21import unittest 22 23import grpc 24from grpc_health.v1 import health 25from grpc_health.v1 import health_pb2 26from grpc_health.v1 import health_pb2_grpc 27 28from tests.unit import test_common 29from tests.unit import thread_pool 30from tests.unit.framework.common import test_constants 31 32_SERVING_SERVICE = "grpc.test.TestServiceServing" 33_UNKNOWN_SERVICE = "grpc.test.TestServiceUnknown" 34_NOT_SERVING_SERVICE = "grpc.test.TestServiceNotServing" 35_WATCH_SERVICE = "grpc.test.WatchService" 36 37 38def _consume_responses(response_iterator, response_queue): 39 for response in response_iterator: 40 response_queue.put(response) 41 42 43class BaseWatchTests(object): 44 @unittest.skipIf( 45 sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2" 46 ) 47 class WatchTests(unittest.TestCase): 48 def start_server(self, non_blocking=False, thread_pool=None): 49 self._thread_pool = thread_pool 50 self._servicer = health.HealthServicer( 51 experimental_non_blocking=non_blocking, 52 experimental_thread_pool=thread_pool, 53 ) 54 self._servicer.set( 55 _SERVING_SERVICE, health_pb2.HealthCheckResponse.SERVING 56 ) 57 self._servicer.set( 58 _UNKNOWN_SERVICE, health_pb2.HealthCheckResponse.UNKNOWN 59 ) 60 self._servicer.set( 61 _NOT_SERVING_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING 62 ) 63 self._server = test_common.test_server() 64 port = self._server.add_insecure_port("[::]:0") 65 health_pb2_grpc.add_HealthServicer_to_server( 66 self._servicer, self._server 67 ) 68 self._server.start() 69 70 self._channel = grpc.insecure_channel("localhost:%d" % port) 71 self._stub = health_pb2_grpc.HealthStub(self._channel) 72 73 def tearDown(self): 74 self._server.stop(None) 75 self._channel.close() 76 77 def test_watch_empty_service(self): 78 request = health_pb2.HealthCheckRequest(service="") 79 response_queue = queue.Queue() 80 rendezvous = self._stub.Watch(request) 81 thread = threading.Thread( 82 target=_consume_responses, args=(rendezvous, response_queue) 83 ) 84 thread.start() 85 86 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 87 self.assertEqual( 88 health_pb2.HealthCheckResponse.SERVING, response.status 89 ) 90 91 rendezvous.cancel() 92 thread.join() 93 self.assertTrue(response_queue.empty()) 94 95 if self._thread_pool is not None: 96 self.assertTrue(self._thread_pool.was_used()) 97 98 def test_watch_new_service(self): 99 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 100 response_queue = queue.Queue() 101 rendezvous = self._stub.Watch(request) 102 thread = threading.Thread( 103 target=_consume_responses, args=(rendezvous, response_queue) 104 ) 105 thread.start() 106 107 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 108 self.assertEqual( 109 health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status 110 ) 111 112 self._servicer.set( 113 _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING 114 ) 115 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 116 self.assertEqual( 117 health_pb2.HealthCheckResponse.SERVING, response.status 118 ) 119 120 self._servicer.set( 121 _WATCH_SERVICE, health_pb2.HealthCheckResponse.NOT_SERVING 122 ) 123 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 124 self.assertEqual( 125 health_pb2.HealthCheckResponse.NOT_SERVING, response.status 126 ) 127 128 rendezvous.cancel() 129 thread.join() 130 self.assertTrue(response_queue.empty()) 131 132 def test_watch_service_isolation(self): 133 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 134 response_queue = queue.Queue() 135 rendezvous = self._stub.Watch(request) 136 thread = threading.Thread( 137 target=_consume_responses, args=(rendezvous, response_queue) 138 ) 139 thread.start() 140 141 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 142 self.assertEqual( 143 health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status 144 ) 145 146 self._servicer.set( 147 "some-other-service", health_pb2.HealthCheckResponse.SERVING 148 ) 149 with self.assertRaises(queue.Empty): 150 response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 151 152 rendezvous.cancel() 153 thread.join() 154 self.assertTrue(response_queue.empty()) 155 156 def test_two_watchers(self): 157 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 158 response_queue1 = queue.Queue() 159 response_queue2 = queue.Queue() 160 rendezvous1 = self._stub.Watch(request) 161 rendezvous2 = self._stub.Watch(request) 162 thread1 = threading.Thread( 163 target=_consume_responses, args=(rendezvous1, response_queue1) 164 ) 165 thread2 = threading.Thread( 166 target=_consume_responses, args=(rendezvous2, response_queue2) 167 ) 168 thread1.start() 169 thread2.start() 170 171 response1 = response_queue1.get( 172 timeout=test_constants.SHORT_TIMEOUT 173 ) 174 response2 = response_queue2.get( 175 timeout=test_constants.SHORT_TIMEOUT 176 ) 177 self.assertEqual( 178 health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response1.status 179 ) 180 self.assertEqual( 181 health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response2.status 182 ) 183 184 self._servicer.set( 185 _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING 186 ) 187 response1 = response_queue1.get( 188 timeout=test_constants.SHORT_TIMEOUT 189 ) 190 response2 = response_queue2.get( 191 timeout=test_constants.SHORT_TIMEOUT 192 ) 193 self.assertEqual( 194 health_pb2.HealthCheckResponse.SERVING, response1.status 195 ) 196 self.assertEqual( 197 health_pb2.HealthCheckResponse.SERVING, response2.status 198 ) 199 200 rendezvous1.cancel() 201 rendezvous2.cancel() 202 thread1.join() 203 thread2.join() 204 self.assertTrue(response_queue1.empty()) 205 self.assertTrue(response_queue2.empty()) 206 207 @unittest.skip("https://github.com/grpc/grpc/issues/18127") 208 def test_cancelled_watch_removed_from_watch_list(self): 209 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 210 response_queue = queue.Queue() 211 rendezvous = self._stub.Watch(request) 212 thread = threading.Thread( 213 target=_consume_responses, args=(rendezvous, response_queue) 214 ) 215 thread.start() 216 217 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 218 self.assertEqual( 219 health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, response.status 220 ) 221 222 rendezvous.cancel() 223 self._servicer.set( 224 _WATCH_SERVICE, health_pb2.HealthCheckResponse.SERVING 225 ) 226 thread.join() 227 228 # Wait, if necessary, for serving thread to process client cancellation 229 timeout = time.time() + test_constants.TIME_ALLOWANCE 230 while ( 231 time.time() < timeout 232 and self._servicer._send_response_callbacks[_WATCH_SERVICE] 233 ): 234 time.sleep(1) 235 self.assertFalse( 236 self._servicer._send_response_callbacks[_WATCH_SERVICE], 237 "watch set should be empty", 238 ) 239 self.assertTrue(response_queue.empty()) 240 241 def test_graceful_shutdown(self): 242 request = health_pb2.HealthCheckRequest(service="") 243 response_queue = queue.Queue() 244 rendezvous = self._stub.Watch(request) 245 thread = threading.Thread( 246 target=_consume_responses, args=(rendezvous, response_queue) 247 ) 248 thread.start() 249 250 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 251 self.assertEqual( 252 health_pb2.HealthCheckResponse.SERVING, response.status 253 ) 254 255 self._servicer.enter_graceful_shutdown() 256 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 257 self.assertEqual( 258 health_pb2.HealthCheckResponse.NOT_SERVING, response.status 259 ) 260 261 # This should be a no-op. 262 self._servicer.set("", health_pb2.HealthCheckResponse.SERVING) 263 264 rendezvous.cancel() 265 thread.join() 266 self.assertTrue(response_queue.empty()) 267 268 269@unittest.skipIf( 270 sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2" 271) 272class HealthServicerTest(BaseWatchTests.WatchTests): 273 def setUp(self): 274 self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None) 275 super(HealthServicerTest, self).start_server( 276 non_blocking=True, thread_pool=self._thread_pool 277 ) 278 279 def test_check_empty_service(self): 280 request = health_pb2.HealthCheckRequest() 281 resp = self._stub.Check(request) 282 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) 283 284 def test_check_serving_service(self): 285 request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE) 286 resp = self._stub.Check(request) 287 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) 288 289 def test_check_unknown_service(self): 290 request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE) 291 resp = self._stub.Check(request) 292 self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) 293 294 def test_check_not_serving_service(self): 295 request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE) 296 resp = self._stub.Check(request) 297 self.assertEqual( 298 health_pb2.HealthCheckResponse.NOT_SERVING, resp.status 299 ) 300 301 def test_check_not_found_service(self): 302 request = health_pb2.HealthCheckRequest(service="not-found") 303 with self.assertRaises(grpc.RpcError) as context: 304 resp = self._stub.Check(request) 305 306 self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) 307 308 def test_health_service_name(self): 309 self.assertEqual(health.SERVICE_NAME, "grpc.health.v1.Health") 310 311 312@unittest.skipIf( 313 sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2" 314) 315class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests): 316 def setUp(self): 317 super(HealthServicerBackwardsCompatibleWatchTest, self).start_server( 318 non_blocking=False, thread_pool=None 319 ) 320 321 322if __name__ == "__main__": 323 logging.basicConfig() 324 unittest.main(verbosity=2) 325