#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
#if GRPC_SUPPORT_WATCH
using System.Threading.Channels;
#endif
using System.Threading.Tasks;

using Grpc.Core;
using Grpc.Health.V1;

namespace Grpc.HealthCheck
{
    /// <summary>
    /// Implementation of a simple Health service. Useful for health checking.
    ///
    /// Registering service with a server:
    /// <code>
    /// var serviceImpl = new HealthServiceImpl();
    /// server = new Server();
    /// server.AddServiceDefinition(Grpc.Health.V1.Health.BindService(serviceImpl));
    /// </code>
    /// </summary>
    public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
    {
        // The maximum number of statuses to buffer on the server.
        internal const int MaxStatusBufferSize = 5;

        // Guards statusMap. When both locks are needed, statusLock is always taken
        // first and watchersLock second; no code path acquires them in the reverse order.
        private readonly object statusLock = new object();
        private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
            new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

#if GRPC_SUPPORT_WATCH
        // Guards watchers (the per-service lists of channel writers, one per active Watch call).
        private readonly object watchersLock = new object();
        private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
            new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
#endif

        /// <summary>
        /// Sets the health status for given service.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        /// <param name="status">the health status</param>
        public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
                statusMap[service] = status;

#if GRPC_SUPPORT_WATCH
                // Notify while still holding statusLock so that concurrent updates reach
                // watchers in the same order in which they were applied to statusMap.
                if (status != previousStatus)
                {
                    NotifyStatus(service, status);
                }
#endif
            }
        }

        /// <summary>
        /// Clears health status for given service.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        public void ClearStatus(string service)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
                statusMap.Remove(service);

#if GRPC_SUPPORT_WATCH
                // A cleared service reads as ServiceUnknown; only notify if that is a change.
                // Done under statusLock to keep notifications ordered with the map mutation.
                if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                {
                    NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                }
#endif
            }
        }

        /// <summary>
        /// Clears statuses for all services.
        /// </summary>
        public void ClearAll()
        {
            lock (statusLock)
            {
                // Snapshot the entries before clearing so watchers of each service can be notified.
                List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses = statusMap.ToList();
                statusMap.Clear();

#if GRPC_SUPPORT_WATCH
                // Done under statusLock to keep notifications ordered with the map mutation.
                foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
                {
                    if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                    {
                        NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                    }
                }
#endif
            }
        }

        /// <summary>
        /// Performs a health status check.
        /// </summary>
        /// <param name="request">The check request.</param>
        /// <param name="context">The call context.</param>
        /// <returns>The asynchronous response.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <see cref="StatusCode.NotFound"/> when no status has been set for the requested service.
        /// </exception>
        public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
        {
            HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);

            return Task.FromResult(response);
        }

