• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
21 import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE;
22 import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
23 import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
24 import static org.junit.Assert.assertEquals;
25 import static org.junit.Assert.assertFalse;
26 import static org.junit.Assert.assertNotEquals;
27 import static org.junit.Assert.assertNotNull;
28 import static org.junit.Assert.assertNull;
29 import static org.junit.Assert.assertTrue;
30 import static org.junit.Assert.fail;
31 import static org.mockito.Mockito.mock;
32 import static org.mockito.Mockito.timeout;
33 
34 import com.google.auth.oauth2.AccessToken;
35 import com.google.auth.oauth2.ComputeEngineCredentials;
36 import com.google.auth.oauth2.GoogleCredentials;
37 import com.google.auth.oauth2.OAuth2Credentials;
38 import com.google.auth.oauth2.ServiceAccountCredentials;
39 import com.google.common.annotations.VisibleForTesting;
40 import com.google.common.base.Throwables;
41 import com.google.common.collect.ImmutableList;
42 import com.google.common.collect.Lists;
43 import com.google.common.io.ByteStreams;
44 import com.google.common.util.concurrent.SettableFuture;
45 import com.google.protobuf.BoolValue;
46 import com.google.protobuf.ByteString;
47 import com.google.protobuf.MessageLite;
48 import io.grpc.CallOptions;
49 import io.grpc.Channel;
50 import io.grpc.ClientCall;
51 import io.grpc.ClientInterceptor;
52 import io.grpc.ClientInterceptors;
53 import io.grpc.ClientStreamTracer;
54 import io.grpc.Context;
55 import io.grpc.Grpc;
56 import io.grpc.ManagedChannel;
57 import io.grpc.Metadata;
58 import io.grpc.MethodDescriptor;
59 import io.grpc.Server;
60 import io.grpc.ServerCall;
61 import io.grpc.ServerCallHandler;
62 import io.grpc.ServerInterceptor;
63 import io.grpc.ServerInterceptors;
64 import io.grpc.ServerStreamTracer;
65 import io.grpc.Status;
66 import io.grpc.StatusRuntimeException;
67 import io.grpc.auth.MoreCallCredentials;
68 import io.grpc.internal.AbstractServerImplBuilder;
69 import io.grpc.internal.CensusStatsModule;
70 import io.grpc.internal.GrpcUtil;
71 import io.grpc.internal.testing.StatsTestUtils;
72 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
73 import io.grpc.internal.testing.StatsTestUtils.FakeTagContext;
74 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
75 import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
76 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
77 import io.grpc.internal.testing.StreamRecorder;
78 import io.grpc.internal.testing.TestClientStreamTracer;
79 import io.grpc.internal.testing.TestServerStreamTracer;
80 import io.grpc.internal.testing.TestStreamTracer;
81 import io.grpc.stub.ClientCallStreamObserver;
82 import io.grpc.stub.ClientCalls;
83 import io.grpc.stub.MetadataUtils;
84 import io.grpc.stub.StreamObserver;
85 import io.grpc.testing.TestUtils;
86 import io.grpc.testing.integration.EmptyProtos.Empty;
87 import io.grpc.testing.integration.Messages.EchoStatus;
88 import io.grpc.testing.integration.Messages.Payload;
89 import io.grpc.testing.integration.Messages.PayloadType;
90 import io.grpc.testing.integration.Messages.ResponseParameters;
91 import io.grpc.testing.integration.Messages.SimpleRequest;
92 import io.grpc.testing.integration.Messages.SimpleResponse;
93 import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
94 import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
95 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
96 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
97 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
98 import io.opencensus.tags.TagKey;
99 import io.opencensus.tags.TagValue;
100 import io.opencensus.trace.Span;
101 import io.opencensus.trace.SpanContext;
102 import io.opencensus.trace.Tracing;
103 import io.opencensus.trace.unsafe.ContextUtils;
104 import java.io.ByteArrayInputStream;
105 import java.io.ByteArrayOutputStream;
106 import java.io.IOException;
107 import java.io.InputStream;
108 import java.net.SocketAddress;
109 import java.security.cert.Certificate;
110 import java.security.cert.X509Certificate;
111 import java.util.ArrayList;
112 import java.util.Arrays;
113 import java.util.Collection;
114 import java.util.Collections;
115 import java.util.List;
116 import java.util.Map;
117 import java.util.concurrent.ArrayBlockingQueue;
118 import java.util.concurrent.Executors;
119 import java.util.concurrent.LinkedBlockingQueue;
120 import java.util.concurrent.ScheduledExecutorService;
121 import java.util.concurrent.TimeUnit;
122 import java.util.concurrent.atomic.AtomicReference;
123 import java.util.logging.Level;
124 import java.util.logging.Logger;
125 import javax.annotation.Nullable;
126 import javax.net.ssl.SSLPeerUnverifiedException;
127 import javax.net.ssl.SSLSession;
128 import org.junit.After;
129 import org.junit.Assert;
130 import org.junit.Assume;
131 import org.junit.Before;
132 import org.junit.Rule;
133 import org.junit.Test;
134 import org.junit.rules.Timeout;
135 import org.mockito.ArgumentCaptor;
136 import org.mockito.Mockito;
137 import org.mockito.verification.VerificationMode;
138 
139 /**
140  * Abstract base class for all GRPC transport tests.
141  *
142  * <p> New tests should avoid using Mockito to support running on AppEngine.</p>
143  */
144 public abstract class AbstractInteropTest {
145   private static Logger logger = Logger.getLogger(AbstractInteropTest.class.getName());
146 
147   @Rule public final Timeout globalTimeout = Timeout.seconds(30);
148 
149   /** Must be at least {@link #unaryPayloadLength()}, plus some to account for encoding overhead. */
150   public static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
151 
152   private static final FakeTagger tagger = new FakeTagger();
153   private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
154       new FakeTagContextBinarySerializer();
155 
156   private final AtomicReference<ServerCall<?, ?>> serverCallCapture =
157       new AtomicReference<ServerCall<?, ?>>();
158   private final AtomicReference<Metadata> requestHeadersCapture =
159       new AtomicReference<Metadata>();
160   private final AtomicReference<Context> contextCapture =
161       new AtomicReference<Context>();
162   private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
163   private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder();
164 
165   private ScheduledExecutorService testServiceExecutor;
166   private Server server;
167 
168   private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
169       new LinkedBlockingQueue<ServerStreamTracerInfo>();
170 
171   private static final class ServerStreamTracerInfo {
172     final String fullMethodName;
173     final InteropServerStreamTracer tracer;
174 
ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer)175     ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer) {
176       this.fullMethodName = fullMethodName;
177       this.tracer = tracer;
178     }
179 
180     private static final class InteropServerStreamTracer extends TestServerStreamTracer {
181       private volatile Context contextCapture;
182 
183       @Override
filterContext(Context context)184       public Context filterContext(Context context) {
185         contextCapture = context;
186         return super.filterContext(context);
187       }
188     }
189   }
190 
191   private final ServerStreamTracer.Factory serverStreamTracerFactory =
192       new ServerStreamTracer.Factory() {
193         @Override
194         public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
195           ServerStreamTracerInfo.InteropServerStreamTracer tracer
196               = new ServerStreamTracerInfo.InteropServerStreamTracer();
197           serverStreamTracers.add(new ServerStreamTracerInfo(fullMethodName, tracer));
198           return tracer;
199         }
200       };
201 
202   protected static final Empty EMPTY = Empty.getDefaultInstance();
203 
startServer()204   private void startServer() {
205     AbstractServerImplBuilder<?> builder = getServerBuilder();
206     if (builder == null) {
207       server = null;
208       return;
209     }
210     testServiceExecutor = Executors.newScheduledThreadPool(2);
211 
212     List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
213         .add(recordServerCallInterceptor(serverCallCapture))
214         .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
215         .add(recordContextInterceptor(contextCapture))
216         .addAll(TestServiceImpl.interceptors())
217         .build();
218 
219     builder
220         .addService(
221             ServerInterceptors.intercept(
222                 new TestServiceImpl(testServiceExecutor),
223                 allInterceptors))
224         .addStreamTracerFactory(serverStreamTracerFactory);
225     io.grpc.internal.TestingAccessor.setStatsImplementation(
226         builder,
227         new CensusStatsModule(
228             tagger,
229             tagContextBinarySerializer,
230             serverStatsRecorder,
231             GrpcUtil.STOPWATCH_SUPPLIER,
232             true));
233     try {
234       server = builder.build().start();
235     } catch (IOException ex) {
236       throw new RuntimeException(ex);
237     }
238   }
239 
stopServer()240   private void stopServer() {
241     if (server != null) {
242       server.shutdownNow();
243     }
244     if (testServiceExecutor != null) {
245       testServiceExecutor.shutdown();
246     }
247   }
248 
249   @VisibleForTesting
getPort()250   final int getPort() {
251     return server.getPort();
252   }
253 
254   protected ManagedChannel channel;
255   protected TestServiceGrpc.TestServiceBlockingStub blockingStub;
256   protected TestServiceGrpc.TestServiceStub asyncStub;
257 
258   private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers =
259       new LinkedBlockingQueue<TestClientStreamTracer>();
260 
261   private final ClientStreamTracer.Factory clientStreamTracerFactory =
262       new ClientStreamTracer.Factory() {
263         @Override
264         public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
265           TestClientStreamTracer tracer = new TestClientStreamTracer();
266           clientStreamTracers.add(tracer);
267           return tracer;
268         }
269       };
270   private final ClientInterceptor tracerSetupInterceptor = new ClientInterceptor() {
271         @Override
272         public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
273             MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
274           return next.newCall(
275               method, callOptions.withStreamTracerFactory(clientStreamTracerFactory));
276         }
277       };
278 
279   /**
280    * Must be called by the subclass setup method if overridden.
281    */
282   @Before
setUp()283   public void setUp() {
284     startServer();
285     channel = createChannel();
286 
287     blockingStub =
288         TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor);
289     asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor);
290 
291     ClientInterceptor[] additionalInterceptors = getAdditionalInterceptors();
292     if (additionalInterceptors != null) {
293       blockingStub = blockingStub.withInterceptors(additionalInterceptors);
294       asyncStub = asyncStub.withInterceptors(additionalInterceptors);
295     }
296 
297     requestHeadersCapture.set(null);
298   }
299 
300   /** Clean up. */
301   @After
tearDown()302   public void tearDown() {
303     if (channel != null) {
304       channel.shutdownNow();
305       try {
306         channel.awaitTermination(1, TimeUnit.SECONDS);
307       } catch (InterruptedException ie) {
308         logger.log(Level.FINE, "Interrupted while waiting for channel termination", ie);
309         // Best effort. If there is an interruption, we want to continue cleaning up, but quickly
310         Thread.currentThread().interrupt();
311       }
312     }
313     stopServer();
314   }
315 
createChannel()316   protected abstract ManagedChannel createChannel();
317 
318   @Nullable
getAdditionalInterceptors()319   protected ClientInterceptor[] getAdditionalInterceptors() {
320     return null;
321   }
322 
323   /**
324    * Returns the server builder used to create server for each test run.  Return {@code null} if
325    * it shouldn't start a server in the same process.
326    */
327   @Nullable
getServerBuilder()328   protected AbstractServerImplBuilder<?> getServerBuilder() {
329     return null;
330   }
331 
createClientCensusStatsModule()332   protected final CensusStatsModule createClientCensusStatsModule() {
333     return new CensusStatsModule(
334         tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true);
335   }
336 
337   /**
338    * Return true if exact metric values should be checked.
339    */
metricsExpected()340   protected boolean metricsExpected() {
341     return true;
342   }
343 
344   @Test
emptyUnary()345   public void emptyUnary() throws Exception {
346     assertEquals(EMPTY, blockingStub.emptyCall(EMPTY));
347   }
348 
349   /** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */
cacheableUnary()350   public void cacheableUnary() {
351     // Set safe to true.
352     MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
353         TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
354     // Set fake user IP since some proxies (GFE) won't cache requests from localhost.
355     Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
356     Metadata metadata = new Metadata();
357     metadata.put(userIpKey, "1.2.3.4");
358     Channel channelWithUserIpKey =
359         ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
360     SimpleRequest requests1And2 =
361         SimpleRequest.newBuilder()
362             .setPayload(
363                 Payload.newBuilder()
364                     .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
365             .build();
366     SimpleRequest request3 =
367         SimpleRequest.newBuilder()
368             .setPayload(
369                 Payload.newBuilder()
370                     .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
371             .build();
372 
373     SimpleResponse response1 =
374         ClientCalls.blockingUnaryCall(
375             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
376     SimpleResponse response2 =
377         ClientCalls.blockingUnaryCall(
378             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
379     SimpleResponse response3 =
380         ClientCalls.blockingUnaryCall(
381             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);
382 
383     assertEquals(response1, response2);
384     assertNotEquals(response1, response3);
385   }
386 
387   @Test
largeUnary()388   public void largeUnary() throws Exception {
389     assumeEnoughMemory();
390     final SimpleRequest request = SimpleRequest.newBuilder()
391         .setResponseSize(314159)
392         .setResponseType(PayloadType.COMPRESSABLE)
393         .setPayload(Payload.newBuilder()
394             .setBody(ByteString.copyFrom(new byte[271828])))
395         .build();
396     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
397         .setPayload(Payload.newBuilder()
398             .setType(PayloadType.COMPRESSABLE)
399             .setBody(ByteString.copyFrom(new byte[314159])))
400         .build();
401 
402     assertEquals(goldenResponse, blockingStub.unaryCall(request));
403 
404     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
405         Collections.singleton(request), Collections.singleton(goldenResponse));
406   }
407 
408   /**
409    * Tests client per-message compression for unary calls. The Java API does not support inspecting
410    * a message's compression level, so this is primarily intended to run against a gRPC C++ server.
411    */
clientCompressedUnary(boolean probe)412   public void clientCompressedUnary(boolean probe) throws Exception {
413     assumeEnoughMemory();
414     final SimpleRequest expectCompressedRequest =
415         SimpleRequest.newBuilder()
416             .setExpectCompressed(BoolValue.newBuilder().setValue(true))
417             .setResponseSize(314159)
418             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
419             .build();
420     final SimpleRequest expectUncompressedRequest =
421         SimpleRequest.newBuilder()
422             .setExpectCompressed(BoolValue.newBuilder().setValue(false))
423             .setResponseSize(314159)
424             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
425             .build();
426     final SimpleResponse goldenResponse =
427         SimpleResponse.newBuilder()
428             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
429             .build();
430 
431     if (probe) {
432       // Send a non-compressed message with expectCompress=true. Servers supporting this test case
433       // should return INVALID_ARGUMENT.
434       try {
435         blockingStub.unaryCall(expectCompressedRequest);
436         fail("expected INVALID_ARGUMENT");
437       } catch (StatusRuntimeException e) {
438         assertEquals(Status.INVALID_ARGUMENT.getCode(), e.getStatus().getCode());
439       }
440       assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT);
441     }
442 
443     assertEquals(
444         goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest));
445     assertStatsTrace(
446         "grpc.testing.TestService/UnaryCall",
447         Status.Code.OK,
448         Collections.singleton(expectCompressedRequest),
449         Collections.singleton(goldenResponse));
450 
451     assertEquals(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest));
452     assertStatsTrace(
453         "grpc.testing.TestService/UnaryCall",
454         Status.Code.OK,
455         Collections.singleton(expectUncompressedRequest),
456         Collections.singleton(goldenResponse));
457   }
458 
459   /**
460    * Tests if the server can send a compressed unary response. Ideally we would assert that the
461    * responses have the requested compression, but this is not supported by the API. Given a
462    * compliant server, this test will exercise the code path for receiving a compressed response but
463    * cannot itself verify that the response was compressed.
464    */
465   @Test
serverCompressedUnary()466   public void serverCompressedUnary() throws Exception {
467     assumeEnoughMemory();
468     final SimpleRequest responseShouldBeCompressed =
469         SimpleRequest.newBuilder()
470             .setResponseCompressed(BoolValue.newBuilder().setValue(true))
471             .setResponseSize(314159)
472             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
473             .build();
474     final SimpleRequest responseShouldBeUncompressed =
475         SimpleRequest.newBuilder()
476             .setResponseCompressed(BoolValue.newBuilder().setValue(false))
477             .setResponseSize(314159)
478             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
479             .build();
480     final SimpleResponse goldenResponse =
481         SimpleResponse.newBuilder()
482             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
483             .build();
484 
485     assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed));
486     assertStatsTrace(
487         "grpc.testing.TestService/UnaryCall",
488         Status.Code.OK,
489         Collections.singleton(responseShouldBeCompressed),
490         Collections.singleton(goldenResponse));
491 
492     assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed));
493     assertStatsTrace(
494         "grpc.testing.TestService/UnaryCall",
495         Status.Code.OK,
496         Collections.singleton(responseShouldBeUncompressed),
497         Collections.singleton(goldenResponse));
498   }
499 
500   @Test
serverStreaming()501   public void serverStreaming() throws Exception {
502     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
503         .setResponseType(PayloadType.COMPRESSABLE)
504         .addResponseParameters(ResponseParameters.newBuilder()
505             .setSize(31415))
506         .addResponseParameters(ResponseParameters.newBuilder()
507             .setSize(9))
508         .addResponseParameters(ResponseParameters.newBuilder()
509             .setSize(2653))
510         .addResponseParameters(ResponseParameters.newBuilder()
511             .setSize(58979))
512         .build();
513     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
514         StreamingOutputCallResponse.newBuilder()
515             .setPayload(Payload.newBuilder()
516                 .setType(PayloadType.COMPRESSABLE)
517                 .setBody(ByteString.copyFrom(new byte[31415])))
518             .build(),
519         StreamingOutputCallResponse.newBuilder()
520             .setPayload(Payload.newBuilder()
521                 .setType(PayloadType.COMPRESSABLE)
522                 .setBody(ByteString.copyFrom(new byte[9])))
523             .build(),
524         StreamingOutputCallResponse.newBuilder()
525             .setPayload(Payload.newBuilder()
526                 .setType(PayloadType.COMPRESSABLE)
527                 .setBody(ByteString.copyFrom(new byte[2653])))
528             .build(),
529         StreamingOutputCallResponse.newBuilder()
530             .setPayload(Payload.newBuilder()
531                 .setType(PayloadType.COMPRESSABLE)
532                 .setBody(ByteString.copyFrom(new byte[58979])))
533             .build());
534 
535     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
536     asyncStub.streamingOutputCall(request, recorder);
537     recorder.awaitCompletion();
538     assertSuccess(recorder);
539     assertEquals(goldenResponses, recorder.getValues());
540   }
541 
542   @Test
clientStreaming()543   public void clientStreaming() throws Exception {
544     final List<StreamingInputCallRequest> requests = Arrays.asList(
545         StreamingInputCallRequest.newBuilder()
546             .setPayload(Payload.newBuilder()
547                 .setBody(ByteString.copyFrom(new byte[27182])))
548             .build(),
549         StreamingInputCallRequest.newBuilder()
550             .setPayload(Payload.newBuilder()
551                 .setBody(ByteString.copyFrom(new byte[8])))
552             .build(),
553         StreamingInputCallRequest.newBuilder()
554             .setPayload(Payload.newBuilder()
555                 .setBody(ByteString.copyFrom(new byte[1828])))
556             .build(),
557         StreamingInputCallRequest.newBuilder()
558             .setPayload(Payload.newBuilder()
559                 .setBody(ByteString.copyFrom(new byte[45904])))
560             .build());
561     final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder()
562         .setAggregatedPayloadSize(74922)
563         .build();
564 
565     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
566     StreamObserver<StreamingInputCallRequest> requestObserver =
567         asyncStub.streamingInputCall(responseObserver);
568     for (StreamingInputCallRequest request : requests) {
569       requestObserver.onNext(request);
570     }
571     requestObserver.onCompleted();
572 
573     assertEquals(goldenResponse, responseObserver.firstValue().get());
574     responseObserver.awaitCompletion();
575     assertThat(responseObserver.getValues()).hasSize(1);
576     Throwable t = responseObserver.getError();
577     if (t != null) {
578       throw new AssertionError(t);
579     }
580   }
581 
582   /**
583    * Tests client per-message compression for streaming calls. The Java API does not support
584    * inspecting a message's compression level, so this is primarily intended to run against a gRPC
585    * C++ server.
586    */
clientCompressedStreaming(boolean probe)587   public void clientCompressedStreaming(boolean probe) throws Exception {
588     final StreamingInputCallRequest expectCompressedRequest =
589         StreamingInputCallRequest.newBuilder()
590             .setExpectCompressed(BoolValue.newBuilder().setValue(true))
591             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
592             .build();
593     final StreamingInputCallRequest expectUncompressedRequest =
594         StreamingInputCallRequest.newBuilder()
595             .setExpectCompressed(BoolValue.newBuilder().setValue(false))
596             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
597             .build();
598     final StreamingInputCallResponse goldenResponse =
599         StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();
600 
601     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
602     StreamObserver<StreamingInputCallRequest> requestObserver =
603         asyncStub.streamingInputCall(responseObserver);
604 
605     if (probe) {
606       // Send a non-compressed message with expectCompress=true. Servers supporting this test case
607       // should return INVALID_ARGUMENT.
608       requestObserver.onNext(expectCompressedRequest);
609       responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
610       Throwable e = responseObserver.getError();
611       assertNotNull("expected INVALID_ARGUMENT", e);
612       assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
613     }
614 
615     // Start a new stream
616     responseObserver = StreamRecorder.create();
617     @SuppressWarnings("unchecked")
618     ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
619         (ClientCallStreamObserver)
620             asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
621     clientCallStreamObserver.setMessageCompression(true);
622     clientCallStreamObserver.onNext(expectCompressedRequest);
623     clientCallStreamObserver.setMessageCompression(false);
624     clientCallStreamObserver.onNext(expectUncompressedRequest);
625     clientCallStreamObserver.onCompleted();
626     responseObserver.awaitCompletion();
627     assertSuccess(responseObserver);
628     assertEquals(goldenResponse, responseObserver.firstValue().get());
629   }
630 
631   /**
632    * Tests server per-message compression in a streaming response. Ideally we would assert that the
633    * responses have the requested compression, but this is not supported by the API. Given a
634    * compliant server, this test will exercise the code path for receiving a compressed response but
635    * cannot itself verify that the response was compressed.
636    */
serverCompressedStreaming()637   public void serverCompressedStreaming() throws Exception {
638     final StreamingOutputCallRequest request =
639         StreamingOutputCallRequest.newBuilder()
640             .addResponseParameters(
641                 ResponseParameters.newBuilder()
642                     .setCompressed(BoolValue.newBuilder().setValue(true))
643                     .setSize(31415))
644             .addResponseParameters(
645                 ResponseParameters.newBuilder()
646                     .setCompressed(BoolValue.newBuilder().setValue(false))
647                     .setSize(92653))
648             .build();
649     final List<StreamingOutputCallResponse> goldenResponses =
650         Arrays.asList(
651             StreamingOutputCallResponse.newBuilder()
652                 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415])))
653                 .build(),
654             StreamingOutputCallResponse.newBuilder()
655                 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653])))
656                 .build());
657 
658     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
659     asyncStub.streamingOutputCall(request, recorder);
660     recorder.awaitCompletion();
661     assertSuccess(recorder);
662     assertEquals(goldenResponses, recorder.getValues());
663   }
664 
665   @Test
pingPong()666   public void pingPong() throws Exception {
667     final List<StreamingOutputCallRequest> requests = Arrays.asList(
668         StreamingOutputCallRequest.newBuilder()
669             .addResponseParameters(ResponseParameters.newBuilder()
670                 .setSize(31415))
671             .setPayload(Payload.newBuilder()
672                 .setBody(ByteString.copyFrom(new byte[27182])))
673             .build(),
674         StreamingOutputCallRequest.newBuilder()
675             .addResponseParameters(ResponseParameters.newBuilder()
676                 .setSize(9))
677             .setPayload(Payload.newBuilder()
678                 .setBody(ByteString.copyFrom(new byte[8])))
679             .build(),
680         StreamingOutputCallRequest.newBuilder()
681             .addResponseParameters(ResponseParameters.newBuilder()
682                 .setSize(2653))
683             .setPayload(Payload.newBuilder()
684                 .setBody(ByteString.copyFrom(new byte[1828])))
685             .build(),
686         StreamingOutputCallRequest.newBuilder()
687             .addResponseParameters(ResponseParameters.newBuilder()
688                 .setSize(58979))
689             .setPayload(Payload.newBuilder()
690                 .setBody(ByteString.copyFrom(new byte[45904])))
691             .build());
692     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
693         StreamingOutputCallResponse.newBuilder()
694             .setPayload(Payload.newBuilder()
695                 .setType(PayloadType.COMPRESSABLE)
696                 .setBody(ByteString.copyFrom(new byte[31415])))
697             .build(),
698         StreamingOutputCallResponse.newBuilder()
699             .setPayload(Payload.newBuilder()
700                 .setType(PayloadType.COMPRESSABLE)
701                 .setBody(ByteString.copyFrom(new byte[9])))
702             .build(),
703         StreamingOutputCallResponse.newBuilder()
704             .setPayload(Payload.newBuilder()
705                 .setType(PayloadType.COMPRESSABLE)
706                 .setBody(ByteString.copyFrom(new byte[2653])))
707             .build(),
708         StreamingOutputCallResponse.newBuilder()
709             .setPayload(Payload.newBuilder()
710                 .setType(PayloadType.COMPRESSABLE)
711                 .setBody(ByteString.copyFrom(new byte[58979])))
712             .build());
713 
714     final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(5);
715     StreamObserver<StreamingOutputCallRequest> requestObserver
716         = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
717           @Override
718           public void onNext(StreamingOutputCallResponse response) {
719             queue.add(response);
720           }
721 
722           @Override
723           public void onError(Throwable t) {
724             queue.add(t);
725           }
726 
727           @Override
728           public void onCompleted() {
729             queue.add("Completed");
730           }
731         });
732     for (int i = 0; i < requests.size(); i++) {
733       assertNull(queue.peek());
734       requestObserver.onNext(requests.get(i));
735       Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
736       assertNotNull("Timed out waiting for response", actualResponse);
737       if (actualResponse instanceof Throwable) {
738         throw new AssertionError((Throwable) actualResponse);
739       }
740       assertEquals(goldenResponses.get(i), actualResponse);
741     }
742     requestObserver.onCompleted();
743     assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
744   }
745 
746   @Test
emptyStream()747   public void emptyStream() throws Exception {
748     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
749     StreamObserver<StreamingOutputCallRequest> requestObserver
750         = asyncStub.fullDuplexCall(responseObserver);
751     requestObserver.onCompleted();
752     responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
753   }
754 
755   @Test
cancelAfterBegin()756   public void cancelAfterBegin() throws Exception {
757     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
758     StreamObserver<StreamingInputCallRequest> requestObserver =
759         asyncStub.streamingInputCall(responseObserver);
760     requestObserver.onError(new RuntimeException());
761     responseObserver.awaitCompletion();
762     assertEquals(Arrays.<StreamingInputCallResponse>asList(), responseObserver.getValues());
763     assertEquals(Status.Code.CANCELLED,
764         Status.fromThrowable(responseObserver.getError()).getCode());
765 
766     if (metricsExpected()) {
767       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
768       checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall");
769       // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
770       // recorded.  The tracer stats rely on the stream being created, which is not always the case
771       // in this test.  Therefore we don't check the tracer stats.
772       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
773       checkEndTags(
774           clientEndRecord, "grpc.testing.TestService/StreamingInputCall",
775           Status.CANCELLED.getCode());
776       // Do not check server-side metrics, because the status on the server side is undetermined.
777     }
778   }
779 
780   @Test
cancelAfterFirstResponse()781   public void cancelAfterFirstResponse() throws Exception {
782     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
783         .addResponseParameters(ResponseParameters.newBuilder()
784             .setSize(31415))
785         .setPayload(Payload.newBuilder()
786             .setBody(ByteString.copyFrom(new byte[27182])))
787         .build();
788     final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
789         .setPayload(Payload.newBuilder()
790             .setType(PayloadType.COMPRESSABLE)
791             .setBody(ByteString.copyFrom(new byte[31415])))
792         .build();
793 
794     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
795     StreamObserver<StreamingOutputCallRequest> requestObserver
796         = asyncStub.fullDuplexCall(responseObserver);
797     requestObserver.onNext(request);
798     assertEquals(goldenResponse, responseObserver.firstValue().get());
799     requestObserver.onError(new RuntimeException());
800     responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
801     assertEquals(1, responseObserver.getValues().size());
802     assertEquals(Status.Code.CANCELLED,
803                  Status.fromThrowable(responseObserver.getError()).getCode());
804 
805     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED);
806   }
807 
808   @Test
fullDuplexCallShouldSucceed()809   public void fullDuplexCallShouldSucceed() throws Exception {
810     // Build the request.
811     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
812     StreamingOutputCallRequest.Builder streamingOutputBuilder =
813         StreamingOutputCallRequest.newBuilder();
814     streamingOutputBuilder.setResponseType(COMPRESSABLE);
815     for (Integer size : responseSizes) {
816       streamingOutputBuilder.addResponseParameters(
817           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
818     }
819     final StreamingOutputCallRequest request = streamingOutputBuilder.build();
820 
821     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
822     StreamObserver<StreamingOutputCallRequest> requestStream =
823         asyncStub.fullDuplexCall(recorder);
824 
825     final int numRequests = 10;
826     List<StreamingOutputCallRequest> requests =
827         new ArrayList<>(numRequests);
828     for (int ix = numRequests; ix > 0; --ix) {
829       requests.add(request);
830       requestStream.onNext(request);
831     }
832     requestStream.onCompleted();
833     recorder.awaitCompletion();
834     assertSuccess(recorder);
835     assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
836     for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
837       StreamingOutputCallResponse response = recorder.getValues().get(ix);
838       assertEquals(COMPRESSABLE, response.getPayload().getType());
839       int length = response.getPayload().getBody().size();
840       int expectedSize = responseSizes.get(ix % responseSizes.size());
841       assertEquals("comparison failed at index " + ix, expectedSize, length);
842     }
843 
844     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, requests,
845         recorder.getValues());
846   }
847 
848   @Test
halfDuplexCallShouldSucceed()849   public void halfDuplexCallShouldSucceed() throws Exception {
850     // Build the request.
851     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
852     StreamingOutputCallRequest.Builder streamingOutputBuilder =
853         StreamingOutputCallRequest.newBuilder();
854     streamingOutputBuilder.setResponseType(COMPRESSABLE);
855     for (Integer size : responseSizes) {
856       streamingOutputBuilder.addResponseParameters(
857           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
858     }
859     final StreamingOutputCallRequest request = streamingOutputBuilder.build();
860 
861     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
862     StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);
863 
864     final int numRequests = 10;
865     List<StreamingOutputCallRequest> requests =
866         new ArrayList<>(numRequests);
867     for (int ix = numRequests; ix > 0; --ix) {
868       requests.add(request);
869       requestStream.onNext(request);
870     }
871     requestStream.onCompleted();
872     recorder.awaitCompletion();
873     assertSuccess(recorder);
874     assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
875     for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
876       StreamingOutputCallResponse response = recorder.getValues().get(ix);
877       assertEquals(COMPRESSABLE, response.getPayload().getType());
878       int length = response.getPayload().getBody().size();
879       int expectedSize = responseSizes.get(ix % responseSizes.size());
880       assertEquals("comparison failed at index " + ix, expectedSize, length);
881     }
882   }
883 
884   @Test
serverStreamingShouldBeFlowControlled()885   public void serverStreamingShouldBeFlowControlled() throws Exception {
886     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
887         .setResponseType(COMPRESSABLE)
888         .addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
889         .addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
890         .build();
891     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
892         StreamingOutputCallResponse.newBuilder()
893             .setPayload(Payload.newBuilder()
894                 .setType(PayloadType.COMPRESSABLE)
895                 .setBody(ByteString.copyFrom(new byte[100000]))).build(),
896         StreamingOutputCallResponse.newBuilder()
897             .setPayload(Payload.newBuilder()
898                 .setType(PayloadType.COMPRESSABLE)
899                 .setBody(ByteString.copyFrom(new byte[100001]))).build());
900 
901     long start = System.nanoTime();
902 
903     final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(10);
904     ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
905         channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
906     call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
907       @Override
908       public void onHeaders(Metadata headers) {}
909 
910       @Override
911       public void onMessage(final StreamingOutputCallResponse message) {
912         queue.add(message);
913       }
914 
915       @Override
916       public void onClose(Status status, Metadata trailers) {
917         queue.add(status);
918       }
919     }, new Metadata());
920     call.sendMessage(request);
921     call.halfClose();
922 
923     // Time how long it takes to get the first response.
924     call.request(1);
925     assertEquals(goldenResponses.get(0),
926         queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
927     long firstCallDuration = System.nanoTime() - start;
928 
929     // Without giving additional flow control, make sure that we don't get another response. We wait
930     // until we are comfortable the next message isn't coming. We may have very low nanoTime
931     // resolution (like on Windows) or be using a testing, in-process transport where message
932     // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up sleep time
933     // to at least 1ms.
934     assertNull(queue.poll(Math.max(firstCallDuration * 4, 1 * 1000 * 1000), TimeUnit.NANOSECONDS));
935 
936     // Make sure that everything still completes.
937     call.request(1);
938     assertEquals(goldenResponses.get(1),
939         queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
940     assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
941   }
942 
943   @Test
veryLargeRequest()944   public void veryLargeRequest() throws Exception {
945     assumeEnoughMemory();
946     final SimpleRequest request = SimpleRequest.newBuilder()
947         .setPayload(Payload.newBuilder()
948             .setType(PayloadType.COMPRESSABLE)
949             .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
950         .setResponseSize(10)
951         .setResponseType(PayloadType.COMPRESSABLE)
952         .build();
953     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
954         .setPayload(Payload.newBuilder()
955             .setType(PayloadType.COMPRESSABLE)
956             .setBody(ByteString.copyFrom(new byte[10])))
957         .build();
958 
959     assertEquals(goldenResponse, blockingStub.unaryCall(request));
960   }
961 
962   @Test
veryLargeResponse()963   public void veryLargeResponse() throws Exception {
964     assumeEnoughMemory();
965     final SimpleRequest request = SimpleRequest.newBuilder()
966         .setResponseSize(unaryPayloadLength())
967         .setResponseType(PayloadType.COMPRESSABLE)
968         .build();
969     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
970         .setPayload(Payload.newBuilder()
971             .setType(PayloadType.COMPRESSABLE)
972             .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
973         .build();
974 
975     assertEquals(goldenResponse, blockingStub.unaryCall(request));
976   }
977 
978   @Test
exchangeMetadataUnaryCall()979   public void exchangeMetadataUnaryCall() throws Exception {
980     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub;
981 
982     // Capture the metadata exchange
983     Metadata fixedHeaders = new Metadata();
984     // Send a context proto (as it's in the default extension registry)
985     Messages.SimpleContext contextValue =
986         Messages.SimpleContext.newBuilder().setValue("dog").build();
987     fixedHeaders.put(Util.METADATA_KEY, contextValue);
988     stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
989     // .. and expect it to be echoed back in trailers
990     AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
991     AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
992     stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
993 
994     assertNotNull(stub.emptyCall(EMPTY));
995 
996     // Assert that our side channel object is echoed back in both headers and trailers
997     Assert.assertEquals(contextValue, headersCapture.get().get(Util.METADATA_KEY));
998     Assert.assertEquals(contextValue, trailersCapture.get().get(Util.METADATA_KEY));
999   }
1000 
1001   @Test
exchangeMetadataStreamingCall()1002   public void exchangeMetadataStreamingCall() throws Exception {
1003     TestServiceGrpc.TestServiceStub stub = asyncStub;
1004 
1005     // Capture the metadata exchange
1006     Metadata fixedHeaders = new Metadata();
1007     // Send a context proto (as it's in the default extension registry)
1008     Messages.SimpleContext contextValue =
1009         Messages.SimpleContext.newBuilder().setValue("dog").build();
1010     fixedHeaders.put(Util.METADATA_KEY, contextValue);
1011     stub = MetadataUtils.attachHeaders(stub, fixedHeaders);
1012     // .. and expect it to be echoed back in trailers
1013     AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
1014     AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
1015     stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
1016 
1017     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
1018     Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
1019         Messages.StreamingOutputCallRequest.newBuilder();
1020     streamingOutputBuilder.setResponseType(COMPRESSABLE);
1021     for (Integer size : responseSizes) {
1022       streamingOutputBuilder.addResponseParameters(
1023           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
1024     }
1025     final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build();
1026 
1027     StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
1028     StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
1029         stub.fullDuplexCall(recorder);
1030 
1031     final int numRequests = 10;
1032     List<StreamingOutputCallRequest> requests =
1033         new ArrayList<>(numRequests);
1034 
1035     for (int ix = numRequests; ix > 0; --ix) {
1036       requests.add(request);
1037       requestStream.onNext(request);
1038     }
1039     requestStream.onCompleted();
1040     recorder.awaitCompletion();
1041     assertSuccess(recorder);
1042     org.junit.Assert.assertEquals(responseSizes.size() * numRequests, recorder.getValues().size());
1043 
1044     // Assert that our side channel object is echoed back in both headers and trailers
1045     Assert.assertEquals(contextValue, headersCapture.get().get(Util.METADATA_KEY));
1046     Assert.assertEquals(contextValue, trailersCapture.get().get(Util.METADATA_KEY));
1047   }
1048 
1049   @Test
sendsTimeoutHeader()1050   public void sendsTimeoutHeader() {
1051     Assume.assumeTrue("can not capture request headers on server side", server != null);
1052     long configuredTimeoutMinutes = 100;
1053     TestServiceGrpc.TestServiceBlockingStub stub =
1054         blockingStub.withDeadlineAfter(configuredTimeoutMinutes, TimeUnit.MINUTES);
1055     stub.emptyCall(EMPTY);
1056     long transferredTimeoutMinutes = TimeUnit.NANOSECONDS.toMinutes(
1057         requestHeadersCapture.get().get(GrpcUtil.TIMEOUT_KEY));
1058     Assert.assertTrue(
1059         "configuredTimeoutMinutes=" + configuredTimeoutMinutes
1060             + ", transferredTimeoutMinutes=" + transferredTimeoutMinutes,
1061         configuredTimeoutMinutes - transferredTimeoutMinutes >= 0
1062             && configuredTimeoutMinutes - transferredTimeoutMinutes <= 1);
1063   }
1064 
1065   @Test
deadlineNotExceeded()1066   public void deadlineNotExceeded() {
1067     // warm up the channel and JVM
1068     blockingStub.emptyCall(Empty.getDefaultInstance());
1069     blockingStub
1070         .withDeadlineAfter(10, TimeUnit.SECONDS)
1071         .streamingOutputCall(StreamingOutputCallRequest.newBuilder()
1072             .addResponseParameters(ResponseParameters.newBuilder()
1073                 .setIntervalUs(0))
1074                 .build()).next();
1075   }
1076 
1077   @Test
deadlineExceeded()1078   public void deadlineExceeded() throws Exception {
1079     // warm up the channel and JVM
1080     blockingStub.emptyCall(Empty.getDefaultInstance());
1081     TestServiceGrpc.TestServiceBlockingStub stub =
1082         blockingStub.withDeadlineAfter(10, TimeUnit.MILLISECONDS);
1083     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1084         .addResponseParameters(ResponseParameters.newBuilder()
1085             .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20)))
1086         .build();
1087     try {
1088       stub.streamingOutputCall(request).next();
1089       fail("Expected deadline to be exceeded");
1090     } catch (StatusRuntimeException ex) {
1091       assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode());
1092     }
1093 
1094     assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
1095     if (metricsExpected()) {
1096       // Stream may not have been created before deadline is exceeded, thus we don't test the tracer
1097       // stats.
1098       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1099       checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
1100       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1101       checkEndTags(
1102           clientEndRecord,
1103           "grpc.testing.TestService/StreamingOutputCall",
1104           Status.Code.DEADLINE_EXCEEDED);
1105       // Do not check server-side metrics, because the status on the server side is undetermined.
1106     }
1107   }
1108 
1109   @Test
deadlineExceededServerStreaming()1110   public void deadlineExceededServerStreaming() throws Exception {
1111     // warm up the channel and JVM
1112     blockingStub.emptyCall(Empty.getDefaultInstance());
1113     ResponseParameters.Builder responseParameters = ResponseParameters.newBuilder()
1114         .setSize(1)
1115         .setIntervalUs(10000);
1116     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1117         .setResponseType(PayloadType.COMPRESSABLE)
1118         .addResponseParameters(responseParameters)
1119         .addResponseParameters(responseParameters)
1120         .addResponseParameters(responseParameters)
1121         .addResponseParameters(responseParameters)
1122         .build();
1123     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
1124     asyncStub
1125         .withDeadlineAfter(30, TimeUnit.MILLISECONDS)
1126         .streamingOutputCall(request, recorder);
1127     recorder.awaitCompletion();
1128     assertEquals(Status.DEADLINE_EXCEEDED.getCode(),
1129         Status.fromThrowable(recorder.getError()).getCode());
1130     assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
1131     if (metricsExpected()) {
1132       // Stream may not have been created when deadline is exceeded, thus we don't check tracer
1133       // stats.
1134       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1135       checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall");
1136       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1137       checkEndTags(
1138           clientEndRecord,
1139           "grpc.testing.TestService/StreamingOutputCall",
1140           Status.Code.DEADLINE_EXCEEDED);
1141       // Do not check server-side metrics, because the status on the server side is undetermined.
1142     }
1143   }
1144 
1145   @Test
deadlineInPast()1146   public void deadlineInPast() throws Exception {
1147     // Test once with idle channel and once with active channel
1148     try {
1149       blockingStub
1150           .withDeadlineAfter(-10, TimeUnit.SECONDS)
1151           .emptyCall(Empty.getDefaultInstance());
1152       fail("Should have thrown");
1153     } catch (StatusRuntimeException ex) {
1154       assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
1155     }
1156 
1157     // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
1158     // recorded.  The tracer stats rely on the stream being created, which is not the case if
1159     // deadline is exceeded before the call is created. Therefore we don't check the tracer stats.
1160     if (metricsExpected()) {
1161       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1162       checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
1163       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1164       checkEndTags(
1165           clientEndRecord, "grpc.testing.TestService/EmptyCall",
1166           Status.DEADLINE_EXCEEDED.getCode());
1167     }
1168 
1169     // warm up the channel
1170     blockingStub.emptyCall(Empty.getDefaultInstance());
1171     try {
1172       blockingStub
1173           .withDeadlineAfter(-10, TimeUnit.SECONDS)
1174           .emptyCall(Empty.getDefaultInstance());
1175       fail("Should have thrown");
1176     } catch (StatusRuntimeException ex) {
1177       assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
1178     }
1179     assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
1180     if (metricsExpected()) {
1181       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1182       checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall");
1183       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1184       checkEndTags(
1185           clientEndRecord, "grpc.testing.TestService/EmptyCall",
1186           Status.DEADLINE_EXCEEDED.getCode());
1187     }
1188   }
1189 
1190   @Test
maxInboundSize_exact()1191   public void maxInboundSize_exact() {
1192     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1193         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1194         .build();
1195 
1196     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1197         TestServiceGrpc.getStreamingOutputCallMethod();
1198     ByteSizeMarshaller<StreamingOutputCallResponse> mar =
1199         new ByteSizeMarshaller<StreamingOutputCallResponse>(md.getResponseMarshaller());
1200     blockingServerStreamingCall(
1201         blockingStub.getChannel(),
1202         md.toBuilder(md.getRequestMarshaller(), mar).build(),
1203         blockingStub.getCallOptions(),
1204         request)
1205         .next();
1206 
1207     int size = mar.lastInSize;
1208 
1209     TestServiceGrpc.TestServiceBlockingStub stub =
1210         blockingStub.withMaxInboundMessageSize(size);
1211 
1212     stub.streamingOutputCall(request).next();
1213   }
1214 
1215   @Test
maxInboundSize_tooBig()1216   public void maxInboundSize_tooBig() {
1217     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1218         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1219         .build();
1220 
1221     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1222         TestServiceGrpc.getStreamingOutputCallMethod();
1223     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1224         new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
1225     blockingServerStreamingCall(
1226         blockingStub.getChannel(),
1227         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1228         blockingStub.getCallOptions(),
1229         request)
1230         .next();
1231 
1232     int size = mar.lastOutSize;
1233 
1234     TestServiceGrpc.TestServiceBlockingStub stub =
1235         blockingStub.withMaxInboundMessageSize(size - 1);
1236 
1237     try {
1238       stub.streamingOutputCall(request).next();
1239       fail();
1240     } catch (StatusRuntimeException ex) {
1241       Status s = ex.getStatus();
1242       assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.RESOURCE_EXHAUSTED);
1243       assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum");
1244     }
1245   }
1246 
1247   @Test
maxOutboundSize_exact()1248   public void maxOutboundSize_exact() {
1249     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1250         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1251         .build();
1252 
1253     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1254         TestServiceGrpc.getStreamingOutputCallMethod();
1255     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1256         new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
1257     blockingServerStreamingCall(
1258         blockingStub.getChannel(),
1259         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1260         blockingStub.getCallOptions(),
1261         request)
1262         .next();
1263 
1264     int size = mar.lastOutSize;
1265 
1266     TestServiceGrpc.TestServiceBlockingStub stub =
1267         blockingStub.withMaxOutboundMessageSize(size);
1268 
1269     stub.streamingOutputCall(request).next();
1270   }
1271 
1272   @Test
maxOutboundSize_tooBig()1273   public void maxOutboundSize_tooBig() {
1274     // set at least one field to ensure the size is non-zero.
1275     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1276         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1277         .build();
1278 
1279 
1280     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1281         TestServiceGrpc.getStreamingOutputCallMethod();
1282     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1283         new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller());
1284     blockingServerStreamingCall(
1285         blockingStub.getChannel(),
1286         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1287         blockingStub.getCallOptions(),
1288         request)
1289         .next();
1290 
1291     TestServiceGrpc.TestServiceBlockingStub stub =
1292         blockingStub.withMaxOutboundMessageSize(mar.lastOutSize - 1);
1293     try {
1294       stub.streamingOutputCall(request).next();
1295       fail();
1296     } catch (StatusRuntimeException ex) {
1297       Status s = ex.getStatus();
1298       assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.CANCELLED);
1299       assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large");
1300     }
1301   }
1302 
unaryPayloadLength()1303   protected int unaryPayloadLength() {
1304     // 10MiB.
1305     return 10485760;
1306   }
1307 
1308   @Test
gracefulShutdown()1309   public void gracefulShutdown() throws Exception {
1310     final List<StreamingOutputCallRequest> requests = Arrays.asList(
1311         StreamingOutputCallRequest.newBuilder()
1312             .addResponseParameters(ResponseParameters.newBuilder()
1313                 .setSize(3))
1314             .setPayload(Payload.newBuilder()
1315                 .setBody(ByteString.copyFrom(new byte[2])))
1316             .build(),
1317         StreamingOutputCallRequest.newBuilder()
1318             .addResponseParameters(ResponseParameters.newBuilder()
1319                 .setSize(1))
1320             .setPayload(Payload.newBuilder()
1321                 .setBody(ByteString.copyFrom(new byte[7])))
1322             .build(),
1323         StreamingOutputCallRequest.newBuilder()
1324             .addResponseParameters(ResponseParameters.newBuilder()
1325                 .setSize(4))
1326             .setPayload(Payload.newBuilder()
1327                 .setBody(ByteString.copyFrom(new byte[1])))
1328             .build());
1329     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
1330         StreamingOutputCallResponse.newBuilder()
1331             .setPayload(Payload.newBuilder()
1332                 .setType(PayloadType.COMPRESSABLE)
1333                 .setBody(ByteString.copyFrom(new byte[3])))
1334             .build(),
1335         StreamingOutputCallResponse.newBuilder()
1336             .setPayload(Payload.newBuilder()
1337                 .setType(PayloadType.COMPRESSABLE)
1338                 .setBody(ByteString.copyFrom(new byte[1])))
1339             .build(),
1340         StreamingOutputCallResponse.newBuilder()
1341             .setPayload(Payload.newBuilder()
1342                 .setType(PayloadType.COMPRESSABLE)
1343                 .setBody(ByteString.copyFrom(new byte[4])))
1344             .build());
1345 
1346     final ArrayBlockingQueue<StreamingOutputCallResponse> responses =
1347         new ArrayBlockingQueue<StreamingOutputCallResponse>(3);
1348     final SettableFuture<Void> completed = SettableFuture.create();
1349     final SettableFuture<Void> errorSeen = SettableFuture.create();
1350     StreamObserver<StreamingOutputCallResponse> responseObserver =
1351         new StreamObserver<StreamingOutputCallResponse>() {
1352 
1353           @Override
1354           public void onNext(StreamingOutputCallResponse value) {
1355             responses.add(value);
1356           }
1357 
1358           @Override
1359           public void onError(Throwable t) {
1360             errorSeen.set(null);
1361           }
1362 
1363           @Override
1364           public void onCompleted() {
1365             completed.set(null);
1366           }
1367         };
1368     StreamObserver<StreamingOutputCallRequest> requestObserver
1369         = asyncStub.fullDuplexCall(responseObserver);
1370     requestObserver.onNext(requests.get(0));
1371     assertEquals(
1372         goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1373     // Initiate graceful shutdown.
1374     channel.shutdown();
1375     requestObserver.onNext(requests.get(1));
1376     assertEquals(
1377         goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1378     // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't.
1379     requestObserver.onNext(requests.get(2));
1380     assertEquals(
1381         goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1382     assertFalse(completed.isDone());
1383     requestObserver.onCompleted();
1384     completed.get(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
1385     assertFalse(errorSeen.isDone());
1386   }
1387 
1388   @Test
customMetadata()1389   public void customMetadata() throws Exception {
1390     final int responseSize = 314159;
1391     final int requestSize = 271828;
1392     final SimpleRequest request = SimpleRequest.newBuilder()
1393         .setResponseSize(responseSize)
1394         .setResponseType(PayloadType.COMPRESSABLE)
1395         .setPayload(Payload.newBuilder()
1396             .setBody(ByteString.copyFrom(new byte[requestSize])))
1397         .build();
1398     final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
1399         .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize))
1400         .setResponseType(PayloadType.COMPRESSABLE)
1401         .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize])))
1402         .build();
1403     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1404         .setPayload(Payload.newBuilder()
1405             .setType(PayloadType.COMPRESSABLE)
1406             .setBody(ByteString.copyFrom(new byte[responseSize])))
1407         .build();
1408     final StreamingOutputCallResponse goldenStreamingResponse =
1409         StreamingOutputCallResponse.newBuilder()
1410             .setPayload(Payload.newBuilder()
1411             .setType(PayloadType.COMPRESSABLE)
1412             .setBody(ByteString.copyFrom(new byte[responseSize])))
1413         .build();
1414     final byte[] trailingBytes =
1415         {(byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb};
1416 
1417     // Test UnaryCall
1418     Metadata metadata = new Metadata();
1419     metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value");
1420     metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
1421     TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub;
1422     blockingStub = MetadataUtils.attachHeaders(blockingStub, metadata);
1423     AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>();
1424     AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>();
1425     blockingStub = MetadataUtils.captureMetadata(blockingStub, headersCapture, trailersCapture);
1426     SimpleResponse response = blockingStub.unaryCall(request);
1427 
1428     assertEquals(goldenResponse, response);
1429     assertEquals("test_initial_metadata_value",
1430         headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
1431     assertTrue(
1432         Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY)));
1433     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
1434         Collections.singleton(request), Collections.singleton(goldenResponse));
1435 
1436     // Test FullDuplexCall
1437     metadata = new Metadata();
1438     metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value");
1439     metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
1440     TestServiceGrpc.TestServiceStub stub = asyncStub;
1441     stub = MetadataUtils.attachHeaders(stub, metadata);
1442     headersCapture = new AtomicReference<Metadata>();
1443     trailersCapture = new AtomicReference<Metadata>();
1444     stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture);
1445 
1446     StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
1447     StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
1448         stub.fullDuplexCall(recorder);
1449     requestStream.onNext(streamingRequest);
1450     requestStream.onCompleted();
1451     recorder.awaitCompletion();
1452 
1453     assertSuccess(recorder);
1454     assertEquals(goldenStreamingResponse, recorder.firstValue().get());
1455     assertEquals("test_initial_metadata_value",
1456         headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
1457     assertTrue(
1458         Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY)));
1459     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK,
1460         Collections.singleton(streamingRequest), Collections.singleton(goldenStreamingResponse));
1461   }
1462 
1463   @Test(timeout = 10000)
censusContextsPropagated()1464   public void censusContextsPropagated() {
1465     Assume.assumeTrue("Skip the test because server is not in the same process.", server != null);
1466     Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan();
1467     // A valid ID is guaranteed to be unique, so we can verify it is actually propagated.
1468     assertTrue(clientParentSpan.getContext().getTraceId().isValid());
1469     Context ctx =
1470         Context.ROOT.withValues(
1471             TAG_CONTEXT_KEY,
1472             tagger.emptyBuilder().put(
1473                 StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(),
1474             ContextUtils.CONTEXT_SPAN_KEY,
1475             clientParentSpan);
1476     Context origCtx = ctx.attach();
1477     try {
1478       blockingStub.unaryCall(SimpleRequest.getDefaultInstance());
1479       Context serverCtx = contextCapture.get();
1480       assertNotNull(serverCtx);
1481 
1482       FakeTagContext statsCtx = (FakeTagContext) TAG_CONTEXT_KEY.get(serverCtx);
1483       assertNotNull(statsCtx);
1484       Map<TagKey, TagValue> tags = statsCtx.getTags();
1485       boolean tagFound = false;
1486       for (Map.Entry<TagKey, TagValue> tag : tags.entrySet()) {
1487         if (tag.getKey().equals(StatsTestUtils.EXTRA_TAG)) {
1488           assertEquals(TagValue.create("extra value"), tag.getValue());
1489           tagFound = true;
1490         }
1491       }
1492       assertTrue("tag not found", tagFound);
1493 
1494       Span span = CONTEXT_SPAN_KEY.get(serverCtx);
1495       assertNotNull(span);
1496       SpanContext spanContext = span.getContext();
1497       assertEquals(clientParentSpan.getContext().getTraceId(), spanContext.getTraceId());
1498     } finally {
1499       ctx.detach(origCtx);
1500     }
1501   }
1502 
1503   @Test
statusCodeAndMessage()1504   public void statusCodeAndMessage() throws Exception {
1505     int errorCode = 2;
1506     String errorMessage = "test status message";
1507     EchoStatus responseStatus = EchoStatus.newBuilder()
1508         .setCode(errorCode)
1509         .setMessage(errorMessage)
1510         .build();
1511     SimpleRequest simpleRequest = SimpleRequest.newBuilder()
1512         .setResponseStatus(responseStatus)
1513         .build();
1514     StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
1515         .setResponseStatus(responseStatus)
1516         .build();
1517 
1518     // Test UnaryCall
1519     try {
1520       blockingStub.unaryCall(simpleRequest);
1521       fail();
1522     } catch (StatusRuntimeException e) {
1523       assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode());
1524       assertEquals(errorMessage, e.getStatus().getDescription());
1525     }
1526     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN);
1527 
1528     // Test FullDuplexCall
1529     @SuppressWarnings("unchecked")
1530     StreamObserver<StreamingOutputCallResponse> responseObserver =
1531         mock(StreamObserver.class);
1532     StreamObserver<StreamingOutputCallRequest> requestObserver
1533         = asyncStub.fullDuplexCall(responseObserver);
1534     requestObserver.onNext(streamingRequest);
1535     requestObserver.onCompleted();
1536 
1537     ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
1538     verify(responseObserver, timeout(operationTimeoutMillis())).onError(captor.capture());
1539     assertEquals(Status.UNKNOWN.getCode(), Status.fromThrowable(captor.getValue()).getCode());
1540     assertEquals(errorMessage, Status.fromThrowable(captor.getValue()).getDescription());
1541     verifyNoMoreInteractions(responseObserver);
1542     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN);
1543   }
1544 
1545   @Test
specialStatusMessage()1546   public void specialStatusMessage() throws Exception {
1547     int errorCode = 2;
1548     String errorMessage = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP ��\t\n";
1549     SimpleRequest simpleRequest = SimpleRequest.newBuilder()
1550         .setResponseStatus(EchoStatus.newBuilder()
1551             .setCode(errorCode)
1552             .setMessage(errorMessage)
1553             .build())
1554         .build();
1555 
1556     try {
1557       blockingStub.unaryCall(simpleRequest);
1558       fail();
1559     } catch (StatusRuntimeException e) {
1560       assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode());
1561       assertEquals(errorMessage, e.getStatus().getDescription());
1562     }
1563     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN);
1564   }
1565 
1566   /** Sends an rpc to an unimplemented method within TestService. */
1567   @Test
unimplementedMethod()1568   public void unimplementedMethod() {
1569     try {
1570       blockingStub.unimplementedCall(Empty.getDefaultInstance());
1571       fail();
1572     } catch (StatusRuntimeException e) {
1573       assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
1574     }
1575 
1576     assertClientStatsTrace("grpc.testing.TestService/UnimplementedCall",
1577         Status.Code.UNIMPLEMENTED);
1578   }
1579 
1580   /** Sends an rpc to an unimplemented service on the server. */
1581   @Test
unimplementedService()1582   public void unimplementedService() {
1583     UnimplementedServiceGrpc.UnimplementedServiceBlockingStub stub =
1584         UnimplementedServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor);
1585     try {
1586       stub.unimplementedCall(Empty.getDefaultInstance());
1587       fail();
1588     } catch (StatusRuntimeException e) {
1589       assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
1590     }
1591 
1592     assertStatsTrace("grpc.testing.UnimplementedService/UnimplementedCall",
1593         Status.Code.UNIMPLEMENTED);
1594   }
1595 
1596   /** Start a fullDuplexCall which the server will not respond, and verify the deadline expires. */
1597   @SuppressWarnings("MissingFail")
1598   @Test
timeoutOnSleepingServer()1599   public void timeoutOnSleepingServer() throws Exception {
1600     TestServiceGrpc.TestServiceStub stub =
1601         asyncStub.withDeadlineAfter(1, TimeUnit.MILLISECONDS);
1602 
1603     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
1604     StreamObserver<StreamingOutputCallRequest> requestObserver
1605         = stub.fullDuplexCall(responseObserver);
1606 
1607     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1608         .setPayload(Payload.newBuilder()
1609             .setBody(ByteString.copyFrom(new byte[27182])))
1610         .build();
1611     try {
1612       requestObserver.onNext(request);
1613     } catch (IllegalStateException expected) {
1614       // This can happen if the stream has already been terminated due to deadline exceeded.
1615     }
1616 
1617     assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1618     assertEquals(0, responseObserver.getValues().size());
1619     assertEquals(Status.DEADLINE_EXCEEDED.getCode(),
1620                  Status.fromThrowable(responseObserver.getError()).getCode());
1621 
1622     if (metricsExpected()) {
1623       // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
1624       // recorded.  The tracer stats rely on the stream being created, which is not always the case
1625       // in this test, thus we will not check that.
1626       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1627       checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall");
1628       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1629       checkEndTags(
1630           clientEndRecord,
1631           "grpc.testing.TestService/FullDuplexCall",
1632           Status.DEADLINE_EXCEEDED.getCode());
1633     }
1634   }
1635 
1636   /** Sends a large unary rpc with service account credentials. */
serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)1637   public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
1638       throws Exception {
1639     // cast to ServiceAccountCredentials to double-check the right type of object was created.
1640     GoogleCredentials credentials =
1641         ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
1642     credentials = credentials.createScoped(Arrays.<String>asList(authScope));
1643     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1644         .withCallCredentials(MoreCallCredentials.from(credentials));
1645     final SimpleRequest request = SimpleRequest.newBuilder()
1646         .setFillUsername(true)
1647         .setFillOauthScope(true)
1648         .setResponseSize(314159)
1649         .setResponseType(PayloadType.COMPRESSABLE)
1650         .setPayload(Payload.newBuilder()
1651             .setBody(ByteString.copyFrom(new byte[271828])))
1652         .build();
1653 
1654     final SimpleResponse response = stub.unaryCall(request);
1655     assertFalse(response.getUsername().isEmpty());
1656     assertTrue("Received username: " + response.getUsername(),
1657         jsonKey.contains(response.getUsername()));
1658     assertFalse(response.getOauthScope().isEmpty());
1659     assertTrue("Received oauth scope: " + response.getOauthScope(),
1660         authScope.contains(response.getOauthScope()));
1661 
1662     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1663         .setOauthScope(response.getOauthScope())
1664         .setUsername(response.getUsername())
1665         .setPayload(Payload.newBuilder()
1666             .setType(PayloadType.COMPRESSABLE)
1667             .setBody(ByteString.copyFrom(new byte[314159])))
1668         .build();
1669     assertEquals(goldenResponse, response);
1670   }
1671 
1672   /** Sends a large unary rpc with compute engine credentials. */
computeEngineCreds(String serviceAccount, String oauthScope)1673   public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception {
1674     ComputeEngineCredentials credentials = ComputeEngineCredentials.create();
1675     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1676         .withCallCredentials(MoreCallCredentials.from(credentials));
1677     final SimpleRequest request = SimpleRequest.newBuilder()
1678         .setFillUsername(true)
1679         .setFillOauthScope(true)
1680         .setResponseSize(314159)
1681         .setResponseType(PayloadType.COMPRESSABLE)
1682         .setPayload(Payload.newBuilder()
1683             .setBody(ByteString.copyFrom(new byte[271828])))
1684         .build();
1685 
1686     final SimpleResponse response = stub.unaryCall(request);
1687     assertEquals(serviceAccount, response.getUsername());
1688     assertFalse(response.getOauthScope().isEmpty());
1689     assertTrue("Received oauth scope: " + response.getOauthScope(),
1690         oauthScope.contains(response.getOauthScope()));
1691 
1692     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1693         .setOauthScope(response.getOauthScope())
1694         .setUsername(response.getUsername())
1695         .setPayload(Payload.newBuilder()
1696             .setType(PayloadType.COMPRESSABLE)
1697             .setBody(ByteString.copyFrom(new byte[314159])))
1698         .build();
1699     assertEquals(goldenResponse, response);
1700   }
1701 
1702   /** Test JWT-based auth. */
jwtTokenCreds(InputStream serviceAccountJson)1703   public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
1704     final SimpleRequest request = SimpleRequest.newBuilder()
1705         .setResponseType(PayloadType.COMPRESSABLE)
1706         .setResponseSize(314159)
1707         .setPayload(Payload.newBuilder()
1708             .setBody(ByteString.copyFrom(new byte[271828])))
1709         .setFillUsername(true)
1710         .build();
1711 
1712     ServiceAccountCredentials credentials = (ServiceAccountCredentials)
1713         GoogleCredentials.fromStream(serviceAccountJson);
1714     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1715         .withCallCredentials(MoreCallCredentials.from(credentials));
1716     SimpleResponse response = stub.unaryCall(request);
1717     assertEquals(credentials.getClientEmail(), response.getUsername());
1718     assertEquals(314159, response.getPayload().getBody().size());
1719   }
1720 
1721   /** Sends a unary rpc with raw oauth2 access token credentials. */
oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)1722   public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)
1723       throws Exception {
1724     GoogleCredentials utilCredentials =
1725         GoogleCredentials.fromStream(credentialsStream);
1726     utilCredentials = utilCredentials.createScoped(Arrays.<String>asList(authScope));
1727     AccessToken accessToken = utilCredentials.refreshAccessToken();
1728 
1729     OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
1730 
1731     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1732         .withCallCredentials(MoreCallCredentials.from(credentials));
1733     final SimpleRequest request = SimpleRequest.newBuilder()
1734         .setFillUsername(true)
1735         .setFillOauthScope(true)
1736         .build();
1737 
1738     final SimpleResponse response = stub.unaryCall(request);
1739     assertFalse(response.getUsername().isEmpty());
1740     assertTrue("Received username: " + response.getUsername(),
1741         jsonKey.contains(response.getUsername()));
1742     assertFalse(response.getOauthScope().isEmpty());
1743     assertTrue("Received oauth scope: " + response.getOauthScope(),
1744         authScope.contains(response.getOauthScope()));
1745   }
1746 
1747   /** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */
perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)1748   public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)
1749       throws Exception {
1750     // In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once
1751     // for that purpose.
1752     // So, this test is identical to oauth2_auth_token test.
1753     oauth2AuthToken(jsonKey, credentialsStream, oauthScope);
1754   }
1755 
assertSuccess(StreamRecorder<?> recorder)1756   protected static void assertSuccess(StreamRecorder<?> recorder) {
1757     if (recorder.getError() != null) {
1758       throw new AssertionError(recorder.getError());
1759     }
1760   }
1761 
1762   /** Helper for getting remote address {@link io.grpc.ServerCall#getAttributes()} */
obtainRemoteClientAddr()1763   protected SocketAddress obtainRemoteClientAddr() {
1764     TestServiceGrpc.TestServiceBlockingStub stub =
1765         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
1766 
1767     stub.unaryCall(SimpleRequest.getDefaultInstance());
1768 
1769     return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
1770   }
1771 
1772   /** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()} */
assertX500SubjectDn(String tlsInfo)1773   protected void assertX500SubjectDn(String tlsInfo) {
1774     TestServiceGrpc.TestServiceBlockingStub stub =
1775         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
1776 
1777     stub.unaryCall(SimpleRequest.getDefaultInstance());
1778 
1779     List<Certificate> certificates = Lists.newArrayList();
1780     SSLSession sslSession =
1781         serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
1782     try {
1783       certificates = Arrays.asList(sslSession.getPeerCertificates());
1784     } catch (SSLPeerUnverifiedException e) {
1785       // Should never happen
1786       throw new AssertionError(e);
1787     }
1788 
1789     X509Certificate x509cert = (X509Certificate) certificates.get(0);
1790 
1791     assertEquals(1, certificates.size());
1792     assertEquals(tlsInfo, x509cert.getSubjectDN().toString());
1793   }
1794 
operationTimeoutMillis()1795   protected int operationTimeoutMillis() {
1796     return 5000;
1797   }
1798 
1799   /**
1800    * Some tests run on memory constrained environments.  Rather than OOM, just give up.  64 is
1801    * chosen as a maximum amount of memory a large test would need.
1802    */
assumeEnoughMemory()1803   private static void assumeEnoughMemory() {
1804     Runtime r = Runtime.getRuntime();
1805     long usedMem = r.totalMemory() - r.freeMemory();
1806     long actuallyFreeMemory = r.maxMemory() - usedMem;
1807     Assume.assumeTrue(
1808         actuallyFreeMemory + " is not sufficient to run this test",
1809         actuallyFreeMemory >= 64 * 1024 * 1024);
1810   }
1811 
1812   /**
1813    * Wrapper around {@link Mockito#verify}, to keep log spam down on failure.
1814    */
verify(T mock, VerificationMode mode)1815   private static <T> T verify(T mock, VerificationMode mode) {
1816     try {
1817       return Mockito.verify(mock, mode);
1818     } catch (final AssertionError e) {
1819       String msg = e.getMessage();
1820       if (msg.length() >= 256) {
1821         // AssertionError(String, Throwable) only present in Android API 19+
1822         throw new AssertionError(msg.substring(0, 256)) {
1823           @Override
1824           public synchronized Throwable getCause() {
1825             return e;
1826           }
1827         };
1828       }
1829       throw e;
1830     }
1831   }
1832 
1833   /**
1834    * Wrapper around {@link Mockito#verify}, to keep log spam down on failure.
1835    */
verifyNoMoreInteractions(Object... mocks)1836   private static void verifyNoMoreInteractions(Object... mocks) {
1837     try {
1838       Mockito.verifyNoMoreInteractions(mocks);
1839     } catch (final AssertionError e) {
1840       String msg = e.getMessage();
1841       if (msg.length() >= 256) {
1842         // AssertionError(String, Throwable) only present in Android API 19+
1843         throw new AssertionError(msg.substring(0, 256)) {
1844           @Override
1845           public synchronized Throwable getCause() {
1846             return e;
1847           }
1848         };
1849       }
1850       throw e;
1851     }
1852   }
1853 
1854   /**
1855    * Poll the next metrics record and check it against the provided information, including the
1856    * message sizes.
1857    */
assertStatsTrace(String method, Status.Code status, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1858   private void assertStatsTrace(String method, Status.Code status,
1859       Collection<? extends MessageLite> requests,
1860       Collection<? extends MessageLite> responses) {
1861     assertClientStatsTrace(method, status, requests, responses);
1862     assertServerStatsTrace(method, status, requests, responses);
1863   }
1864 
1865   /**
1866    * Poll the next metrics record and check it against the provided information, without checking
1867    * the message sizes.
1868    */
assertStatsTrace(String method, Status.Code status)1869   private void assertStatsTrace(String method, Status.Code status) {
1870     assertStatsTrace(method, status, null, null);
1871   }
1872 
assertClientStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1873   private void assertClientStatsTrace(String method, Status.Code code,
1874       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
1875     // Tracer-based stats
1876     TestClientStreamTracer tracer = clientStreamTracers.poll();
1877     assertNotNull(tracer);
1878     assertTrue(tracer.getOutboundHeaders());
1879     // assertClientStatsTrace() is called right after application receives status,
1880     // but streamClosed() may be called slightly later than that.  So we need a timeout.
1881     try {
1882       assertTrue(tracer.await(5, TimeUnit.SECONDS));
1883     } catch (InterruptedException e) {
1884       throw new AssertionError(e);
1885     }
1886     assertEquals(code, tracer.getStatus().getCode());
1887 
1888     if (requests != null && responses != null) {
1889       checkTracers(tracer, requests, responses);
1890     }
1891     if (metricsExpected()) {
1892       // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be
1893       // done before application receives status.
1894       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord();
1895       checkStartTags(clientStartRecord, method);
1896       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord();
1897       checkEndTags(clientEndRecord, method, code);
1898 
1899       if (requests != null && responses != null) {
1900         checkCensus(clientEndRecord, false, requests, responses);
1901       }
1902     }
1903   }
1904 
assertClientStatsTrace(String method, Status.Code status)1905   private void assertClientStatsTrace(String method, Status.Code status) {
1906     assertClientStatsTrace(method, status, null, null);
1907   }
1908 
1909   @SuppressWarnings("AssertionFailureIgnored") // Failure is checked in the end by the passed flag.
assertServerStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1910   private void assertServerStatsTrace(String method, Status.Code code,
1911       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
1912     if (server == null) {
1913       // Server is not in the same process.  We can't check server-side stats.
1914       return;
1915     }
1916 
1917     if (metricsExpected()) {
1918       MetricsRecord serverStartRecord;
1919       MetricsRecord serverEndRecord;
1920       try {
1921         // On the server, the stats is finalized in ServerStreamListener.closed(), which can be
1922         // run after the client receives the final status.  So we use a timeout.
1923         serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1924         serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1925       } catch (InterruptedException e) {
1926         throw new RuntimeException(e);
1927       }
1928       assertNotNull(serverStartRecord);
1929       assertNotNull(serverEndRecord);
1930       checkStartTags(serverStartRecord, method);
1931       checkEndTags(serverEndRecord, method, code);
1932       if (requests != null && responses != null) {
1933         checkCensus(serverEndRecord, true, requests, responses);
1934       }
1935     }
1936 
1937     ServerStreamTracerInfo tracerInfo;
1938     tracerInfo = serverStreamTracers.poll();
1939     assertNotNull(tracerInfo);
1940     assertEquals(method, tracerInfo.fullMethodName);
1941     assertNotNull(tracerInfo.tracer.contextCapture);
1942     // On the server, streamClosed() may be called after the client receives the final status.
1943     // So we use a timeout.
1944     try {
1945       assertTrue(tracerInfo.tracer.await(1, TimeUnit.SECONDS));
1946     } catch (InterruptedException e) {
1947       throw new AssertionError(e);
1948     }
1949     assertEquals(code, tracerInfo.tracer.getStatus().getCode());
1950     if (requests != null && responses != null) {
1951       checkTracers(tracerInfo.tracer, responses, requests);
1952     }
1953   }
1954 
checkStartTags(MetricsRecord record, String methodName)1955   private static void checkStartTags(MetricsRecord record, String methodName) {
1956     assertNotNull("record is not null", record);
1957     TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
1958     assertNotNull("method name tagged", methodNameTag);
1959     assertEquals("method names match", methodName, methodNameTag.asString());
1960   }
1961 
checkEndTags( MetricsRecord record, String methodName, Status.Code status)1962   private static void checkEndTags(
1963       MetricsRecord record, String methodName, Status.Code status) {
1964     assertNotNull("record is not null", record);
1965     TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD);
1966     assertNotNull("method name tagged", methodNameTag);
1967     assertEquals("method names match", methodName, methodNameTag.asString());
1968     TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS);
1969     assertNotNull("status tagged", statusTag);
1970     assertEquals(status.toString(), statusTag.asString());
1971   }
1972 
1973   /**
1974    * Check information recorded by tracers.
1975    */
checkTracers( TestStreamTracer tracer, Collection<? extends MessageLite> sentMessages, Collection<? extends MessageLite> receivedMessages)1976   private void checkTracers(
1977       TestStreamTracer tracer,
1978       Collection<? extends MessageLite> sentMessages,
1979       Collection<? extends MessageLite> receivedMessages) {
1980     long uncompressedSentSize = 0;
1981     int seqNo = 0;
1982     for (MessageLite msg : sentMessages) {
1983       assertThat(tracer.nextOutboundEvent()).isEqualTo(String.format("outboundMessage(%d)", seqNo));
1984       assertThat(tracer.nextOutboundEvent()).matches(
1985           String.format("outboundMessageSent\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
1986       seqNo++;
1987       uncompressedSentSize += msg.getSerializedSize();
1988     }
1989     assertNull(tracer.nextOutboundEvent());
1990     long uncompressedReceivedSize = 0;
1991     seqNo = 0;
1992     for (MessageLite msg : receivedMessages) {
1993       assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo));
1994       assertThat(tracer.nextInboundEvent()).matches(
1995           String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
1996       uncompressedReceivedSize += msg.getSerializedSize();
1997       seqNo++;
1998     }
1999     assertNull(tracer.nextInboundEvent());
2000     if (metricsExpected()) {
2001       assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize());
2002       assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize());
2003     }
2004   }
2005 
2006   /**
2007    * Check information recorded by Census.
2008    */
checkCensus(MetricsRecord record, boolean isServer, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2009   private void checkCensus(MetricsRecord record, boolean isServer,
2010       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
2011     int uncompressedRequestsSize = 0;
2012     for (MessageLite request : requests) {
2013       uncompressedRequestsSize += request.getSerializedSize();
2014     }
2015     int uncompressedResponsesSize = 0;
2016     for (MessageLite response : responses) {
2017       uncompressedResponsesSize += response.getSerializedSize();
2018     }
2019     if (isServer) {
2020       assertEquals(
2021           requests.size(),
2022           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT));
2023       assertEquals(
2024           responses.size(),
2025           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT));
2026       assertEquals(
2027           uncompressedRequestsSize,
2028           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
2029       assertEquals(
2030           uncompressedResponsesSize,
2031           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
2032       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY));
2033       // It's impossible to get the expected wire sizes because it may be compressed, so we just
2034       // check if they are recorded.
2035       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES));
2036       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES));
2037     } else {
2038       assertEquals(
2039           requests.size(),
2040           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT));
2041       assertEquals(
2042           responses.size(),
2043           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT));
2044       assertEquals(
2045           uncompressedRequestsSize,
2046           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
2047       assertEquals(
2048           uncompressedResponsesSize,
2049           record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
2050       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY));
2051       // It's impossible to get the expected wire sizes because it may be compressed, so we just
2052       // check if they are recorded.
2053       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES));
2054       assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES));
2055     }
2056   }
2057 
2058   /**
2059    * Captures the request attributes. Useful for testing ServerCalls.
2060    * {@link ServerCall#getAttributes()}
2061    */
recordServerCallInterceptor( final AtomicReference<ServerCall<?, ?>> serverCallCapture)2062   private static ServerInterceptor recordServerCallInterceptor(
2063       final AtomicReference<ServerCall<?, ?>> serverCallCapture) {
2064     return new ServerInterceptor() {
2065       @Override
2066       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
2067           ServerCall<ReqT, RespT> call,
2068           Metadata requestHeaders,
2069           ServerCallHandler<ReqT, RespT> next) {
2070         serverCallCapture.set(call);
2071         return next.startCall(call, requestHeaders);
2072       }
2073     };
2074   }
2075 
2076   private static ServerInterceptor recordContextInterceptor(
2077       final AtomicReference<Context> contextCapture) {
2078     return new ServerInterceptor() {
2079       @Override
2080       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
2081           ServerCall<ReqT, RespT> call,
2082           Metadata requestHeaders,
2083           ServerCallHandler<ReqT, RespT> next) {
2084         contextCapture.set(Context.current());
2085         return next.startCall(call, requestHeaders);
2086       }
2087     };
2088   }
2089 
2090   /**
2091    * A marshaller that record input and output sizes.
2092    */
2093   private static final class ByteSizeMarshaller<T> implements MethodDescriptor.Marshaller<T> {
2094 
2095     private final MethodDescriptor.Marshaller<T> delegate;
2096     volatile int lastOutSize;
2097     volatile int lastInSize;
2098 
2099     ByteSizeMarshaller(MethodDescriptor.Marshaller<T> delegate) {
2100       this.delegate = delegate;
2101     }
2102 
2103     @Override
2104     public InputStream stream(T value) {
2105       InputStream is = delegate.stream(value);
2106       ByteArrayOutputStream baos = new ByteArrayOutputStream();
2107       try {
2108         lastOutSize = (int) ByteStreams.copy(is, baos);
2109       } catch (IOException e) {
2110         throw new RuntimeException(e);
2111       }
2112       return new ByteArrayInputStream(baos.toByteArray());
2113     }
2114 
2115     @Override
2116     public T parse(InputStream stream) {
2117       ByteArrayOutputStream baos = new ByteArrayOutputStream();
2118       try {
2119         lastInSize = (int) ByteStreams.copy(stream, baos);
2120       } catch (IOException e) {
2121         throw new RuntimeException(e);
2122       }
2123       return delegate.parse(new ByteArrayInputStream(baos.toByteArray()));
2124     }
2125   }
2126 }
2127