#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
#if GRPC_SUPPORT_WATCH
using System.Threading.Channels;
#endif
using System.Threading.Tasks;

using Grpc.Core;
using Grpc.Health.V1;

namespace Grpc.HealthCheck
{
    /// <summary>
    /// Implementation of a simple Health service. Useful for health checking.
    ///
    /// Registering service with a server:
    /// <code>
    /// var serviceImpl = new HealthServiceImpl();
    /// server = new Server();
    /// server.AddServiceDefinition(Grpc.Health.V1.Health.BindService(serviceImpl));
    /// </code>
    /// </summary>
    public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
    {
        // The maximum number of statuses to buffer on the server.
        internal const int MaxStatusBufferSize = 5;

        // Guards statusMap. Never held at the same time as watchersLock.
        private readonly object statusLock = new object();
        private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
            new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

#if GRPC_SUPPORT_WATCH
        // Guards watchers (both the dictionary and the per-service lists).
        private readonly object watchersLock = new object();
        private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
            new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
#endif

        /// <summary>
        /// Sets the health status for given service.
        /// </summary>
        /// <param name="service">The service. Cannot be <c>null</c>.</param>
        /// <param name="status">the health status</param>
        public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            HealthCheckResponse.Types.ServingStatus previousStatus;
            lock (statusLock)
            {
                previousStatus = GetServiceStatus(service);
                statusMap[service] = status;
            }

#if GRPC_SUPPORT_WATCH
            // Watchers are only told about actual changes. The notification is sent
            // after statusLock has been released.
            // NOTE(review): because of that, two concurrent SetStatus calls for the same
            // service may notify in a different order than they updated the map - confirm
            // whether callers rely on ordering.
            if (status != previousStatus)
            {
                NotifyStatus(service, status);
            }
#endif
        }

        /// <summary>
        /// Clears health status for given service.
        /// </summary>
        /// <param name="service">The service. Cannot be <c>null</c>.</param>
        public void ClearStatus(string service)
        {
            HealthCheckResponse.Types.ServingStatus previousStatus;
            lock (statusLock)
            {
                previousStatus = GetServiceStatus(service);
                statusMap.Remove(service);
            }

#if GRPC_SUPPORT_WATCH
            // A cleared service reads as ServiceUnknown, so only notify when that is a change.
            if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
            {
                NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
            }
#endif
        }

        /// <summary>
        /// Clears statuses for all services.
        /// </summary>
        public void ClearAll()
        {
            // Snapshot the entries under the lock so watchers can be notified after it is released.
            List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses;
            lock (statusLock)
            {
                statuses = statusMap.ToList();
                statusMap.Clear();
            }

#if GRPC_SUPPORT_WATCH
            foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
            {
                if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                {
                    NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                }
            }
#endif
        }

        /// <summary>
        /// Performs a health status check.
        /// </summary>
        /// <param name="request">The check request.</param>
        /// <param name="context">The call context.</param>
        /// <returns>The asynchronous response.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <see cref="StatusCode.NotFound"/> when no status has been set for the requested service.
        /// </exception>
        public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
        {
            HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);

            return Task.FromResult(response);
        }

