1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.protobuf.services; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.util.concurrent.MoreExecutors; 21 import io.grpc.Context; 22 import io.grpc.Context.CancellationListener; 23 import io.grpc.Status; 24 import io.grpc.StatusException; 25 import io.grpc.health.v1.HealthCheckRequest; 26 import io.grpc.health.v1.HealthCheckResponse; 27 import io.grpc.health.v1.HealthCheckResponse.ServingStatus; 28 import io.grpc.health.v1.HealthGrpc; 29 import io.grpc.stub.StreamObserver; 30 import java.util.HashMap; 31 import java.util.IdentityHashMap; 32 import java.util.Map; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.logging.Level; 35 import java.util.logging.Logger; 36 import javax.annotation.Nullable; 37 import javax.annotation.concurrent.GuardedBy; 38 39 final class HealthServiceImpl extends HealthGrpc.HealthImplBase { 40 41 private static final Logger logger = Logger.getLogger(HealthServiceImpl.class.getName()); 42 43 // Due to the latency of rpc calls, synchronization of the map does not help with consistency. 44 // However, need use ConcurrentHashMap to allow concurrent reading by check(). 45 private final Map<String, ServingStatus> statusMap = new ConcurrentHashMap<>(); 46 47 private final Object watchLock = new Object(); 48 49 // Indicates if future status changes should be ignored. 50 @GuardedBy("watchLock") 51 private boolean terminal; 52 53 // Technically a Multimap<String, StreamObserver<HealthCheckResponse>>. The Boolean value is not 54 // used. The StreamObservers need to be kept in a identity-equality set, to make sure 55 // user-defined equals() doesn't confuse our book-keeping of the StreamObservers. Constructing 56 // such Multimap would require extra lines and the end result is not significantly simpler, thus I 57 // would rather not have the Guava collections dependency. 58 @GuardedBy("watchLock") 59 private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>> 60 watchers = new HashMap<>(); 61 HealthServiceImpl()62 HealthServiceImpl() { 63 // Copy of what Go and C++ do. 64 statusMap.put(HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING); 65 } 66 67 @Override check(HealthCheckRequest request, StreamObserver<HealthCheckResponse> responseObserver)68 public void check(HealthCheckRequest request, 69 StreamObserver<HealthCheckResponse> responseObserver) { 70 ServingStatus status = statusMap.get(request.getService()); 71 if (status == null) { 72 responseObserver.onError(new StatusException( 73 Status.NOT_FOUND.withDescription("unknown service " + request.getService()))); 74 } else { 75 HealthCheckResponse response = HealthCheckResponse.newBuilder().setStatus(status).build(); 76 responseObserver.onNext(response); 77 responseObserver.onCompleted(); 78 } 79 } 80 81 @Override watch(HealthCheckRequest request, final StreamObserver<HealthCheckResponse> responseObserver)82 public void watch(HealthCheckRequest request, 83 final StreamObserver<HealthCheckResponse> responseObserver) { 84 final String service = request.getService(); 85 synchronized (watchLock) { 86 ServingStatus status = statusMap.get(service); 87 responseObserver.onNext(getResponseForWatch(status)); 88 IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers = 89 watchers.get(service); 90 if (serviceWatchers == null) { 91 serviceWatchers = new IdentityHashMap<>(); 92 watchers.put(service, serviceWatchers); 93 } 94 serviceWatchers.put(responseObserver, Boolean.TRUE); 95 } 96 Context.current().addListener( 97 new CancellationListener() { 98 @Override 99 // Called when the client has closed the stream 100 public void cancelled(Context context) { 101 synchronized (watchLock) { 102 IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers = 103 watchers.get(service); 104 if (serviceWatchers != null) { 105 serviceWatchers.remove(responseObserver); 106 if (serviceWatchers.isEmpty()) { 107 watchers.remove(service); 108 } 109 } 110 } 111 } 112 }, 113 MoreExecutors.directExecutor()); 114 } 115 setStatus(String service, ServingStatus status)116 void setStatus(String service, ServingStatus status) { 117 synchronized (watchLock) { 118 if (terminal) { 119 logger.log(Level.FINE, "Ignoring status {} for {}", new Object[]{status, service}); 120 return; 121 } 122 setStatusInternal(service, status); 123 } 124 } 125 126 @GuardedBy("watchLock") setStatusInternal(String service, ServingStatus status)127 private void setStatusInternal(String service, ServingStatus status) { 128 ServingStatus prevStatus = statusMap.put(service, status); 129 if (prevStatus != status) { 130 notifyWatchers(service, status); 131 } 132 } 133 clearStatus(String service)134 void clearStatus(String service) { 135 synchronized (watchLock) { 136 if (terminal) { 137 logger.log(Level.FINE, "Ignoring status clearing for {}", new Object[]{service}); 138 return; 139 } 140 ServingStatus prevStatus = statusMap.remove(service); 141 if (prevStatus != null) { 142 notifyWatchers(service, null); 143 } 144 } 145 } 146 enterTerminalState()147 void enterTerminalState() { 148 synchronized (watchLock) { 149 if (terminal) { 150 logger.log(Level.WARNING, "Already terminating", new RuntimeException()); 151 return; 152 } 153 terminal = true; 154 for (String service : statusMap.keySet()) { 155 setStatusInternal(service, ServingStatus.NOT_SERVING); 156 } 157 } 158 } 159 160 @VisibleForTesting numWatchersForTest(String service)161 int numWatchersForTest(String service) { 162 synchronized (watchLock) { 163 IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers = 164 watchers.get(service); 165 if (serviceWatchers == null) { 166 return 0; 167 } 168 return serviceWatchers.size(); 169 } 170 } 171 172 @GuardedBy("watchLock") notifyWatchers(String service, @Nullable ServingStatus status)173 private void notifyWatchers(String service, @Nullable ServingStatus status) { 174 HealthCheckResponse response = getResponseForWatch(status); 175 IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers = 176 watchers.get(service); 177 if (serviceWatchers != null) { 178 for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) { 179 responseObserver.onNext(response); 180 } 181 } 182 } 183 getResponseForWatch(@ullable ServingStatus recordedStatus)184 private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) { 185 return HealthCheckResponse.newBuilder().setStatus( 186 recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build(); 187 } 188 } 189