• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Reference implementation for health checking in gRPC Python."""
15
16import collections
17import threading
18import sys
19import grpc
20
21from grpc_health.v1 import health_pb2 as _health_pb2
22from grpc_health.v1 import health_pb2_grpc as _health_pb2_grpc
23
# Compare the version as a tuple: checking major and minor independently
# (major >= 3 and minor >= 6) would wrongly reject e.g. a future X.0 release.
if sys.version_info >= (3, 6):
    # Exposes AsyncHealthServicer as public API.
    from . import _async as aio  # pylint: disable=unused-import

# The service name of the health checking servicer.
SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name
# The entry of overall health for the entire server.
OVERALL_HEALTH = ''
32
33
class _Watcher:
    """Blocking iterator over responses queued by another thread.

    Responses handed to ``add`` are yielded in FIFO order. Iteration blocks
    while the queue is empty and the watcher is still open, and stops once
    the watcher has been closed and every queued response was consumed.
    """

    def __init__(self):
        # One condition variable guards both the queue and the open flag.
        self._condition = threading.Condition()
        self._responses = collections.deque()
        self._open = True

    def __iter__(self):
        return self

    def _next(self):
        """Returns the oldest queued response, waiting for one if needed.

        Raises:
          StopIteration: the watcher is closed and the queue is empty.
        """
        with self._condition:
            # Sleep until there is something to hand out or we were closed.
            while self._open and not self._responses:
                self._condition.wait()
            if not self._responses:
                raise StopIteration()
            return self._responses.popleft()

    # Both the Python 2 and the Python 3 iterator protocol entry points
    # delegate to the same implementation.
    def next(self):
        return self._next()

    def __next__(self):
        return self._next()

    def add(self, response):
        """Queues a response and wakes up a waiting consumer."""
        with self._condition:
            self._responses.append(response)
            self._condition.notify()

    def close(self):
        """Marks the watcher as closed and wakes up a waiting consumer."""
        with self._condition:
            self._open = False
            self._condition.notify()
68
69
def _watcher_to_send_response_callback_adapter(watcher):
    """Wraps a watcher in a single-argument "send response" callable.

    Args:
      watcher: object exposing ``add(response)`` and ``close()``.

    Returns:
      A callable that forwards every non-None response to ``watcher.add``
      and treats ``None`` as the end-of-stream signal by calling
      ``watcher.close``.
    """

    def forward(response):
        if response is not None:
            watcher.add(response)
            return
        # None marks the end of the stream.
        watcher.close()

    return forward
79
80
class HealthServicer(_health_pb2_grpc.HealthServicer):
    """Servicer handling RPCs for service statuses.

    Keeps an in-memory map from service name to serving status, answers
    ``Check`` from that map and pushes every status change to the callbacks
    registered by ``Watch`` calls. All state is guarded by one re-entrant
    lock.
    """

    def __init__(self,
                 experimental_non_blocking=True,
                 experimental_thread_pool=None):
        """Initializes the servicer with the overall entry marked SERVING.

        Args:
          experimental_non_blocking: value stored as the
            ``experimental_non_blocking`` attribute of the ``Watch`` function.
          experimental_thread_pool: value stored as the
            ``experimental_thread_pool`` attribute of the ``Watch`` function.

        Both values are attached to the function object behind ``Watch``, not
        to the instance, so they are shared by every instance that uses the
        same ``Watch`` implementation.
        """
        self._lock = threading.RLock()
        # The empty service name is the entry for the server as a whole.
        self._server_status = {"": _health_pb2.HealthCheckResponse.SERVING}
        # Maps a service name to the set of callbacks watching it.
        self._send_response_callbacks = {}
        self.Watch.__func__.experimental_non_blocking = experimental_non_blocking
        self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
        self._gracefully_shutting_down = False

    def _on_close_callback(self, send_response_callback, service):
        """Builds the callback that unregisters a watcher and ends its stream.

        Args:
          send_response_callback: the callback registered for the watcher.
          service: string, the name of the watched service.

        Returns:
          A zero-argument callable that removes ``send_response_callback``
          from the watchers of ``service`` and then calls it with ``None``.
        """

        def callback():
            with self._lock:
                watchers = self._send_response_callbacks.get(service)
                if watchers is not None:
                    # discard() tolerates a callback that is already gone.
                    watchers.discard(send_response_callback)
                    if not watchers:
                        # Drop the empty set so that names which are no
                        # longer watched do not accumulate in the dict.
                        del self._send_response_callbacks[service]
            send_response_callback(None)

        return callback

    def Check(self, request, context):
        """Returns the current status of ``request.service``.

        When the service has no recorded status the RPC code is set to
        NOT_FOUND and an empty response is returned.
        """
        with self._lock:
            status = self._server_status.get(request.service)
            if status is None:
                context.set_code(grpc.StatusCode.NOT_FOUND)
                return _health_pb2.HealthCheckResponse()
            return _health_pb2.HealthCheckResponse(status=status)

    # pylint: disable=arguments-differ
    def Watch(self, request, context, send_response_callback=None):
        """Streams the status of ``request.service`` and its later changes.

        The current status (SERVICE_UNKNOWN when none is recorded) is sent
        immediately; afterwards every ``set`` for that service is forwarded
        until the RPC terminates.

        Args:
          request: the request carrying the name of the service to watch.
          context: the RPC context; used to register the clean-up callback.
          send_response_callback: callable used to deliver responses. When
            omitted, responses are delivered through a blocking iterator
            that is returned to the caller instead.

        Returns:
          The blocking response iterator when no ``send_response_callback``
          was supplied, otherwise None.
        """
        blocking_watcher = None
        if send_response_callback is None:
            # The server does not support the experimental_non_blocking
            # parameter. For backwards compatibility, return a blocking response
            # generator.
            blocking_watcher = _Watcher()
            send_response_callback = _watcher_to_send_response_callback_adapter(
                blocking_watcher)
        service = request.service
        on_close = self._on_close_callback(send_response_callback, service)
        with self._lock:
            status = self._server_status.get(service)
            if status is None:
                status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN  # pylint: disable=no-member
            send_response_callback(
                _health_pb2.HealthCheckResponse(status=status))
            self._send_response_callbacks.setdefault(
                service, set()).add(send_response_callback)
            registered = context.add_callback(on_close)
            # add_callback reports False when the callback was not added and
            # will never be called (e.g. the RPC already terminated). Run the
            # clean-up ourselves in that case, otherwise the watcher would
            # stay registered forever. Compare against False explicitly so
            # that contexts returning None keep the previous behavior.
            if registered is False:
                on_close()
        return blocking_watcher

    def set(self, service, status):
        """Sets the status of a service.

        The new status is also sent to every watcher of the service. Calls
        made after ``enter_graceful_shutdown`` are ignored.

        Args:
          service: string, the name of the service.
          status: HealthCheckResponse.status enum value indicating the status of
            the service
        """
        with self._lock:
            if self._gracefully_shutting_down:
                return
            self._server_status[service] = status
            for send_response_callback in self._send_response_callbacks.get(
                    service, ()):
                send_response_callback(
                    _health_pb2.HealthCheckResponse(status=status))

    def enter_graceful_shutdown(self):
        """Permanently sets the status of all services to NOT_SERVING.

        This should be invoked when the server is entering a graceful shutdown
        period. After this method is invoked, future attempts to set the status
        of a service will be ignored.

        This is an EXPERIMENTAL API.
        """
        with self._lock:
            if self._gracefully_shutting_down:
                return
            # set() only overwrites existing keys here, so the dict does not
            # change size while it is being iterated.
            for service in self._server_status:
                self.set(service,
                         _health_pb2.HealthCheckResponse.NOT_SERVING)  # pylint: disable=no-member
            # Flip the flag last: set() becomes a no-op once it is True.
            self._gracefully_shutting_down = True
173