#if GRPC_SUPPORT_WATCH
        /// <summary>
        /// Performs a watch for the serving status of the requested service.
        /// The server will immediately send back a message indicating the current
        /// serving status.  It will then subsequently send a new message whenever
        /// the service's serving status changes.
        ///
        /// If the requested service is unknown when the call is received, the
        /// server will send a message setting the serving status to
        /// SERVICE_UNKNOWN but will *not* terminate the call.  If at some
        /// future point, the serving status of the service becomes known, the
        /// server will send a new message with the service's serving status.
        ///
        /// If the call terminates with status UNIMPLEMENTED, then clients
        /// should assume this method is not supported and should not retry the
        /// call.  If the call terminates with any other status (including OK),
        /// clients should retry the call with appropriate exponential backoff.
        /// </summary>
        /// <param name="request">The request received from the client.</param>
        /// <param name="responseStream">Used for sending responses back to the client.</param>
        /// <param name="context">The context of the server-side call handler being invoked.</param>
        /// <returns>A task indicating completion of the handler.</returns>
        public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
        {
            string service = request.Service;

            // Channel is used to marshal multiple callers updating status into a single queue.
            // This is required because IServerStreamWriter is not thread safe.
            //
            // A queue of unwritten statuses could build up if flow control causes responseStream.WriteAsync to await.
            // When this number is exceeded the server will discard older statuses. The discarded intermediate statuses
            // will never be sent to the client.
            Channel<HealthCheckResponse> channel = Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(capacity: MaxStatusBufferSize)
            {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.DropOldest
            });

            lock (watchersLock)
            {
                if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    channelWriters = new List<ChannelWriter<HealthCheckResponse>>();
                    watchers.Add(service, channelWriters);
                }

                channelWriters.Add(channel.Writer);
            }

            // Unregisters this call's writer and completes the channel. Safe to invoke more
            // than once: the second List.Remove returns false and TryComplete returns false.
            void RemoveWatcher()
            {
                lock (watchersLock)
                {
                    if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> registered))
                    {
                        // Remove the writer from the watchers, and drop the list itself
                        // once the service has no more response streams.
                        if (registered.Remove(channel.Writer) && registered.Count == 0)
                        {
                            watchers.Remove(service);
                        }
                    }
                }

                // Signal the writer is complete and the watch method can exit.
                // The writer is completed only after it has been removed from watchers,
                // so NotifyStatus never sees a completed writer.
                channel.Writer.TryComplete();
            }

            // Watch calls run until ended by the client canceling them.
            // The registration is disposed when the handler exits.
            using (context.CancellationToken.Register(RemoveWatcher))
            {
                try
                {
                    // Send current status immediately
                    HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
                    await responseStream.WriteAsync(response);

                    // Read messages. WaitToReadAsync will wait until new messages are available.
                    // Loop will exit when the call is canceled and the writer is marked as complete.
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        if (channel.Reader.TryRead(out HealthCheckResponse item))
                        {
                            await responseStream.WriteAsync(item);
                        }
                    }
                }
                finally
                {
                    // Also unregister when the handler exits for any other reason (for example
                    // a failed write), so NotifyStatus stops queueing into an abandoned channel.
                    RemoveWatcher();
                }
            }
        }

        /// <summary>
        /// Queues a status update to every active watcher of the given service.
        /// </summary>
        /// <param name="service">The service whose status changed.</param>
        /// <param name="status">The new status to publish.</param>
        /// <exception cref="InvalidOperationException">Thrown when a watcher's channel rejects the write.</exception>
        private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // The same response instance is shared by all watchers of the service.
                    HealthCheckResponse response = new HealthCheckResponse { Status = status };

                    foreach (ChannelWriter<HealthCheckResponse> writer in channelWriters)
                    {
                        if (!writer.TryWrite(response))
                        {
                            throw new InvalidOperationException("Unable to queue health check notification.");
                        }
                    }
                }
            }
        }
#endif

        /// <summary>
        /// Builds a response carrying the current status of the given service.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <param name="throwOnNotFound">
        /// When <c>true</c>, a service without a status results in an <see cref="RpcException"/>;
        /// when <c>false</c>, it is reported as ServiceUnknown.
        /// </param>
        /// <returns>A new response with the service's status.</returns>
        private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus status;
                if (!statusMap.TryGetValue(service, out status))
                {
                    if (throwOnNotFound)
                    {
                        // TODO(jtattermusch): returning specific status from server handler is not supported yet.
                        throw new RpcException(new Status(StatusCode.NotFound, ""));
                    }

                    status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
                }

                return new HealthCheckResponse { Status = status };
            }
        }

        /// <summary>
        /// Reads the stored status of a service. Takes no lock itself; the callers in this
        /// class invoke it while holding <c>statusLock</c>.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <returns>The stored status, or ServiceUnknown when none has been set.</returns>
        private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
        {
            if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
            {
                return s;
            }
            else
            {
                // A service with no set status has a status of ServiceUnknown
                return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
            }
        }
    }
}