• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.protobuf.services;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static org.junit.Assert.fail;
21 
22 import io.grpc.BindableService;
23 import io.grpc.Context;
24 import io.grpc.Context.CancellableContext;
25 import io.grpc.Status;
26 import io.grpc.StatusRuntimeException;
27 import io.grpc.health.v1.HealthCheckRequest;
28 import io.grpc.health.v1.HealthCheckResponse;
29 import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
30 import io.grpc.health.v1.HealthGrpc;
31 import io.grpc.stub.StreamObserver;
32 import io.grpc.testing.GrpcServerRule;
33 import java.util.ArrayDeque;
34 import java.util.concurrent.TimeUnit;
35 import org.junit.After;
36 import org.junit.Before;
37 import org.junit.Rule;
38 import org.junit.Test;
39 import org.junit.runner.RunWith;
40 import org.junit.runners.JUnit4;
41 
42 /** Tests for {@link HealthStatusManager}. */
43 @RunWith(JUnit4.class)
44 public class HealthStatusManagerTest {
45   private static final String SERVICE1 = "service1";
46   private static final String SERVICE2 = "service2";
47 
48   @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
49 
50   private final HealthStatusManager manager = new HealthStatusManager();
51   private final HealthServiceImpl service = (HealthServiceImpl) manager.getHealthService();
52   private HealthGrpc.HealthStub stub;
53   private HealthGrpc.HealthBlockingStub blockingStub;
54 
55   @Before
setup()56   public void setup() {
57     grpcServerRule.getServiceRegistry().addService(service);
58     stub = HealthGrpc.newStub(grpcServerRule.getChannel());
59     blockingStub = HealthGrpc.newBlockingStub(grpcServerRule.getChannel());
60   }
61 
62   @After
teardown()63   public void teardown() {
64     // Health-check streams are usually not closed in the tests.  Force closing for clean up.
65     grpcServerRule.getServer().shutdownNow();
66   }
67 
68   @Test
enterTerminalState_check()69   public void enterTerminalState_check() throws Exception {
70     manager.setStatus(SERVICE1, ServingStatus.SERVING);
71     RespObserver obs = new RespObserver();
72     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
73     assertThat(obs.responses).hasSize(2);
74     HealthCheckResponse resp = (HealthCheckResponse) obs.responses.poll();
75     assertThat(resp.getStatus()).isEqualTo(ServingStatus.SERVING);
76 
77     manager.enterTerminalState();
78     obs = new RespObserver();
79     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
80     assertThat(obs.responses).hasSize(2);
81     resp = (HealthCheckResponse) obs.responses.poll();
82     assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING);
83 
84     manager.setStatus(SERVICE1, ServingStatus.SERVING);
85     obs = new RespObserver();
86     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
87     assertThat(obs.responses).hasSize(2);
88     resp = (HealthCheckResponse) obs.responses.poll();
89     assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING);
90   }
91 
92   @Test
enterTerminalState_watch()93   public void enterTerminalState_watch() throws Exception {
94     manager.setStatus(SERVICE1, ServingStatus.SERVING);
95     RespObserver obs = new RespObserver();
96     service.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
97     assertThat(obs.responses).hasSize(1);
98     HealthCheckResponse resp = (HealthCheckResponse) obs.responses.poll();
99     assertThat(resp.getStatus()).isEqualTo(ServingStatus.SERVING);
100     obs.responses.clear();
101 
102     manager.enterTerminalState();
103     assertThat(obs.responses).hasSize(1);
104     resp = (HealthCheckResponse) obs.responses.poll();
105     assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING);
106     obs.responses.clear();
107 
108     manager.setStatus(SERVICE1, ServingStatus.SERVING);
109     assertThat(obs.responses).isEmpty();
110   }
111 
112   @Test
enterTerminalState_ignoreClear()113   public void enterTerminalState_ignoreClear() throws Exception {
114     manager.setStatus(SERVICE1, ServingStatus.SERVING);
115     RespObserver obs = new RespObserver();
116     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
117     assertThat(obs.responses).hasSize(2);
118     HealthCheckResponse resp = (HealthCheckResponse) obs.responses.poll();
119     assertThat(resp.getStatus()).isEqualTo(ServingStatus.SERVING);
120 
121     manager.enterTerminalState();
122     obs = new RespObserver();
123     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
124     assertThat(obs.responses).hasSize(2);
125     resp = (HealthCheckResponse) obs.responses.poll();
126     assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING);
127 
128     manager.clearStatus(SERVICE1);
129     obs = new RespObserver();
130     service.check(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), obs);
131     assertThat(obs.responses).hasSize(2);
132     resp = (HealthCheckResponse) obs.responses.poll();
133     assertThat(resp.getStatus()).isEqualTo(ServingStatus.NOT_SERVING);
134   }
135 
136   @Test
defaultIsServing()137   public void defaultIsServing() throws Exception {
138     HealthCheckRequest request =
139         HealthCheckRequest.newBuilder().setService(HealthStatusManager.SERVICE_NAME_ALL_SERVICES)
140             .build();
141 
142     HealthCheckResponse response =
143         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
144 
145     assertThat(response).isEqualTo(
146         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
147   }
148 
149   @Test
getHealthService_getterReturnsTheSameHealthRefAfterUpdate()150   public void getHealthService_getterReturnsTheSameHealthRefAfterUpdate() throws Exception {
151     BindableService health = manager.getHealthService();
152     manager.setStatus(SERVICE1, ServingStatus.UNKNOWN);
153     assertThat(health).isSameInstanceAs(manager.getHealthService());
154   }
155 
156   @Test
checkValidStatus()157   public void checkValidStatus() throws Exception {
158     manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
159     manager.setStatus(SERVICE2, ServingStatus.SERVING);
160     HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(SERVICE1).build();
161     HealthCheckResponse response =
162         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
163     assertThat(response).isEqualTo(
164         HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
165 
166     request = HealthCheckRequest.newBuilder().setService(SERVICE2).build();
167     response = blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
168     assertThat(response).isEqualTo(
169         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
170     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
171     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
172   }
173 
174   @Test
checkStatusNotFound()175   public void checkStatusNotFound() throws Exception {
176     manager.setStatus(SERVICE1, ServingStatus.SERVING);
177     // SERVICE2's status is not set
178     HealthCheckRequest request
179         = HealthCheckRequest.newBuilder().setService(SERVICE2).build();
180     try {
181       blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
182       fail("Should've failed");
183     } catch (StatusRuntimeException e) {
184       assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
185     }
186     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
187   }
188 
189   @Test
notFoundForClearedStatus()190   public void notFoundForClearedStatus() throws Exception {
191     manager.setStatus(SERVICE1, ServingStatus.SERVING);
192     manager.clearStatus(SERVICE1);
193     HealthCheckRequest request
194         = HealthCheckRequest.newBuilder().setService(SERVICE1).build();
195     try {
196       blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
197       fail("Should've failed");
198     } catch (StatusRuntimeException e) {
199       assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
200     }
201   }
202 
203   @Test
watch()204   public void watch() throws Exception {
205     manager.setStatus(SERVICE1, ServingStatus.UNKNOWN);
206 
207     // Start a watch on SERVICE1
208     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
209     RespObserver respObs1 = new RespObserver();
210     stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
211     // Will get the current status
212     assertThat(respObs1.responses.poll()).isEqualTo(
213         HealthCheckResponse.newBuilder().setStatus(ServingStatus.UNKNOWN).build());
214     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
215 
216     // Status change is notified of to the RPC
217     manager.setStatus(SERVICE1, ServingStatus.SERVING);
218     assertThat(respObs1.responses.poll()).isEqualTo(
219         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
220 
221     // Start another watch on SERVICE1
222     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
223     RespObserver respObs1b = new RespObserver();
224     stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
225     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
226     // Will get the current status
227     assertThat(respObs1b.responses.poll()).isEqualTo(
228         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
229 
230     // Start a watch on SERVICE2, which is not known yet
231     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
232     RespObserver respObs2 = new RespObserver();
233     stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
234     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
235     // Current status is SERVICE_UNKNOWN
236     assertThat(respObs2.responses.poll()).isEqualTo(
237         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
238 
239     // Set status for SERVICE2, which will be notified of
240     manager.setStatus(SERVICE2, ServingStatus.NOT_SERVING);
241     assertThat(respObs2.responses.poll()).isEqualTo(
242         HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
243 
244     // Clear the status for SERVICE1, which will be notified of
245     manager.clearStatus(SERVICE1);
246     assertThat(respObs1.responses.poll()).isEqualTo(
247         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
248     assertThat(respObs1b.responses.poll()).isEqualTo(
249         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
250 
251     // All responses have been accounted for
252     assertThat(respObs1.responses).isEmpty();
253     assertThat(respObs1b.responses).isEmpty();
254     assertThat(respObs2.responses).isEmpty();
255   }
256 
257   @Test
watchRemovedWhenClientCloses()258   public void watchRemovedWhenClientCloses() throws Exception {
259     CancellableContext withCancellation = Context.current().withCancellation();
260     Context prevCtx = withCancellation.attach();
261     RespObserver respObs1 = new RespObserver();
262     try {
263       assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
264       stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
265     } finally {
266       withCancellation.detach(prevCtx);
267     }
268     RespObserver respObs1b = new RespObserver();
269     stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
270     RespObserver respObs2 = new RespObserver();
271     stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
272 
273     assertThat(respObs1.responses.poll()).isEqualTo(
274         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
275     assertThat(respObs1b.responses.poll()).isEqualTo(
276         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
277     assertThat(respObs2.responses.poll()).isEqualTo(
278         HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
279     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
280     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
281     assertThat(respObs1.responses).isEmpty();
282     assertThat(respObs1b.responses).isEmpty();
283     assertThat(respObs2.responses).isEmpty();
284 
285     // This will cancel the RPC with respObs1
286     withCancellation.close();
287 
288     assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
289     assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
290     assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
291     assertThat(respObs1.responses).isEmpty();
292     assertThat(respObs1b.responses).isEmpty();
293     assertThat(respObs2.responses).isEmpty();
294   }
295 
296   private static class RespObserver implements StreamObserver<HealthCheckResponse> {
297     final ArrayDeque<Object> responses = new ArrayDeque<>();
298 
299     @Override
onNext(HealthCheckResponse value)300     public void onNext(HealthCheckResponse value) {
301       responses.add(value);
302     }
303 
304     @Override
onError(Throwable t)305     public void onError(Throwable t) {
306       responses.add(t);
307     }
308 
309     @Override
onCompleted()310     public void onCompleted() {
311       responses.add("onCompleted");
312     }
313   }
314 }
315