1# Copyright 2016 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc_health.v1.health.""" 15 16import logging 17import threading 18import time 19import unittest 20 21import grpc 22 23from grpc_health.v1 import health 24from grpc_health.v1 import health_pb2 25from grpc_health.v1 import health_pb2_grpc 26 27from tests.unit import test_common 28from tests.unit import thread_pool 29from tests.unit.framework.common import test_constants 30 31from six.moves import queue 32 33_SERVING_SERVICE = 'grpc.test.TestServiceServing' 34_UNKNOWN_SERVICE = 'grpc.test.TestServiceUnknown' 35_NOT_SERVING_SERVICE = 'grpc.test.TestServiceNotServing' 36_WATCH_SERVICE = 'grpc.test.WatchService' 37 38 39def _consume_responses(response_iterator, response_queue): 40 for response in response_iterator: 41 response_queue.put(response) 42 43 44class BaseWatchTests(object): 45 46 class WatchTests(unittest.TestCase): 47 48 def start_server(self, non_blocking=False, thread_pool=None): 49 self._thread_pool = thread_pool 50 self._servicer = health.HealthServicer( 51 experimental_non_blocking=non_blocking, 52 experimental_thread_pool=thread_pool) 53 self._servicer.set(_SERVING_SERVICE, 54 health_pb2.HealthCheckResponse.SERVING) 55 self._servicer.set(_UNKNOWN_SERVICE, 56 health_pb2.HealthCheckResponse.UNKNOWN) 57 self._servicer.set(_NOT_SERVING_SERVICE, 58 health_pb2.HealthCheckResponse.NOT_SERVING) 59 self._server = test_common.test_server() 60 port = self._server.add_insecure_port('[::]:0') 61 health_pb2_grpc.add_HealthServicer_to_server( 62 self._servicer, self._server) 63 self._server.start() 64 65 self._channel = grpc.insecure_channel('localhost:%d' % port) 66 self._stub = health_pb2_grpc.HealthStub(self._channel) 67 68 def tearDown(self): 69 self._server.stop(None) 70 self._channel.close() 71 72 def test_watch_empty_service(self): 73 request = health_pb2.HealthCheckRequest(service='') 74 response_queue = queue.Queue() 75 rendezvous = self._stub.Watch(request) 76 thread = threading.Thread(target=_consume_responses, 77 args=(rendezvous, response_queue)) 78 thread.start() 79 80 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 81 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, 82 response.status) 83 84 rendezvous.cancel() 85 thread.join() 86 self.assertTrue(response_queue.empty()) 87 88 if self._thread_pool is not None: 89 self.assertTrue(self._thread_pool.was_used()) 90 91 def test_watch_new_service(self): 92 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 93 response_queue = queue.Queue() 94 rendezvous = self._stub.Watch(request) 95 thread = threading.Thread(target=_consume_responses, 96 args=(rendezvous, response_queue)) 97 thread.start() 98 99 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 100 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, 101 response.status) 102 103 self._servicer.set(_WATCH_SERVICE, 104 health_pb2.HealthCheckResponse.SERVING) 105 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 106 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, 107 response.status) 108 109 self._servicer.set(_WATCH_SERVICE, 110 health_pb2.HealthCheckResponse.NOT_SERVING) 111 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 112 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, 113 response.status) 114 115 rendezvous.cancel() 116 thread.join() 117 self.assertTrue(response_queue.empty()) 118 119 def test_watch_service_isolation(self): 120 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 121 response_queue = queue.Queue() 122 rendezvous = self._stub.Watch(request) 123 thread = threading.Thread(target=_consume_responses, 124 args=(rendezvous, response_queue)) 125 thread.start() 126 127 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 128 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, 129 response.status) 130 131 self._servicer.set('some-other-service', 132 health_pb2.HealthCheckResponse.SERVING) 133 with self.assertRaises(queue.Empty): 134 response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 135 136 rendezvous.cancel() 137 thread.join() 138 self.assertTrue(response_queue.empty()) 139 140 def test_two_watchers(self): 141 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 142 response_queue1 = queue.Queue() 143 response_queue2 = queue.Queue() 144 rendezvous1 = self._stub.Watch(request) 145 rendezvous2 = self._stub.Watch(request) 146 thread1 = threading.Thread(target=_consume_responses, 147 args=(rendezvous1, response_queue1)) 148 thread2 = threading.Thread(target=_consume_responses, 149 args=(rendezvous2, response_queue2)) 150 thread1.start() 151 thread2.start() 152 153 response1 = response_queue1.get( 154 timeout=test_constants.SHORT_TIMEOUT) 155 response2 = response_queue2.get( 156 timeout=test_constants.SHORT_TIMEOUT) 157 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, 158 response1.status) 159 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, 160 response2.status) 161 162 self._servicer.set(_WATCH_SERVICE, 163 health_pb2.HealthCheckResponse.SERVING) 164 response1 = response_queue1.get( 165 timeout=test_constants.SHORT_TIMEOUT) 166 response2 = response_queue2.get( 167 timeout=test_constants.SHORT_TIMEOUT) 168 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, 169 response1.status) 170 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, 171 response2.status) 172 173 rendezvous1.cancel() 174 rendezvous2.cancel() 175 thread1.join() 176 thread2.join() 177 self.assertTrue(response_queue1.empty()) 178 self.assertTrue(response_queue2.empty()) 179 180 @unittest.skip("https://github.com/grpc/grpc/issues/18127") 181 def test_cancelled_watch_removed_from_watch_list(self): 182 request = health_pb2.HealthCheckRequest(service=_WATCH_SERVICE) 183 response_queue = queue.Queue() 184 rendezvous = self._stub.Watch(request) 185 thread = threading.Thread(target=_consume_responses, 186 args=(rendezvous, response_queue)) 187 thread.start() 188 189 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 190 self.assertEqual(health_pb2.HealthCheckResponse.SERVICE_UNKNOWN, 191 response.status) 192 193 rendezvous.cancel() 194 self._servicer.set(_WATCH_SERVICE, 195 health_pb2.HealthCheckResponse.SERVING) 196 thread.join() 197 198 # Wait, if necessary, for serving thread to process client cancellation 199 timeout = time.time() + test_constants.TIME_ALLOWANCE 200 while (time.time() < timeout and 201 self._servicer._send_response_callbacks[_WATCH_SERVICE]): 202 time.sleep(1) 203 self.assertFalse( 204 self._servicer._send_response_callbacks[_WATCH_SERVICE], 205 'watch set should be empty') 206 self.assertTrue(response_queue.empty()) 207 208 def test_graceful_shutdown(self): 209 request = health_pb2.HealthCheckRequest(service='') 210 response_queue = queue.Queue() 211 rendezvous = self._stub.Watch(request) 212 thread = threading.Thread(target=_consume_responses, 213 args=(rendezvous, response_queue)) 214 thread.start() 215 216 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 217 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, 218 response.status) 219 220 self._servicer.enter_graceful_shutdown() 221 response = response_queue.get(timeout=test_constants.SHORT_TIMEOUT) 222 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, 223 response.status) 224 225 # This should be a no-op. 226 self._servicer.set('', health_pb2.HealthCheckResponse.SERVING) 227 228 rendezvous.cancel() 229 thread.join() 230 self.assertTrue(response_queue.empty()) 231 232 233class HealthServicerTest(BaseWatchTests.WatchTests): 234 235 def setUp(self): 236 self._thread_pool = thread_pool.RecordingThreadPool(max_workers=None) 237 super(HealthServicerTest, 238 self).start_server(non_blocking=True, 239 thread_pool=self._thread_pool) 240 241 def test_check_empty_service(self): 242 request = health_pb2.HealthCheckRequest() 243 resp = self._stub.Check(request) 244 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) 245 246 def test_check_serving_service(self): 247 request = health_pb2.HealthCheckRequest(service=_SERVING_SERVICE) 248 resp = self._stub.Check(request) 249 self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) 250 251 def test_check_unknown_service(self): 252 request = health_pb2.HealthCheckRequest(service=_UNKNOWN_SERVICE) 253 resp = self._stub.Check(request) 254 self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) 255 256 def test_check_not_serving_service(self): 257 request = health_pb2.HealthCheckRequest(service=_NOT_SERVING_SERVICE) 258 resp = self._stub.Check(request) 259 self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, 260 resp.status) 261 262 def test_check_not_found_service(self): 263 request = health_pb2.HealthCheckRequest(service='not-found') 264 with self.assertRaises(grpc.RpcError) as context: 265 resp = self._stub.Check(request) 266 267 self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) 268 269 def test_health_service_name(self): 270 self.assertEqual(health.SERVICE_NAME, 'grpc.health.v1.Health') 271 272 273class HealthServicerBackwardsCompatibleWatchTest(BaseWatchTests.WatchTests): 274 275 def setUp(self): 276 super(HealthServicerBackwardsCompatibleWatchTest, 277 self).start_server(non_blocking=False, thread_pool=None) 278 279 280if __name__ == '__main__': 281 logging.basicConfig() 282 unittest.main(verbosity=2) 283