• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.protobuf.services;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.common.util.concurrent.MoreExecutors;
21 import io.grpc.Context;
22 import io.grpc.Context.CancellationListener;
23 import io.grpc.Status;
24 import io.grpc.StatusException;
25 import io.grpc.health.v1.HealthCheckRequest;
26 import io.grpc.health.v1.HealthCheckResponse;
27 import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
28 import io.grpc.health.v1.HealthGrpc;
29 import io.grpc.stub.StreamObserver;
30 import java.util.HashMap;
31 import java.util.IdentityHashMap;
32 import java.util.Map;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 import javax.annotation.Nullable;
37 import javax.annotation.concurrent.GuardedBy;
38 
39 final class HealthServiceImpl extends HealthGrpc.HealthImplBase {
40 
41   private static final Logger logger = Logger.getLogger(HealthServiceImpl.class.getName());
42 
43   // Due to the latency of rpc calls, synchronization of the map does not help with consistency.
44   // However, need use ConcurrentHashMap to allow concurrent reading by check().
45   private final Map<String, ServingStatus> statusMap = new ConcurrentHashMap<>();
46 
47   private final Object watchLock = new Object();
48 
49   // Indicates if future status changes should be ignored.
50   @GuardedBy("watchLock")
51   private boolean terminal;
52 
53   // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>.  The Boolean value is not
54   // used.  The StreamObservers need to be kept in a identity-equality set, to make sure
55   // user-defined equals() doesn't confuse our book-keeping of the StreamObservers.  Constructing
56   // such Multimap would require extra lines and the end result is not significantly simpler, thus I
57   // would rather not have the Guava collections dependency.
58   @GuardedBy("watchLock")
59   private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
60       watchers = new HashMap<>();
61 
HealthServiceImpl()62   HealthServiceImpl() {
63     // Copy of what Go and C++ do.
64     statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING);
65   }
66 
67   @Override
check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver)68   public void check(HealthCheckRequest request,
69       StreamObserver<HealthCheckResponse> responseObserver) {
70     ServingStatus status = statusMap.get(request.getService());
71     if (status == null) {
72       responseObserver.onError(new StatusException(
73           Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
74     } else {
75       HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build();
76       responseObserver.onNext(response);
77       responseObserver.onCompleted();
78     }
79   }
80 
81   @Override
watch(HealthCheckRequest request, final StreamObserver<HealthCheckResponse> responseObserver)82   public void watch(HealthCheckRequest request,
83       final StreamObserver<HealthCheckResponse> responseObserver) {
84     final String service = request.getService();
85     synchronized (watchLock) {
86       ServingStatus status = statusMap.get(service);
87       responseObserver.onNext(getResponseForWatch(status));
88       IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
89           watchers.get(service);
90       if (serviceWatchers == null) {
91         serviceWatchers = new IdentityHashMap<>();
92         watchers.put(service, serviceWatchers);
93       }
94       serviceWatchers.put(responseObserver, Boolean.TRUE);
95     }
96     Context.current().addListener(
97         new CancellationListener() {
98           @Override
99           // Called when the client has closed the stream
100           public void cancelled(Context context) {
101             synchronized (watchLock) {
102               IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
103                   watchers.get(service);
104               if (serviceWatchers != null) {
105                 serviceWatchers.remove(responseObserver);
106                 if (serviceWatchers.isEmpty()) {
107                   watchers.remove(service);
108                 }
109               }
110             }
111           }
112         },
113         MoreExecutors.directExecutor());
114   }
115 
setStatus(String service, ServingStatus status)116   void setStatus(String service, ServingStatus status) {
117     synchronized (watchLock) {
118       if (terminal) {
119         logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service});
120         return;
121       }
122       setStatusInternal(service, status);
123     }
124   }
125 
126   @GuardedBy("watchLock")
setStatusInternal(String service, ServingStatus status)127   private void setStatusInternal(String service, ServingStatus status) {
128     ServingStatus prevStatus = statusMap.put(service, status);
129     if (prevStatus != status) {
130       notifyWatchers(service, status);
131     }
132   }
133 
clearStatus(String service)134   void clearStatus(String service) {
135     synchronized (watchLock) {
136       if (terminal) {
137         logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service});
138         return;
139       }
140       ServingStatus prevStatus = statusMap.remove(service);
141       if (prevStatus != null) {
142         notifyWatchers(service, null);
143       }
144     }
145   }
146 
enterTerminalState()147   void enterTerminalState() {
148     synchronized (watchLock) {
149       if (terminal) {
150         logger.log(Level.WARNING, "Already terminating", new RuntimeException());
151         return;
152       }
153       terminal = true;
154       for (String service : statusMap.keySet()) {
155         setStatusInternal(service, ServingStatus.NOT_SERVING);
156       }
157     }
158   }
159 
160   @VisibleForTesting
numWatchersForTest(String service)161   int numWatchersForTest(String service) {
162     synchronized (watchLock) {
163       IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
164           watchers.get(service);
165       if (serviceWatchers == null) {
166         return 0;
167       }
168       return serviceWatchers.size();
169     }
170   }
171 
172   @GuardedBy("watchLock")
notifyWatchers(String service, @Nullable ServingStatus status)173   private void notifyWatchers(String service, @Nullable ServingStatus status) {
174     HealthCheckResponse response = getResponseForWatch(status);
175     IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
176         watchers.get(service);
177     if (serviceWatchers != null) {
178       for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
179         responseObserver.onNext(response);
180       }
181     }
182   }
183 
getResponseForWatch(@ullable ServingStatus recordedStatus)184   private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) {
185     return HealthCheckResponse.newBuilder().setStatus(
186         recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
187   }
188 }
189