• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.protobuf.services;
18 
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertFalse;
21 import static org.junit.Assert.assertTrue;
22 import static org.junit.Assert.fail;
23 
24 import com.google.protobuf.ByteString;
25 import io.grpc.BindableService;
26 import io.grpc.ManagedChannel;
27 import io.grpc.Server;
28 import io.grpc.ServerServiceDefinition;
29 import io.grpc.inprocess.InProcessChannelBuilder;
30 import io.grpc.inprocess.InProcessServerBuilder;
31 import io.grpc.internal.testing.StreamRecorder;
32 import io.grpc.reflection.testing.AnotherDynamicServiceGrpc;
33 import io.grpc.reflection.testing.AnotherReflectableServiceGrpc;
34 import io.grpc.reflection.testing.DynamicReflectionTestDepthTwoProto;
35 import io.grpc.reflection.testing.DynamicServiceGrpc;
36 import io.grpc.reflection.testing.ReflectableServiceGrpc;
37 import io.grpc.reflection.testing.ReflectionTestDepthThreeProto;
38 import io.grpc.reflection.testing.ReflectionTestDepthTwoAlternateProto;
39 import io.grpc.reflection.testing.ReflectionTestDepthTwoProto;
40 import io.grpc.reflection.testing.ReflectionTestProto;
41 import io.grpc.reflection.v1alpha.ExtensionNumberResponse;
42 import io.grpc.reflection.v1alpha.ExtensionRequest;
43 import io.grpc.reflection.v1alpha.FileDescriptorResponse;
44 import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
45 import io.grpc.reflection.v1alpha.ServerReflectionRequest;
46 import io.grpc.reflection.v1alpha.ServerReflectionResponse;
47 import io.grpc.reflection.v1alpha.ServiceResponse;
48 import io.grpc.stub.ClientCallStreamObserver;
49 import io.grpc.stub.ClientResponseObserver;
50 import io.grpc.stub.StreamObserver;
51 import io.grpc.testing.GrpcCleanupRule;
52 import io.grpc.util.MutableHandlerRegistry;
53 import java.io.IOException;
54 import java.util.ArrayList;
55 import java.util.Arrays;
56 import java.util.HashSet;
57 import java.util.List;
58 import java.util.Set;
59 import java.util.concurrent.ExecutionException;
60 import org.junit.Before;
61 import org.junit.Rule;
62 import org.junit.Test;
63 import org.junit.runner.RunWith;
64 import org.junit.runners.JUnit4;
65 
66 /** Tests for {@link ProtoReflectionService}. */
67 @RunWith(JUnit4.class)
68 public class ProtoReflectionServiceTest {
69   @Rule
70   public GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
71 
72   private static final String TEST_HOST = "localhost";
73   private MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
74   private BindableService reflectionService;
75   private ServerServiceDefinition dynamicService =
76       new DynamicServiceGrpc.DynamicServiceImplBase() {}.bindService();
77   private ServerServiceDefinition anotherDynamicService =
78       new AnotherDynamicServiceGrpc.AnotherDynamicServiceImplBase() {}.bindService();
79   private ServerReflectionGrpc.ServerReflectionStub stub;
80 
81   @Before
setUp()82   public void setUp() throws Exception {
83     reflectionService = ProtoReflectionService.newInstance();
84     Server server =
85         InProcessServerBuilder.forName("proto-reflection-test")
86             .directExecutor()
87             .addService(reflectionService)
88             .addService(new ReflectableServiceGrpc.ReflectableServiceImplBase() {})
89             .fallbackHandlerRegistry(handlerRegistry)
90             .build()
91             .start();
92     grpcCleanupRule.register(server);
93     ManagedChannel channel =
94         grpcCleanupRule.register(
95             InProcessChannelBuilder.forName("proto-reflection-test").directExecutor().build());
96     stub = ServerReflectionGrpc.newStub(channel);
97   }
98 
99   @Test
listServices()100   public void listServices() throws Exception {
101     Set<ServiceResponse> originalServices =
102         new HashSet<>(
103             Arrays.asList(
104                 ServiceResponse.newBuilder()
105                     .setName("grpc.reflection.v1alpha.ServerReflection")
106                     .build(),
107                 ServiceResponse.newBuilder()
108                     .setName("grpc.reflection.testing.ReflectableService")
109                     .build()));
110     assertServiceResponseEquals(originalServices);
111 
112     handlerRegistry.addService(dynamicService);
113     assertServiceResponseEquals(
114         new HashSet<>(
115             Arrays.asList(
116                 ServiceResponse.newBuilder()
117                     .setName("grpc.reflection.v1alpha.ServerReflection")
118                     .build(),
119                 ServiceResponse.newBuilder()
120                     .setName("grpc.reflection.testing.ReflectableService")
121                     .build(),
122                 ServiceResponse.newBuilder()
123                     .setName("grpc.reflection.testing.DynamicService")
124                     .build())));
125 
126     handlerRegistry.addService(anotherDynamicService);
127     assertServiceResponseEquals(
128         new HashSet<>(
129             Arrays.asList(
130                 ServiceResponse.newBuilder()
131                     .setName("grpc.reflection.v1alpha.ServerReflection")
132                     .build(),
133                 ServiceResponse.newBuilder()
134                     .setName("grpc.reflection.testing.ReflectableService")
135                     .build(),
136                 ServiceResponse.newBuilder()
137                     .setName("grpc.reflection.testing.DynamicService")
138                     .build(),
139                 ServiceResponse.newBuilder()
140                     .setName("grpc.reflection.testing.AnotherDynamicService")
141                     .build())));
142 
143     handlerRegistry.removeService(dynamicService);
144     assertServiceResponseEquals(
145         new HashSet<>(
146             Arrays.asList(
147                 ServiceResponse.newBuilder()
148                     .setName("grpc.reflection.v1alpha.ServerReflection")
149                     .build(),
150                 ServiceResponse.newBuilder()
151                     .setName("grpc.reflection.testing.ReflectableService")
152                     .build(),
153                 ServiceResponse.newBuilder()
154                     .setName("grpc.reflection.testing.AnotherDynamicService")
155                     .build())));
156 
157     handlerRegistry.removeService(anotherDynamicService);
158     assertServiceResponseEquals(originalServices);
159   }
160 
161   @Test
fileByFilename()162   public void fileByFilename() throws Exception {
163     ServerReflectionRequest request =
164         ServerReflectionRequest.newBuilder()
165             .setHost(TEST_HOST)
166             .setFileByFilename("io/grpc/reflection/testing/reflection_test_depth_three.proto")
167             .build();
168 
169     ServerReflectionResponse goldenResponse =
170         ServerReflectionResponse.newBuilder()
171             .setValidHost(TEST_HOST)
172             .setOriginalRequest(request)
173             .setFileDescriptorResponse(
174                 FileDescriptorResponse.newBuilder()
175                     .addFileDescriptorProto(
176                         ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
177                     .build())
178             .build();
179 
180     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
181     StreamObserver<ServerReflectionRequest> requestObserver =
182         stub.serverReflectionInfo(responseObserver);
183     requestObserver.onNext(request);
184     requestObserver.onCompleted();
185 
186     assertEquals(goldenResponse, responseObserver.firstValue().get());
187   }
188 
189   @Test
fileByFilenameConsistentForMutableServices()190   public void fileByFilenameConsistentForMutableServices() throws Exception {
191     ServerReflectionRequest request =
192         ServerReflectionRequest.newBuilder()
193             .setHost(TEST_HOST)
194             .setFileByFilename("io/grpc/reflection/testing/dynamic_reflection_test_depth_two.proto")
195             .build();
196     ServerReflectionResponse goldenResponse =
197         ServerReflectionResponse.newBuilder()
198             .setValidHost(TEST_HOST)
199             .setOriginalRequest(request)
200             .setFileDescriptorResponse(
201                 FileDescriptorResponse.newBuilder()
202                     .addFileDescriptorProto(
203                         DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
204                     .build())
205             .build();
206 
207     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
208     StreamObserver<ServerReflectionRequest> requestObserver =
209         stub.serverReflectionInfo(responseObserver);
210     handlerRegistry.addService(dynamicService);
211     requestObserver.onNext(request);
212     requestObserver.onCompleted();
213     StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
214     StreamObserver<ServerReflectionRequest> requestObserver2 =
215         stub.serverReflectionInfo(responseObserver2);
216     handlerRegistry.removeService(dynamicService);
217     requestObserver2.onNext(request);
218     requestObserver2.onCompleted();
219     StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
220     StreamObserver<ServerReflectionRequest> requestObserver3 =
221         stub.serverReflectionInfo(responseObserver3);
222     requestObserver3.onNext(request);
223     requestObserver3.onCompleted();
224 
225     assertEquals(
226         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
227         responseObserver.firstValue().get().getMessageResponseCase());
228     assertEquals(goldenResponse, responseObserver2.firstValue().get());
229     assertEquals(
230         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
231         responseObserver3.firstValue().get().getMessageResponseCase());
232   }
233 
234   @Test
fileContainingSymbol()235   public void fileContainingSymbol() throws Exception {
236     ServerReflectionRequest request =
237         ServerReflectionRequest.newBuilder()
238             .setHost(TEST_HOST)
239             .setFileContainingSymbol("grpc.reflection.testing.ReflectableService.Method")
240             .build();
241 
242     List<ByteString> goldenResponse =
243         Arrays.asList(
244             ReflectionTestProto.getDescriptor().toProto().toByteString(),
245             ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
246             ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
247             ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());
248 
249     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
250     StreamObserver<ServerReflectionRequest> requestObserver =
251         stub.serverReflectionInfo(responseObserver);
252     requestObserver.onNext(request);
253     requestObserver.onCompleted();
254 
255     List<ByteString> response =
256         responseObserver
257             .firstValue()
258             .get()
259             .getFileDescriptorResponse()
260             .getFileDescriptorProtoList();
261     assertEquals(goldenResponse.size(), response.size());
262     assertEquals(new HashSet<>(goldenResponse), new HashSet<>(response));
263   }
264 
265   @Test
fileContainingNestedSymbol()266   public void fileContainingNestedSymbol() throws Exception {
267     ServerReflectionRequest request =
268         ServerReflectionRequest.newBuilder()
269             .setHost(TEST_HOST)
270             .setFileContainingSymbol("grpc.reflection.testing.NestedTypeOuter.Middle.Inner")
271             .build();
272 
273     ServerReflectionResponse goldenResponse =
274         ServerReflectionResponse.newBuilder()
275             .setValidHost(TEST_HOST)
276             .setOriginalRequest(request)
277             .setFileDescriptorResponse(
278                 FileDescriptorResponse.newBuilder()
279                     .addFileDescriptorProto(
280                         ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
281                     .build())
282             .build();
283 
284     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
285     StreamObserver<ServerReflectionRequest> requestObserver =
286         stub.serverReflectionInfo(responseObserver);
287     requestObserver.onNext(request);
288     requestObserver.onCompleted();
289     assertEquals(goldenResponse, responseObserver.firstValue().get());
290   }
291 
292   @Test
fileContainingSymbolForMutableServices()293   public void fileContainingSymbolForMutableServices() throws Exception {
294     ServerReflectionRequest request =
295         ServerReflectionRequest.newBuilder()
296             .setHost(TEST_HOST)
297             .setFileContainingSymbol("grpc.reflection.testing.DynamicRequest")
298             .build();
299     ServerReflectionResponse goldenResponse =
300         ServerReflectionResponse.newBuilder()
301             .setValidHost(TEST_HOST)
302             .setOriginalRequest(request)
303             .setFileDescriptorResponse(
304                 FileDescriptorResponse.newBuilder()
305                     .addFileDescriptorProto(
306                         DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
307                     .build())
308             .build();
309 
310     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
311     StreamObserver<ServerReflectionRequest> requestObserver =
312         stub.serverReflectionInfo(responseObserver);
313     handlerRegistry.addService(dynamicService);
314     requestObserver.onNext(request);
315     requestObserver.onCompleted();
316     StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
317     StreamObserver<ServerReflectionRequest> requestObserver2 =
318         stub.serverReflectionInfo(responseObserver2);
319     handlerRegistry.removeService(dynamicService);
320     requestObserver2.onNext(request);
321     requestObserver2.onCompleted();
322     StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
323     StreamObserver<ServerReflectionRequest> requestObserver3 =
324         stub.serverReflectionInfo(responseObserver3);
325     requestObserver3.onNext(request);
326     requestObserver3.onCompleted();
327 
328     assertEquals(
329         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
330         responseObserver.firstValue().get().getMessageResponseCase());
331     assertEquals(goldenResponse, responseObserver2.firstValue().get());
332     assertEquals(
333         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
334         responseObserver3.firstValue().get().getMessageResponseCase());
335   }
336 
337   @Test
fileContainingExtension()338   public void fileContainingExtension() throws Exception {
339     ServerReflectionRequest request =
340         ServerReflectionRequest.newBuilder()
341             .setHost(TEST_HOST)
342             .setFileContainingExtension(
343                 ExtensionRequest.newBuilder()
344                     .setContainingType("grpc.reflection.testing.ThirdLevelType")
345                     .setExtensionNumber(100)
346                     .build())
347             .build();
348 
349     List<ByteString> goldenResponse =
350         Arrays.asList(
351             ReflectionTestProto.getDescriptor().toProto().toByteString(),
352             ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
353             ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
354             ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());
355 
356     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
357     StreamObserver<ServerReflectionRequest> requestObserver =
358         stub.serverReflectionInfo(responseObserver);
359     requestObserver.onNext(request);
360     requestObserver.onCompleted();
361 
362     List<ByteString> response =
363         responseObserver
364             .firstValue()
365             .get()
366             .getFileDescriptorResponse()
367             .getFileDescriptorProtoList();
368     assertEquals(goldenResponse.size(), response.size());
369     assertEquals(new HashSet<>(goldenResponse), new HashSet<>(response));
370   }
371 
372   @Test
fileContainingNestedExtension()373   public void fileContainingNestedExtension() throws Exception {
374     ServerReflectionRequest request =
375         ServerReflectionRequest.newBuilder()
376             .setHost(TEST_HOST)
377             .setFileContainingExtension(
378                 ExtensionRequest.newBuilder()
379                     .setContainingType("grpc.reflection.testing.ThirdLevelType")
380                     .setExtensionNumber(101)
381                     .build())
382             .build();
383 
384     ServerReflectionResponse goldenResponse =
385         ServerReflectionResponse.newBuilder()
386             .setValidHost(TEST_HOST)
387             .setOriginalRequest(request)
388             .setFileDescriptorResponse(
389                 FileDescriptorResponse.newBuilder()
390                     .addFileDescriptorProto(
391                         ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
392                     .addFileDescriptorProto(
393                         ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
394                     .build())
395             .build();
396 
397     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
398     StreamObserver<ServerReflectionRequest> requestObserver =
399         stub.serverReflectionInfo(responseObserver);
400     requestObserver.onNext(request);
401     requestObserver.onCompleted();
402     assertEquals(goldenResponse, responseObserver.firstValue().get());
403   }
404 
405   @Test
fileContainingExtensionForMutableServices()406   public void fileContainingExtensionForMutableServices() throws Exception {
407     ServerReflectionRequest request =
408         ServerReflectionRequest.newBuilder()
409             .setHost(TEST_HOST)
410             .setFileContainingExtension(
411                 ExtensionRequest.newBuilder()
412                     .setContainingType("grpc.reflection.testing.TypeWithExtensions")
413                     .setExtensionNumber(200)
414                     .build())
415             .build();
416     ServerReflectionResponse goldenResponse =
417         ServerReflectionResponse.newBuilder()
418             .setValidHost(TEST_HOST)
419             .setOriginalRequest(request)
420             .setFileDescriptorResponse(
421                 FileDescriptorResponse.newBuilder()
422                     .addFileDescriptorProto(
423                         DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
424                     .build())
425             .build();
426 
427     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
428     StreamObserver<ServerReflectionRequest> requestObserver =
429         stub.serverReflectionInfo(responseObserver);
430     handlerRegistry.addService(dynamicService);
431     requestObserver.onNext(request);
432     requestObserver.onCompleted();
433     StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
434     StreamObserver<ServerReflectionRequest> requestObserver2 =
435         stub.serverReflectionInfo(responseObserver2);
436     handlerRegistry.removeService(dynamicService);
437     requestObserver2.onNext(request);
438     requestObserver2.onCompleted();
439     StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
440     StreamObserver<ServerReflectionRequest> requestObserver3 =
441         stub.serverReflectionInfo(responseObserver3);
442     requestObserver3.onNext(request);
443     requestObserver3.onCompleted();
444 
445     assertEquals(
446         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
447         responseObserver.firstValue().get().getMessageResponseCase());
448     assertEquals(goldenResponse, responseObserver2.firstValue().get());
449     assertEquals(
450         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
451         responseObserver3.firstValue().get().getMessageResponseCase());
452   }
453 
454   @Test
allExtensionNumbersOfType()455   public void allExtensionNumbersOfType() throws Exception {
456     ServerReflectionRequest request =
457         ServerReflectionRequest.newBuilder()
458             .setHost(TEST_HOST)
459             .setAllExtensionNumbersOfType("grpc.reflection.testing.ThirdLevelType")
460             .build();
461 
462     Set<Integer> goldenResponse = new HashSet<>(Arrays.asList(100, 101));
463 
464     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
465     StreamObserver<ServerReflectionRequest> requestObserver =
466         stub.serverReflectionInfo(responseObserver);
467     requestObserver.onNext(request);
468     requestObserver.onCompleted();
469     Set<Integer> extensionNumberResponseSet =
470         new HashSet<>(
471             responseObserver
472                 .firstValue()
473                 .get()
474                 .getAllExtensionNumbersResponse()
475                 .getExtensionNumberList());
476     assertEquals(goldenResponse, extensionNumberResponseSet);
477   }
478 
479   @Test
allExtensionNumbersOfTypeForMutableServices()480   public void allExtensionNumbersOfTypeForMutableServices() throws Exception {
481     String type = "grpc.reflection.testing.TypeWithExtensions";
482     ServerReflectionRequest request =
483         ServerReflectionRequest.newBuilder()
484             .setHost(TEST_HOST)
485             .setAllExtensionNumbersOfType(type)
486             .build();
487     ServerReflectionResponse goldenResponse =
488         ServerReflectionResponse.newBuilder()
489             .setValidHost(TEST_HOST)
490             .setOriginalRequest(request)
491             .setAllExtensionNumbersResponse(
492                 ExtensionNumberResponse.newBuilder()
493                     .setBaseTypeName(type)
494                     .addExtensionNumber(200)
495                     .build())
496             .build();
497 
498     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
499     StreamObserver<ServerReflectionRequest> requestObserver =
500         stub.serverReflectionInfo(responseObserver);
501     handlerRegistry.addService(dynamicService);
502     requestObserver.onNext(request);
503     requestObserver.onCompleted();
504     StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
505     StreamObserver<ServerReflectionRequest> requestObserver2 =
506         stub.serverReflectionInfo(responseObserver2);
507     handlerRegistry.removeService(dynamicService);
508     requestObserver2.onNext(request);
509     requestObserver2.onCompleted();
510     StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
511     StreamObserver<ServerReflectionRequest> requestObserver3 =
512         stub.serverReflectionInfo(responseObserver3);
513     requestObserver3.onNext(request);
514     requestObserver3.onCompleted();
515 
516     assertEquals(
517         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
518         responseObserver.firstValue().get().getMessageResponseCase());
519     assertEquals(goldenResponse, responseObserver2.firstValue().get());
520     assertEquals(
521         ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
522         responseObserver3.firstValue().get().getMessageResponseCase());
523   }
524 
525   @Test
sharedServiceBetweenServers()526   public void sharedServiceBetweenServers()
527       throws IOException, ExecutionException, InterruptedException {
528     Server anotherServer = InProcessServerBuilder.forName("proto-reflection-test-2")
529         .directExecutor()
530         .addService(reflectionService)
531         .addService(new AnotherReflectableServiceGrpc.AnotherReflectableServiceImplBase() {})
532         .build()
533         .start();
534     grpcCleanupRule.register(anotherServer);
535     ManagedChannel anotherChannel = grpcCleanupRule.register(
536         InProcessChannelBuilder.forName("proto-reflection-test-2").directExecutor().build());
537     ServerReflectionGrpc.ServerReflectionStub stub2 = ServerReflectionGrpc.newStub(anotherChannel);
538 
539     ServerReflectionRequest request =
540         ServerReflectionRequest.newBuilder().setHost(TEST_HOST).setListServices("services").build();
541     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
542     StreamObserver<ServerReflectionRequest> requestObserver =
543         stub2.serverReflectionInfo(responseObserver);
544     requestObserver.onNext(request);
545     requestObserver.onCompleted();
546     List<ServiceResponse> response =
547         responseObserver.firstValue().get().getListServicesResponse().getServiceList();
548     assertEquals(new HashSet<>(
549         Arrays.asList(
550             ServiceResponse.newBuilder()
551                 .setName("grpc.reflection.v1alpha.ServerReflection")
552                 .build(),
553             ServiceResponse.newBuilder()
554                 .setName("grpc.reflection.testing.AnotherReflectableService")
555                 .build())),
556         new HashSet<>(response));
557   }
558 
559   @Test
flowControl()560   public void flowControl() throws Exception {
561     FlowControlClientResponseObserver clientResponseObserver =
562         new FlowControlClientResponseObserver();
563     ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
564         (ClientCallStreamObserver<ServerReflectionRequest>)
565             stub.serverReflectionInfo(clientResponseObserver);
566 
567     // Verify we don't receive a response until we request it.
568     requestObserver.onNext(flowControlRequest);
569     assertEquals(0, clientResponseObserver.getResponses().size());
570 
571     requestObserver.request(1);
572     assertEquals(1, clientResponseObserver.getResponses().size());
573     assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
574 
575     // Verify we don't receive an additional response until we request it.
576     requestObserver.onNext(flowControlRequest);
577     assertEquals(1, clientResponseObserver.getResponses().size());
578 
579     requestObserver.request(1);
580     assertEquals(2, clientResponseObserver.getResponses().size());
581     assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
582 
583     requestObserver.onCompleted();
584     assertTrue(clientResponseObserver.onCompleteCalled());
585   }
586 
587   @Test
flowControlOnCompleteWithPendingRequest()588   public void flowControlOnCompleteWithPendingRequest() throws Exception {
589     FlowControlClientResponseObserver clientResponseObserver =
590         new FlowControlClientResponseObserver();
591     ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
592         (ClientCallStreamObserver<ServerReflectionRequest>)
593             stub.serverReflectionInfo(clientResponseObserver);
594 
595     requestObserver.onNext(flowControlRequest);
596     requestObserver.onCompleted();
597     assertEquals(0, clientResponseObserver.getResponses().size());
598     assertFalse(clientResponseObserver.onCompleteCalled());
599 
600     requestObserver.request(1);
601     assertTrue(clientResponseObserver.onCompleteCalled());
602     assertEquals(1, clientResponseObserver.getResponses().size());
603     assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
604   }
605 
606   private final ServerReflectionRequest flowControlRequest =
607       ServerReflectionRequest.newBuilder()
608           .setHost(TEST_HOST)
609           .setFileByFilename("io/grpc/reflection/testing/reflection_test_depth_three.proto")
610           .build();
611   private final ServerReflectionResponse flowControlGoldenResponse =
612       ServerReflectionResponse.newBuilder()
613           .setValidHost(TEST_HOST)
614           .setOriginalRequest(flowControlRequest)
615           .setFileDescriptorResponse(
616               FileDescriptorResponse.newBuilder()
617                   .addFileDescriptorProto(
618                       ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
619                   .build())
620           .build();
621 
622   private static class FlowControlClientResponseObserver
623       implements ClientResponseObserver<ServerReflectionRequest, ServerReflectionResponse> {
624     private final List<ServerReflectionResponse> responses =
625         new ArrayList<>();
626     private boolean onCompleteCalled = false;
627 
628     @Override
beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream)629     public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
630       requestStream.disableAutoRequestWithInitial(0);
631     }
632 
633     @Override
onNext(ServerReflectionResponse value)634     public void onNext(ServerReflectionResponse value) {
635       responses.add(value);
636     }
637 
638     @Override
onError(Throwable t)639     public void onError(Throwable t) {
640       fail("onError called");
641     }
642 
643     @Override
onCompleted()644     public void onCompleted() {
645       onCompleteCalled = true;
646     }
647 
getResponses()648     public List<ServerReflectionResponse> getResponses() {
649       return responses;
650     }
651 
onCompleteCalled()652     public boolean onCompleteCalled() {
653       return onCompleteCalled;
654     }
655   }
656 
assertServiceResponseEquals(Set<ServiceResponse> goldenResponse)657   private void assertServiceResponseEquals(Set<ServiceResponse> goldenResponse) throws Exception {
658     ServerReflectionRequest request =
659         ServerReflectionRequest.newBuilder().setHost(TEST_HOST).setListServices("services").build();
660     StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
661     StreamObserver<ServerReflectionRequest> requestObserver =
662         stub.serverReflectionInfo(responseObserver);
663     requestObserver.onNext(request);
664     requestObserver.onCompleted();
665     List<ServiceResponse> response =
666         responseObserver.firstValue().get().getListServicesResponse().getServiceList();
667     assertEquals(goldenResponse.size(), response.size());
668     assertEquals(goldenResponse, new HashSet<>(response));
669   }
670 }
671