#if GRPC_SUPPORT_WATCH
        /// <summary>
        /// Performs a watch for the serving status of the requested service.
        /// The server will immediately send back a message indicating the current
        /// serving status.  It will then subsequently send a new message whenever
        /// the service's serving status changes.
        ///
        /// If the requested service is unknown when the call is received, the
        /// server will send a message setting the serving status to
        /// SERVICE_UNKNOWN but will *not* terminate the call.  If at some
        /// future point, the serving status of the service becomes known, the
        /// server will send a new message with the service's serving status.
        ///
        /// If the call terminates with status UNIMPLEMENTED, then clients
        /// should assume this method is not supported and should not retry the
        /// call.  If the call terminates with any other status (including OK),
        /// clients should retry the call with appropriate exponential backoff.
        /// </summary>
        /// <param name="request">The request received from the client.</param>
        /// <param name="responseStream">Used for sending responses back to the client.</param>
        /// <param name="context">The context of the server-side call handler being invoked.</param>
        /// <returns>A task indicating completion of the handler.</returns>
        public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
        {
            string service = request.Service;

            // Channel is used to marshal multiple callers updating status into a single queue.
            // This is required because IServerStreamWriter is not thread safe.
            //
            // A queue of unwritten statuses could build up if flow control causes responseStream.WriteAsync to await.
            // When this number is exceeded the server will discard older statuses. The discarded intermediate statuses
            // will never be sent to the client.
            Channel<HealthCheckResponse> channel = Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(capacity: MaxStatusBufferSize) {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.DropOldest
            });

            lock (watchersLock)
            {
                if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    channelWriters = new List<ChannelWriter<HealthCheckResponse>>();
                    watchers.Add(service, channelWriters);
                }

                channelWriters.Add(channel.Writer);
            }

            try
            {
                // Watch calls run until ended by the client canceling them. On cancellation the
                // watcher is unregistered and the channel completed, which ends the read loop below.
                // The registration is disposed when the handler exits so it does not outlive the call.
                using (context.CancellationToken.Register(() => RemoveWatcher(service, channel.Writer)))
                {
                    // Send current status immediately
                    HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
                    await responseStream.WriteAsync(response);

                    // Read messages. WaitToReadAsync will wait until new messages are available.
                    // Loop will exit when the call is canceled and the writer is marked as complete.
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        if (channel.Reader.TryRead(out HealthCheckResponse item))
                        {
                            await responseStream.WriteAsync(item);
                        }
                    }
                }
            }
            finally
            {
                // Also clean up when the handler exits for any other reason (e.g. a failed write),
                // so the watcher is not left registered. Safe to run after the cancellation callback.
                RemoveWatcher(service, channel.Writer);
            }
        }

        /// <summary>
        /// Unregisters a watcher's channel writer for the given service and marks the channel as complete.
        /// Idempotent: it may be invoked both by the cancellation callback and by the handler's cleanup.
        /// </summary>
        /// <param name="service">The service the watcher was registered for.</param>
        /// <param name="writer">The channel writer to remove.</param>
        private void RemoveWatcher(string service, ChannelWriter<HealthCheckResponse> writer)
        {
            lock (watchersLock)
            {
                // Remove the writer from the watchers, and drop the per-service list
                // once the service has no more response streams.
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters)
                    && channelWriters.Remove(writer)
                    && channelWriters.Count == 0)
                {
                    watchers.Remove(service);
                }
            }

            // Signal the writer is complete and the watch method can exit.
            // TryComplete (rather than Complete) because this method can run more than once.
            writer.TryComplete();
        }

        /// <summary>
        /// Queues a status notification to every watcher currently registered for the given service.
        /// Callers invoke this while holding <c>statusLock</c> so notifications are queued in update order.
        /// </summary>
        /// <param name="service">The service whose status changed.</param>
        /// <param name="status">The new status to send to watchers.</param>
        /// <exception cref="InvalidOperationException">Thrown when a notification cannot be queued.</exception>
        private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    HealthCheckResponse response = new HealthCheckResponse { Status = status };

                    foreach (ChannelWriter<HealthCheckResponse> writer in channelWriters)
                    {
                        if (!writer.TryWrite(response))
                        {
                            throw new InvalidOperationException("Unable to queue health check notification.");
                        }
                    }
                }
            }
        }
#endif

        /// <summary>
        /// Builds a response carrying the current status of the given service.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <param name="throwOnNotFound">
        /// When true, a service with no status set results in an <see cref="RpcException"/>;
        /// when false, it is reported as <c>ServiceUnknown</c>.
        /// </param>
        /// <returns>A new response with the service's status.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <see cref="StatusCode.NotFound"/> when the service has no status and
        /// <paramref name="throwOnNotFound"/> is true.
        /// </exception>
        private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
        {
            lock (statusLock)
            {
                if (!statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus status))
                {
                    if (throwOnNotFound)
                    {
                        // TODO(jtattermusch): returning specific status from server handler is not supported yet.
                        throw new RpcException(new Status(StatusCode.NotFound, ""));
                    }

                    status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
                }

                return new HealthCheckResponse { Status = status };
            }
        }

        /// <summary>
        /// Returns the status recorded for the given service, or <c>ServiceUnknown</c> if none is set.
        /// Does not take <c>statusLock</c> itself; callers in this class invoke it while holding the lock.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <returns>The recorded status, or <c>ServiceUnknown</c>.</returns>
        private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
        {
            if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
            {
                return s;
            }
            else
            {
                // A service with no set status has a status of ServiceUnknown
                return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
            }
        }
    }
}