1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Reference implementation for health checking in gRPC Python.""" 15 16import collections 17import threading 18import sys 19import grpc 20 21from grpc_health.v1 import health_pb2 as _health_pb2 22from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc 23 24if sys.version_info[0] >= 3 and sys.version_info[1] >= 6: 25 # Exposes AsyncHealthServicer as public API. 26 from . import _async as aio # pylint: disable=unused-import 27 28# The service name of the health checking servicer. 29SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name 30# The entry of overall health for the entire server. 31OVERALL_HEALTH = '' 32 33 34class _Watcher(): 35 36 def __init__(self): 37 self._condition = threading.Condition() 38 self._responses = collections.deque() 39 self._open = True 40 41 def __iter__(self): 42 return self 43 44 def _next(self): 45 with self._condition: 46 while not self._responses and self._open: 47 self._condition.wait() 48 if self._responses: 49 return self._responses.popleft() 50 else: 51 raise StopIteration() 52 53 def next(self): 54 return self._next() 55 56 def __next__(self): 57 return self._next() 58 59 def add(self, response): 60 with self._condition: 61 self._responses.append(response) 62 self._condition.notify() 63 64 def close(self): 65 with self._condition: 66 self._open = False 67 self._condition.notify() 68 69 70def _watcher_to_send_response_callback_adapter(watcher): 71 72 def send_response_callback(response): 73 if response is None: 74 watcher.close() 75 else: 76 watcher.add(response) 77 78 return send_response_callback 79 80 81class HealthServicer(_health_pb2_grpc.HealthServicer): 82 """Servicer handling RPCs for service statuses.""" 83 84 def __init__(self, 85 experimental_non_blocking=True, 86 experimental_thread_pool=None): 87 self._lock = threading.RLock() 88 self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING} 89 self._send_response_callbacks = {} 90 self.Watch.__func__.experimental_non_blocking = experimental_non_blocking 91 self.Watch.__func__.experimental_thread_pool = experimental_thread_pool 92 self._gracefully_shutting_down = False 93 94 def _on_close_callback(self, send_response_callback, service): 95 96 def callback(): 97 with self._lock: 98 self._send_response_callbacks[service].remove( 99 send_response_callback) 100 send_response_callback(None) 101 102 return callback 103 104 def Check(self, request, context): 105 with self._lock: 106 status = self._server_status.get(request.service) 107 if status is None: 108 context.set_code(grpc.StatusCode.NOT_FOUND) 109 return _health_pb2.HealthCheckResponse() 110 else: 111 return _health_pb2.HealthCheckResponse(status=status) 112 113 # pylint: disable=arguments-differ 114 def Watch(self, request, context, send_response_callback=None): 115 blocking_watcher = None 116 if send_response_callback is None: 117 # The server does not support the experimental_non_blocking 118 # parameter. For backwards compatibility, return a blocking response 119 # generator. 120 blocking_watcher = _Watcher() 121 send_response_callback = _watcher_to_send_response_callback_adapter( 122 blocking_watcher) 123 service = request.service 124 with self._lock: 125 status = self._server_status.get(service) 126 if status is None: 127 status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member 128 send_response_callback( 129 _health_pb2.HealthCheckResponse(status=status)) 130 if service not in self._send_response_callbacks: 131 self._send_response_callbacks[service] = set() 132 self._send_response_callbacks[service].add(send_response_callback) 133 context.add_callback( 134 self._on_close_callback(send_response_callback, service)) 135 return blocking_watcher 136 137 def set(self, service, status): 138 """Sets the status of a service. 139 140 Args: 141 service: string, the name of the service. 142 status: HealthCheckResponse.status enum value indicating the status of 143 the service 144 """ 145 with self._lock: 146 if self._gracefully_shutting_down: 147 return 148 else: 149 self._server_status[service] = status 150 if service in self._send_response_callbacks: 151 for send_response_callback in self._send_response_callbacks[ 152 service]: 153 send_response_callback( 154 _health_pb2.HealthCheckResponse(status=status)) 155 156 def enter_graceful_shutdown(self): 157 """Permanently sets the status of all services to NOT_SERVING. 158 159 This should be invoked when the server is entering a graceful shutdown 160 period. After this method is invoked, future attempts to set the status 161 of a service will be ignored. 162 163 This is an EXPERIMENTAL API. 164 """ 165 with self._lock: 166 if self._gracefully_shutting_down: 167 return 168 else: 169 for service in self._server_status: 170 self.set(service, 171 _health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member 172 self._gracefully_shutting_down = True 173