#region Copyright notice and license
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
#if GRPC_SUPPORT_WATCH
using System.Threading.Channels;
#endif
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Health.V1;
namespace Grpc.HealthCheck
{
/// <summary>
/// Implementation of a simple Health service. Useful for health checking.
///
/// Registering service with a server:
/// <code>
/// var serviceImpl = new HealthServiceImpl();
/// server = new Server();
/// server.AddServiceDefinition(Grpc.Health.V1.Health.BindService(serviceImpl));
/// </code>
/// </summary>
public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
{
    // The maximum number of statuses to buffer on the server.
    internal const int MaxStatusBufferSize = 5;

    // Guards every access to statusMap.
    private readonly object statusLock = new object();

    // Last explicitly set serving status, keyed by service name.
    private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
        new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

#if GRPC_SUPPORT_WATCH
    // Guards every access to watchers.
    private readonly object watchersLock = new object();

    // Channel writers of the active Watch calls, keyed by the service name being watched.
    private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
        new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
#endif

    /// <summary>
    /// Sets the health status for given service.
    /// </summary>
    /// <param name="service">The service. Cannot be null.</param>
    /// <param name="status">the health status</param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
    public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
    {
        if (service == null)
        {
            throw new ArgumentNullException(nameof(service));
        }

        HealthCheckResponse.Types.ServingStatus previousStatus;
        lock (statusLock)
        {
            previousStatus = GetServiceStatus(service);
            statusMap[service] = status;
        }

#if GRPC_SUPPORT_WATCH
        // Watchers are only told about actual changes, not about re-setting the same status.
        if (status != previousStatus)
        {
            NotifyStatus(service, status);
        }
#endif
    }

    /// <summary>
    /// Clears health status for given service.
    /// </summary>
    /// <param name="service">The service. Cannot be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
    public void ClearStatus(string service)
    {
        if (service == null)
        {
            throw new ArgumentNullException(nameof(service));
        }

        HealthCheckResponse.Types.ServingStatus previousStatus;
        lock (statusLock)
        {
            previousStatus = GetServiceStatus(service);
            statusMap.Remove(service);
        }

#if GRPC_SUPPORT_WATCH
        // A cleared service reads as ServiceUnknown, so notify only if that is a change.
        if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
        {
            NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
        }
#endif
    }

    /// <summary>
    /// Clears statuses for all services.
    /// </summary>
    public void ClearAll()
    {
        // Snapshot the entries under the lock so watchers can be notified after it is released.
        List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses;
        lock (statusLock)
        {
            statuses = statusMap.ToList();
            statusMap.Clear();
        }

#if GRPC_SUPPORT_WATCH
        foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
        {
            if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
            {
                NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
            }
        }
#endif
    }

    /// <summary>
    /// Performs a health status check.
    /// </summary>
    /// <param name="request">The check request.</param>
    /// <param name="context">The call context.</param>
    /// <returns>The asynchronous response.</returns>
    /// <exception cref="RpcException">
    /// Thrown with <see cref="StatusCode.NotFound"/> when no status has been set for the requested service.
    /// </exception>
    public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
    {
        HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);
        return Task.FromResult(response);
    }

#if GRPC_SUPPORT_WATCH
    /// <summary>
    /// Performs a watch for the serving status of the requested service.
    /// The server will immediately send back a message indicating the current
    /// serving status.  It will then subsequently send a new message whenever
    /// the service's serving status changes.
    ///
    /// If the requested service is unknown when the call is received, the
    /// server will send a message setting the serving status to
    /// SERVICE_UNKNOWN but will *not* terminate the call.  If at some
    /// future point, the serving status of the service becomes known, the
    /// server will send a new message with the service's serving status.
    ///
    /// If the call terminates with status UNIMPLEMENTED, then clients
    /// should assume this method is not supported and should not retry the
    /// call.  If the call terminates with any other status (including OK),
    /// clients should retry the call with appropriate exponential backoff.
    /// </summary>
    /// <param name="request">The request received from the client.</param>
    /// <param name="responseStream">Used for sending responses back to the client.</param>
    /// <param name="context">The context of the server-side call handler being invoked.</param>
    /// <returns>A task indicating completion of the handler.</returns>
    public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
    {
        string service = request.Service;

        // Channel is used to marshal multiple callers updating status into a single queue.
        // This is required because IServerStreamWriter is not thread safe.
        //
        // A queue of unwritten statuses could build up if flow control causes responseStream.WriteAsync to await.
        // When MaxStatusBufferSize is exceeded the server will discard older statuses. The discarded intermediate
        // statuses will never be sent to the client.
        Channel<HealthCheckResponse> channel = Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(capacity: MaxStatusBufferSize)
        {
            SingleReader = true,
            SingleWriter = false,
            FullMode = BoundedChannelFullMode.DropOldest
        });

        // Register the writer before reading the current status so that no change
        // made after the initial response is read can be missed.
        lock (watchersLock)
        {
            if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
            {
                channelWriters = new List<ChannelWriter<HealthCheckResponse>>();
                watchers.Add(service, channelWriters);
            }

            channelWriters.Add(channel.Writer);
        }

        // Watch calls run until ended by the client canceling them.
        context.CancellationToken.Register(() =>
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // Remove the writer from the watchers
                    if (channelWriters.Remove(channel.Writer))
                    {
                        // Remove empty collection if service has no more response streams
                        if (channelWriters.Count == 0)
                        {
                            watchers.Remove(service);
                        }
                    }
                }
            }

            // Signal the writer is complete and the watch method can exit.
            // This happens after the writer has been unregistered, so NotifyStatus
            // never sees a completed writer in the watchers collection.
            channel.Writer.Complete();
        });

        // Send current status immediately
        HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
        await responseStream.WriteAsync(response);

        // Read messages. WaitToReadAsync will wait until new messages are available.
        // Loop will exit when the call is canceled and the writer is marked as complete.
        while (await channel.Reader.WaitToReadAsync())
        {
            if (channel.Reader.TryRead(out HealthCheckResponse item))
            {
                await responseStream.WriteAsync(item);
            }
        }
    }

    /// <summary>
    /// Queues a response carrying <paramref name="status"/> on the channel of every
    /// active Watch call registered for <paramref name="service"/>.
    /// </summary>
    /// <param name="service">The service whose status changed.</param>
    /// <param name="status">The new serving status.</param>
    /// <exception cref="InvalidOperationException">A watcher's channel rejected the write.</exception>
    private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
    {
        lock (watchersLock)
        {
            if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
            {
                // One response instance is shared by all watchers of this service.
                HealthCheckResponse response = new HealthCheckResponse { Status = status };

                foreach (ChannelWriter<HealthCheckResponse> writer in channelWriters)
                {
                    // The channels are created with DropOldest, so a full channel still accepts
                    // the write; a rejected write is therefore unexpected.
                    if (!writer.TryWrite(response))
                    {
                        throw new InvalidOperationException("Unable to queue health check notification.");
                    }
                }
            }
        }
    }
#endif

    /// <summary>
    /// Builds a response with the current status of <paramref name="service"/>.
    /// </summary>
    /// <param name="service">The service to look up.</param>
    /// <param name="throwOnNotFound">
    /// When true, a service without a set status causes an <see cref="RpcException"/>;
    /// when false, such a service is reported as ServiceUnknown.
    /// </param>
    /// <returns>A new response holding the service's status.</returns>
    private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
    {
        HealthCheckResponse.Types.ServingStatus status;
        bool found;

        // Hold the lock only for the lookup itself.
        lock (statusLock)
        {
            found = statusMap.TryGetValue(service, out status);
        }

        if (!found)
        {
            if (throwOnNotFound)
            {
                // TODO(jtattermusch): returning specific status from server handler is not supported yet.
                throw new RpcException(new Status(StatusCode.NotFound, ""));
            }

            status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
        }

        return new HealthCheckResponse { Status = status };
    }

    /// <summary>
    /// Returns the status stored for <paramref name="service"/>, or ServiceUnknown when none is set.
    /// Does not take statusLock itself; the callers in this class invoke it while holding that lock.
    /// </summary>
    /// <param name="service">The service to look up.</param>
    /// <returns>The stored status, or ServiceUnknown.</returns>
    private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
    {
        if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
        {
            return s;
        }

        // A service with no set status has a status of ServiceUnknown
        return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
    }
}
}