• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static com.google.common.truth.Truth.assertWithMessage;
21 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNotEquals;
25 import static org.junit.Assert.assertNotNull;
26 import static org.junit.Assert.assertNull;
27 import static org.junit.Assert.assertSame;
28 import static org.junit.Assert.assertTrue;
29 import static org.junit.Assert.fail;
30 
31 import com.google.auth.oauth2.AccessToken;
32 import com.google.auth.oauth2.ComputeEngineCredentials;
33 import com.google.auth.oauth2.GoogleCredentials;
34 import com.google.auth.oauth2.OAuth2Credentials;
35 import com.google.auth.oauth2.ServiceAccountCredentials;
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Throwables;
38 import com.google.common.collect.ImmutableList;
39 import com.google.common.io.ByteStreams;
40 import com.google.common.util.concurrent.SettableFuture;
41 import com.google.protobuf.ByteString;
42 import com.google.protobuf.MessageLite;
43 import com.google.protobuf.StringValue;
44 import io.grpc.CallOptions;
45 import io.grpc.Channel;
46 import io.grpc.ClientCall;
47 import io.grpc.ClientInterceptor;
48 import io.grpc.ClientInterceptors;
49 import io.grpc.ClientStreamTracer;
50 import io.grpc.Context;
51 import io.grpc.Grpc;
52 import io.grpc.ManagedChannel;
53 import io.grpc.ManagedChannelBuilder;
54 import io.grpc.Metadata;
55 import io.grpc.MethodDescriptor;
56 import io.grpc.Server;
57 import io.grpc.ServerBuilder;
58 import io.grpc.ServerCall;
59 import io.grpc.ServerCallHandler;
60 import io.grpc.ServerInterceptor;
61 import io.grpc.ServerInterceptors;
62 import io.grpc.ServerStreamTracer;
63 import io.grpc.Status;
64 import io.grpc.StatusRuntimeException;
65 import io.grpc.auth.MoreCallCredentials;
66 import io.grpc.census.InternalCensusStatsAccessor;
67 import io.grpc.census.internal.DeprecatedCensusConstants;
68 import io.grpc.internal.GrpcUtil;
69 import io.grpc.internal.testing.StatsTestUtils;
70 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
71 import io.grpc.internal.testing.StatsTestUtils.FakeTagContext;
72 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
73 import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
74 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
75 import io.grpc.internal.testing.StreamRecorder;
76 import io.grpc.internal.testing.TestClientStreamTracer;
77 import io.grpc.internal.testing.TestServerStreamTracer;
78 import io.grpc.internal.testing.TestStreamTracer;
79 import io.grpc.stub.ClientCallStreamObserver;
80 import io.grpc.stub.ClientCalls;
81 import io.grpc.stub.MetadataUtils;
82 import io.grpc.stub.StreamObserver;
83 import io.grpc.testing.TestUtils;
84 import io.grpc.testing.integration.EmptyProtos.Empty;
85 import io.grpc.testing.integration.Messages.BoolValue;
86 import io.grpc.testing.integration.Messages.EchoStatus;
87 import io.grpc.testing.integration.Messages.Payload;
88 import io.grpc.testing.integration.Messages.ResponseParameters;
89 import io.grpc.testing.integration.Messages.SimpleRequest;
90 import io.grpc.testing.integration.Messages.SimpleResponse;
91 import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
92 import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
93 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
94 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
95 import io.grpc.testing.integration.Messages.TestOrcaReport;
96 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
97 import io.opencensus.stats.Measure;
98 import io.opencensus.stats.Measure.MeasureDouble;
99 import io.opencensus.stats.Measure.MeasureLong;
100 import io.opencensus.tags.TagKey;
101 import io.opencensus.tags.TagValue;
102 import io.opencensus.trace.Span;
103 import io.opencensus.trace.SpanContext;
104 import io.opencensus.trace.Tracing;
105 import java.io.ByteArrayInputStream;
106 import java.io.ByteArrayOutputStream;
107 import java.io.IOException;
108 import java.io.InputStream;
109 import java.net.SocketAddress;
110 import java.security.cert.Certificate;
111 import java.security.cert.X509Certificate;
112 import java.util.ArrayList;
113 import java.util.Arrays;
114 import java.util.Collection;
115 import java.util.Collections;
116 import java.util.Iterator;
117 import java.util.List;
118 import java.util.Locale;
119 import java.util.Map;
120 import java.util.concurrent.ArrayBlockingQueue;
121 import java.util.concurrent.BlockingQueue;
122 import java.util.concurrent.Executors;
123 import java.util.concurrent.LinkedBlockingQueue;
124 import java.util.concurrent.ScheduledExecutorService;
125 import java.util.concurrent.TimeUnit;
126 import java.util.concurrent.atomic.AtomicReference;
127 import java.util.logging.Level;
128 import java.util.logging.Logger;
129 import java.util.regex.Pattern;
130 import javax.annotation.Nullable;
131 import javax.net.ssl.SSLPeerUnverifiedException;
132 import javax.net.ssl.SSLSession;
133 import org.HdrHistogram.Histogram;
134 import org.junit.After;
135 import org.junit.Assert;
136 import org.junit.Assume;
137 import org.junit.Before;
138 import org.junit.Rule;
139 import org.junit.Test;
140 import org.junit.rules.DisableOnDebug;
141 import org.junit.rules.TestRule;
142 import org.junit.rules.Timeout;
143 
144 /**
145  * Abstract base class for all GRPC transport tests.
146  *
147  * <p> New tests should avoid using Mockito to support running on AppEngine.</p>
148  */
149 public abstract class AbstractInteropTest {
150   private static Logger logger = Logger.getLogger(AbstractInteropTest.class.getName());
151 
152   @Rule public final TestRule globalTimeout;
153 
154   /** Must be at least {@link #unaryPayloadLength()}, plus some to account for encoding overhead. */
155   public static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
156 
157   /**
158    * Use a small flow control to help detect flow control bugs. Don't use 64KiB to test
159    * SETTINGS/WINDOW_UPDATE exchange.
160    */
161   public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024;
162   private static final MeasureLong RETRIES_PER_CALL =
163       Measure.MeasureLong.create(
164           "grpc.io/client/retries_per_call", "Number of retries per call", "1");
165   private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL =
166       Measure.MeasureLong.create(
167           "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1");
168   private static final MeasureDouble RETRY_DELAY_PER_CALL =
169       Measure.MeasureDouble.create(
170           "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms");
171 
172   private static final FakeTagger tagger = new FakeTagger();
173   private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
174       new FakeTagContextBinarySerializer();
175 
176   private final AtomicReference<ServerCall<?, ?>> serverCallCapture =
177       new AtomicReference<>();
178   private final AtomicReference<ClientCall<?, ?>> clientCallCapture =
179       new AtomicReference<>();
180   private final AtomicReference<Metadata> requestHeadersCapture =
181       new AtomicReference<>();
182   private final AtomicReference<Context> contextCapture =
183       new AtomicReference<>();
184   private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
185   private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder();
186 
187   private ScheduledExecutorService testServiceExecutor;
188   private Server server;
189   private Server handshakerServer;
190 
191   private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers =
192       new LinkedBlockingQueue<>();
193 
194   static final CallOptions.Key<AtomicReference<TestOrcaReport>>
195       ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report");
196   static final CallOptions.Key<AtomicReference<TestOrcaReport>>
197       ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report");
198 
199   private static final class ServerStreamTracerInfo {
200     final String fullMethodName;
201     final InteropServerStreamTracer tracer;
202 
ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer)203     ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer) {
204       this.fullMethodName = fullMethodName;
205       this.tracer = tracer;
206     }
207 
208     private static final class InteropServerStreamTracer extends TestServerStreamTracer {
209       private volatile Context contextCapture;
210 
211       @Override
filterContext(Context context)212       public Context filterContext(Context context) {
213         contextCapture = context;
214         return super.filterContext(context);
215       }
216     }
217   }
218 
219   /**
220    * Constructor for tests.
221    */
AbstractInteropTest()222   protected AbstractInteropTest() {
223     TestRule timeout = Timeout.seconds(60);
224     try {
225       timeout = new DisableOnDebug(timeout);
226     } catch (Throwable t) {
227       // This can happen on Android, which lacks some standard Java class.
228       // Seen at https://github.com/grpc/grpc-java/pull/5832#issuecomment-499698086
229       logger.log(Level.FINE, "Debugging not disabled.", t);
230     }
231     globalTimeout = timeout;
232   }
233 
234   private final ServerStreamTracer.Factory serverStreamTracerFactory =
235       new ServerStreamTracer.Factory() {
236         @Override
237         public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
238           ServerStreamTracerInfo.InteropServerStreamTracer tracer
239               = new ServerStreamTracerInfo.InteropServerStreamTracer();
240           serverStreamTracers.add(new ServerStreamTracerInfo(fullMethodName, tracer));
241           return tracer;
242         }
243       };
244 
245   protected static final Empty EMPTY = Empty.getDefaultInstance();
246 
configBuilder(@ullable ServerBuilder<?> builder)247   private void configBuilder(@Nullable ServerBuilder<?> builder) {
248     if (builder == null) {
249       return;
250     }
251     testServiceExecutor = Executors.newScheduledThreadPool(2);
252 
253     List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
254         .add(recordServerCallInterceptor(serverCallCapture))
255         .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
256         .add(recordContextInterceptor(contextCapture))
257         .addAll(TestServiceImpl.interceptors())
258         .build();
259 
260     builder
261         .addService(
262             ServerInterceptors.intercept(
263                 new TestServiceImpl(testServiceExecutor),
264                 allInterceptors))
265         .addStreamTracerFactory(serverStreamTracerFactory);
266   }
267 
startServer(@ullable ServerBuilder<?> builder)268   protected void startServer(@Nullable ServerBuilder<?> builder) {
269     maybeStartHandshakerServer();
270     if (builder == null) {
271       server = null;
272       return;
273     }
274 
275     try {
276       server = builder.build().start();
277     } catch (IOException ex) {
278       throw new RuntimeException(ex);
279     }
280   }
281 
maybeStartHandshakerServer()282   private void maybeStartHandshakerServer() {
283     ServerBuilder<?> handshakerServerBuilder = getHandshakerServerBuilder();
284     if (handshakerServerBuilder != null) {
285       try {
286         handshakerServer = handshakerServerBuilder.build().start();
287       } catch (IOException ex) {
288         throw new RuntimeException(ex);
289       }
290     }
291   }
292 
stopServer()293   private void stopServer() {
294     if (server != null) {
295       server.shutdownNow();
296     }
297     if (testServiceExecutor != null) {
298       testServiceExecutor.shutdown();
299     }
300     if (handshakerServer != null) {
301       handshakerServer.shutdownNow();
302     }
303   }
304 
305   @VisibleForTesting
getListenAddress()306   final SocketAddress getListenAddress() {
307     return server.getListenSockets().iterator().next();
308   }
309 
310   protected ManagedChannel channel;
311   protected TestServiceGrpc.TestServiceBlockingStub blockingStub;
312   protected TestServiceGrpc.TestServiceStub asyncStub;
313 
314   private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers =
315       new LinkedBlockingQueue<>();
316 
317   private final ClientStreamTracer.Factory clientStreamTracerFactory =
318       new ClientStreamTracer.Factory() {
319         @Override
320         public ClientStreamTracer newClientStreamTracer(
321             ClientStreamTracer.StreamInfo info, Metadata headers) {
322           TestClientStreamTracer tracer = new TestClientStreamTracer();
323           clientStreamTracers.add(tracer);
324           return tracer;
325         }
326       };
327   private final ClientInterceptor tracerSetupInterceptor = new ClientInterceptor() {
328         @Override
329         public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
330             MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
331           return next.newCall(
332               method, callOptions.withStreamTracerFactory(clientStreamTracerFactory));
333         }
334       };
335 
336   /**
337    * Must be called by the subclass setup method if overridden.
338    */
339   @Before
setUp()340   public void setUp() {
341     ServerBuilder<?> serverBuilder = getServerBuilder();
342     configBuilder(serverBuilder);
343     startServer(serverBuilder);
344     channel = createChannel();
345 
346     blockingStub =
347         TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor);
348     asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor);
349 
350     ClientInterceptor[] additionalInterceptors = getAdditionalInterceptors();
351     if (additionalInterceptors != null) {
352       blockingStub = blockingStub.withInterceptors(additionalInterceptors);
353       asyncStub = asyncStub.withInterceptors(additionalInterceptors);
354     }
355 
356     requestHeadersCapture.set(null);
357   }
358 
359   /** Clean up. */
360   @After
tearDown()361   public void tearDown() {
362     if (channel != null) {
363       channel.shutdownNow();
364       try {
365         channel.awaitTermination(1, TimeUnit.SECONDS);
366       } catch (InterruptedException ie) {
367         logger.log(Level.FINE, "Interrupted while waiting for channel termination", ie);
368         // Best effort. If there is an interruption, we want to continue cleaning up, but quickly
369         Thread.currentThread().interrupt();
370       }
371     }
372     stopServer();
373   }
374 
createChannel()375   protected ManagedChannel createChannel() {
376     return createChannelBuilder().build();
377   }
378 
createChannelBuilder()379   protected abstract ManagedChannelBuilder<?> createChannelBuilder();
380 
381   @Nullable
getAdditionalInterceptors()382   protected ClientInterceptor[] getAdditionalInterceptors() {
383     return null;
384   }
385 
386   /**
387    * Returns the server builder used to create server for each test run.  Return {@code null} if
388    * it shouldn't start a server in the same process.
389    */
390   @Nullable
getServerBuilder()391   protected ServerBuilder<?> getServerBuilder() {
392     return null;
393   }
394 
395   @Nullable
getHandshakerServerBuilder()396   protected ServerBuilder<?> getHandshakerServerBuilder() {
397     return null;
398   }
399 
createCensusStatsClientInterceptor()400   protected final ClientInterceptor createCensusStatsClientInterceptor() {
401     return
402         InternalCensusStatsAccessor
403             .getClientInterceptor(
404                 tagger, tagContextBinarySerializer, clientStatsRecorder,
405                 GrpcUtil.STOPWATCH_SUPPLIER,
406                 true, true, true,
407                 /* recordRealTimeMetrics= */ false,
408                 /* recordRetryMetrics= */ true);
409   }
410 
createCustomCensusTracerFactory()411   protected final ServerStreamTracer.Factory createCustomCensusTracerFactory() {
412     return InternalCensusStatsAccessor.getServerStreamTracerFactory(
413         tagger, tagContextBinarySerializer, serverStatsRecorder,
414         GrpcUtil.STOPWATCH_SUPPLIER,
415         true, true, true, false /* real-time metrics */);
416   }
417 
418   /**
419    * Override this when custom census module presence is different from {@link #metricsExpected()}.
420    */
customCensusModulePresent()421   protected boolean customCensusModulePresent() {
422     return metricsExpected();
423   }
424 
425   /**
426    * Return true if exact metric values should be checked.
427    */
metricsExpected()428   protected boolean metricsExpected() {
429     return true;
430   }
431 
432   @Test
emptyUnary()433   public void emptyUnary() throws Exception {
434     assertEquals(EMPTY, blockingStub.emptyCall(EMPTY));
435   }
436 
437   @Test
emptyUnaryWithRetriableStream()438   public void emptyUnaryWithRetriableStream() throws Exception {
439     channel.shutdown();
440     channel = createChannelBuilder().enableRetry().build();
441     assertEquals(EMPTY, TestServiceGrpc.newBlockingStub(channel).emptyCall(EMPTY));
442   }
443 
444   /** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */
cacheableUnary()445   public void cacheableUnary() {
446     // THIS TEST IS BROKEN. Enabling safe just on the MethodDescriptor does nothing by itself. This
447     // test would need to enable GET on the channel.
448     // Set safe to true.
449     MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
450         TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
451     // Set fake user IP since some proxies (GFE) won't cache requests from localhost.
452     Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
453     Metadata metadata = new Metadata();
454     metadata.put(userIpKey, "1.2.3.4");
455     Channel channelWithUserIpKey =
456         ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
457     SimpleRequest requests1And2 =
458         SimpleRequest.newBuilder()
459             .setPayload(
460                 Payload.newBuilder()
461                     .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
462             .build();
463     SimpleRequest request3 =
464         SimpleRequest.newBuilder()
465             .setPayload(
466                 Payload.newBuilder()
467                     .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
468             .build();
469 
470     SimpleResponse response1 =
471         ClientCalls.blockingUnaryCall(
472             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
473     SimpleResponse response2 =
474         ClientCalls.blockingUnaryCall(
475             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
476     SimpleResponse response3 =
477         ClientCalls.blockingUnaryCall(
478             channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);
479 
480     assertEquals(response1, response2);
481     assertNotEquals(response1, response3);
482     // THIS TEST IS BROKEN. See comment at start of method.
483   }
484 
485   @Test
largeUnary()486   public void largeUnary() throws Exception {
487     assumeEnoughMemory();
488     final SimpleRequest request = SimpleRequest.newBuilder()
489         .setResponseSize(314159)
490         .setPayload(Payload.newBuilder()
491             .setBody(ByteString.copyFrom(new byte[271828])))
492         .build();
493     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
494         .setPayload(Payload.newBuilder()
495             .setBody(ByteString.copyFrom(new byte[314159])))
496         .build();
497 
498     assertResponse(goldenResponse, blockingStub.unaryCall(request));
499 
500     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
501         Collections.singleton(request), Collections.singleton(goldenResponse));
502   }
503 
504   /**
505    * Tests client per-message compression for unary calls. The Java API does not support inspecting
506    * a message's compression level, so this is primarily intended to run against a gRPC C++ server.
507    */
clientCompressedUnary(boolean probe)508   public void clientCompressedUnary(boolean probe) throws Exception {
509     assumeEnoughMemory();
510     final SimpleRequest expectCompressedRequest =
511         SimpleRequest.newBuilder()
512             .setExpectCompressed(BoolValue.newBuilder().setValue(true))
513             .setResponseSize(314159)
514             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
515             .build();
516     final SimpleRequest expectUncompressedRequest =
517         SimpleRequest.newBuilder()
518             .setExpectCompressed(BoolValue.newBuilder().setValue(false))
519             .setResponseSize(314159)
520             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
521             .build();
522     final SimpleResponse goldenResponse =
523         SimpleResponse.newBuilder()
524             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
525             .build();
526 
527     if (probe) {
528       // Send a non-compressed message with expectCompress=true. Servers supporting this test case
529       // should return INVALID_ARGUMENT.
530       try {
531         blockingStub.unaryCall(expectCompressedRequest);
532         fail("expected INVALID_ARGUMENT");
533       } catch (StatusRuntimeException e) {
534         assertEquals(Status.INVALID_ARGUMENT.getCode(), e.getStatus().getCode());
535       }
536       assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT);
537     }
538 
539     assertResponse(
540         goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest));
541     assertStatsTrace(
542         "grpc.testing.TestService/UnaryCall",
543         Status.Code.OK,
544         Collections.singleton(expectCompressedRequest),
545         Collections.singleton(goldenResponse));
546 
547     assertResponse(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest));
548     assertStatsTrace(
549         "grpc.testing.TestService/UnaryCall",
550         Status.Code.OK,
551         Collections.singleton(expectUncompressedRequest),
552         Collections.singleton(goldenResponse));
553   }
554 
555   /**
556    * Tests if the server can send a compressed unary response. Ideally we would assert that the
557    * responses have the requested compression, but this is not supported by the API. Given a
558    * compliant server, this test will exercise the code path for receiving a compressed response but
559    * cannot itself verify that the response was compressed.
560    */
561   @Test
serverCompressedUnary()562   public void serverCompressedUnary() throws Exception {
563     assumeEnoughMemory();
564     final SimpleRequest responseShouldBeCompressed =
565         SimpleRequest.newBuilder()
566             .setResponseCompressed(BoolValue.newBuilder().setValue(true))
567             .setResponseSize(314159)
568             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
569             .build();
570     final SimpleRequest responseShouldBeUncompressed =
571         SimpleRequest.newBuilder()
572             .setResponseCompressed(BoolValue.newBuilder().setValue(false))
573             .setResponseSize(314159)
574             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
575             .build();
576     final SimpleResponse goldenResponse =
577         SimpleResponse.newBuilder()
578             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
579             .build();
580 
581     assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed));
582     assertStatsTrace(
583         "grpc.testing.TestService/UnaryCall",
584         Status.Code.OK,
585         Collections.singleton(responseShouldBeCompressed),
586         Collections.singleton(goldenResponse));
587 
588     assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed));
589     assertStatsTrace(
590         "grpc.testing.TestService/UnaryCall",
591         Status.Code.OK,
592         Collections.singleton(responseShouldBeUncompressed),
593         Collections.singleton(goldenResponse));
594   }
595 
596   /**
597    * Assuming "pick_first" policy is used, tests that all requests are sent to the same server.
598    */
pickFirstUnary()599   public void pickFirstUnary() throws Exception {
600     SimpleRequest request = SimpleRequest.newBuilder()
601         .setResponseSize(1)
602         .setFillServerId(true)
603         .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1])))
604         .build();
605 
606     SimpleResponse firstResponse = blockingStub.unaryCall(request);
607     // Increase the chance of all servers are connected, in case the channel should be doing
608     // round_robin instead.
609     Thread.sleep(5000);
610     for (int i = 0; i < 100; i++) {
611       SimpleResponse response = blockingStub.unaryCall(request);
612       assertThat(response.getServerId()).isEqualTo(firstResponse.getServerId());
613     }
614   }
615 
616   @Test
serverStreaming()617   public void serverStreaming() throws Exception {
618     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
619         .addResponseParameters(ResponseParameters.newBuilder()
620             .setSize(31415))
621         .addResponseParameters(ResponseParameters.newBuilder()
622             .setSize(9))
623         .addResponseParameters(ResponseParameters.newBuilder()
624             .setSize(2653))
625         .addResponseParameters(ResponseParameters.newBuilder()
626             .setSize(58979))
627         .build();
628     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
629         StreamingOutputCallResponse.newBuilder()
630             .setPayload(Payload.newBuilder()
631                 .setBody(ByteString.copyFrom(new byte[31415])))
632             .build(),
633         StreamingOutputCallResponse.newBuilder()
634             .setPayload(Payload.newBuilder()
635                 .setBody(ByteString.copyFrom(new byte[9])))
636             .build(),
637         StreamingOutputCallResponse.newBuilder()
638             .setPayload(Payload.newBuilder()
639                 .setBody(ByteString.copyFrom(new byte[2653])))
640             .build(),
641         StreamingOutputCallResponse.newBuilder()
642             .setPayload(Payload.newBuilder()
643                 .setBody(ByteString.copyFrom(new byte[58979])))
644             .build());
645 
646     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
647     asyncStub.streamingOutputCall(request, recorder);
648     recorder.awaitCompletion();
649     assertSuccess(recorder);
650     assertResponses(goldenResponses, recorder.getValues());
651   }
652 
653   @Test
clientStreaming()654   public void clientStreaming() throws Exception {
655     final List<StreamingInputCallRequest> requests = Arrays.asList(
656         StreamingInputCallRequest.newBuilder()
657             .setPayload(Payload.newBuilder()
658                 .setBody(ByteString.copyFrom(new byte[27182])))
659             .build(),
660         StreamingInputCallRequest.newBuilder()
661             .setPayload(Payload.newBuilder()
662                 .setBody(ByteString.copyFrom(new byte[8])))
663             .build(),
664         StreamingInputCallRequest.newBuilder()
665             .setPayload(Payload.newBuilder()
666                 .setBody(ByteString.copyFrom(new byte[1828])))
667             .build(),
668         StreamingInputCallRequest.newBuilder()
669             .setPayload(Payload.newBuilder()
670                 .setBody(ByteString.copyFrom(new byte[45904])))
671             .build());
672     final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder()
673         .setAggregatedPayloadSize(74922)
674         .build();
675 
676     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
677     StreamObserver<StreamingInputCallRequest> requestObserver =
678         asyncStub.streamingInputCall(responseObserver);
679     for (StreamingInputCallRequest request : requests) {
680       requestObserver.onNext(request);
681     }
682     requestObserver.onCompleted();
683 
684     assertEquals(goldenResponse, responseObserver.firstValue().get());
685     responseObserver.awaitCompletion();
686     assertThat(responseObserver.getValues()).hasSize(1);
687     Throwable t = responseObserver.getError();
688     if (t != null) {
689       throw new AssertionError(t);
690     }
691   }
692 
693   /**
694    * Tests client per-message compression for streaming calls. The Java API does not support
695    * inspecting a message's compression level, so this is primarily intended to run against a gRPC
696    * C++ server.
697    */
clientCompressedStreaming(boolean probe)698   public void clientCompressedStreaming(boolean probe) throws Exception {
699     final StreamingInputCallRequest expectCompressedRequest =
700         StreamingInputCallRequest.newBuilder()
701             .setExpectCompressed(BoolValue.newBuilder().setValue(true))
702             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
703             .build();
704     final StreamingInputCallRequest expectUncompressedRequest =
705         StreamingInputCallRequest.newBuilder()
706             .setExpectCompressed(BoolValue.newBuilder().setValue(false))
707             .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
708             .build();
709     final StreamingInputCallResponse goldenResponse =
710         StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();
711 
712     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
713     StreamObserver<StreamingInputCallRequest> requestObserver =
714         asyncStub.streamingInputCall(responseObserver);
715 
716     if (probe) {
717       // Send a non-compressed message with expectCompress=true. Servers supporting this test case
718       // should return INVALID_ARGUMENT.
719       requestObserver.onNext(expectCompressedRequest);
720       responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
721       Throwable e = responseObserver.getError();
722       assertNotNull("expected INVALID_ARGUMENT", e);
723       assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
724     }
725 
726     // Start a new stream
727     responseObserver = StreamRecorder.create();
728     @SuppressWarnings("unchecked")
729     ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
730         (ClientCallStreamObserver)
731             asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
732     clientCallStreamObserver.setMessageCompression(true);
733     clientCallStreamObserver.onNext(expectCompressedRequest);
734     clientCallStreamObserver.setMessageCompression(false);
735     clientCallStreamObserver.onNext(expectUncompressedRequest);
736     clientCallStreamObserver.onCompleted();
737     responseObserver.awaitCompletion();
738     assertSuccess(responseObserver);
739     assertEquals(goldenResponse, responseObserver.firstValue().get());
740   }
741 
742   /**
743    * Tests server per-message compression in a streaming response. Ideally we would assert that the
744    * responses have the requested compression, but this is not supported by the API. Given a
745    * compliant server, this test will exercise the code path for receiving a compressed response but
746    * cannot itself verify that the response was compressed.
747    */
serverCompressedStreaming()748   public void serverCompressedStreaming() throws Exception {
749     final StreamingOutputCallRequest request =
750         StreamingOutputCallRequest.newBuilder()
751             .addResponseParameters(
752                 ResponseParameters.newBuilder()
753                     .setCompressed(BoolValue.newBuilder().setValue(true))
754                     .setSize(31415))
755             .addResponseParameters(
756                 ResponseParameters.newBuilder()
757                     .setCompressed(BoolValue.newBuilder().setValue(false))
758                     .setSize(92653))
759             .build();
760     final List<StreamingOutputCallResponse> goldenResponses =
761         Arrays.asList(
762             StreamingOutputCallResponse.newBuilder()
763                 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415])))
764                 .build(),
765             StreamingOutputCallResponse.newBuilder()
766                 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653])))
767                 .build());
768 
769     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
770     asyncStub.streamingOutputCall(request, recorder);
771     recorder.awaitCompletion();
772     assertSuccess(recorder);
773     assertResponses(goldenResponses, recorder.getValues());
774   }
775 
776   @Test
pingPong()777   public void pingPong() throws Exception {
778     final List<StreamingOutputCallRequest> requests = Arrays.asList(
779         StreamingOutputCallRequest.newBuilder()
780             .addResponseParameters(ResponseParameters.newBuilder()
781                 .setSize(31415))
782             .setPayload(Payload.newBuilder()
783                 .setBody(ByteString.copyFrom(new byte[27182])))
784             .build(),
785         StreamingOutputCallRequest.newBuilder()
786             .addResponseParameters(ResponseParameters.newBuilder()
787                 .setSize(9))
788             .setPayload(Payload.newBuilder()
789                 .setBody(ByteString.copyFrom(new byte[8])))
790             .build(),
791         StreamingOutputCallRequest.newBuilder()
792             .addResponseParameters(ResponseParameters.newBuilder()
793                 .setSize(2653))
794             .setPayload(Payload.newBuilder()
795                 .setBody(ByteString.copyFrom(new byte[1828])))
796             .build(),
797         StreamingOutputCallRequest.newBuilder()
798             .addResponseParameters(ResponseParameters.newBuilder()
799                 .setSize(58979))
800             .setPayload(Payload.newBuilder()
801                 .setBody(ByteString.copyFrom(new byte[45904])))
802             .build());
803     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
804         StreamingOutputCallResponse.newBuilder()
805             .setPayload(Payload.newBuilder()
806                 .setBody(ByteString.copyFrom(new byte[31415])))
807             .build(),
808         StreamingOutputCallResponse.newBuilder()
809             .setPayload(Payload.newBuilder()
810                 .setBody(ByteString.copyFrom(new byte[9])))
811             .build(),
812         StreamingOutputCallResponse.newBuilder()
813             .setPayload(Payload.newBuilder()
814                 .setBody(ByteString.copyFrom(new byte[2653])))
815             .build(),
816         StreamingOutputCallResponse.newBuilder()
817             .setPayload(Payload.newBuilder()
818                 .setBody(ByteString.copyFrom(new byte[58979])))
819             .build());
820 
821     final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(5);
822     StreamObserver<StreamingOutputCallRequest> requestObserver
823         = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
824           @Override
825           public void onNext(StreamingOutputCallResponse response) {
826             queue.add(response);
827           }
828 
829           @Override
830           public void onError(Throwable t) {
831             queue.add(t);
832           }
833 
834           @Override
835           public void onCompleted() {
836             queue.add("Completed");
837           }
838         });
839     for (int i = 0; i < requests.size(); i++) {
840       assertNull(queue.peek());
841       requestObserver.onNext(requests.get(i));
842       Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
843       assertNotNull("Timed out waiting for response", actualResponse);
844       if (actualResponse instanceof Throwable) {
845         throw new AssertionError(actualResponse);
846       }
847       assertResponse(goldenResponses.get(i), (StreamingOutputCallResponse) actualResponse);
848     }
849     requestObserver.onCompleted();
850     assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
851   }
852 
853   @Test
emptyStream()854   public void emptyStream() throws Exception {
855     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
856     StreamObserver<StreamingOutputCallRequest> requestObserver
857         = asyncStub.fullDuplexCall(responseObserver);
858     requestObserver.onCompleted();
859     responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
860     assertSuccess(responseObserver);
861     assertTrue("Expected an empty stream", responseObserver.getValues().isEmpty());
862   }
863 
864   @Test
cancelAfterBegin()865   public void cancelAfterBegin() throws Exception {
866     StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
867     StreamObserver<StreamingInputCallRequest> requestObserver =
868         asyncStub.streamingInputCall(responseObserver);
869     requestObserver.onError(new RuntimeException());
870     responseObserver.awaitCompletion();
871     assertEquals(Arrays.<StreamingInputCallResponse>asList(), responseObserver.getValues());
872     assertEquals(Status.Code.CANCELLED,
873         Status.fromThrowable(responseObserver.getError()).getCode());
874 
875     if (metricsExpected()) {
876       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
877       checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall", true);
878       // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
879       // recorded.  The tracer stats rely on the stream being created, which is not always the case
880       // in this test.  Therefore we don't check the tracer stats.
881       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
882       checkEndTags(
883           clientEndRecord, "grpc.testing.TestService/StreamingInputCall",
884           Status.CANCELLED.getCode(), true);
885       // Do not check server-side metrics, because the status on the server side is undetermined.
886     }
887   }
888 
889   @Test
cancelAfterFirstResponse()890   public void cancelAfterFirstResponse() throws Exception {
891     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
892         .addResponseParameters(ResponseParameters.newBuilder()
893             .setSize(31415))
894         .setPayload(Payload.newBuilder()
895             .setBody(ByteString.copyFrom(new byte[27182])))
896         .build();
897     final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder()
898         .setPayload(Payload.newBuilder()
899             .setBody(ByteString.copyFrom(new byte[31415])))
900         .build();
901 
902     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
903     StreamObserver<StreamingOutputCallRequest> requestObserver
904         = asyncStub.fullDuplexCall(responseObserver);
905     requestObserver.onNext(request);
906     assertResponse(goldenResponse, responseObserver.firstValue().get());
907     requestObserver.onError(new RuntimeException());
908     responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
909     assertEquals(1, responseObserver.getValues().size());
910     assertEquals(Status.Code.CANCELLED,
911                  Status.fromThrowable(responseObserver.getError()).getCode());
912 
913     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED);
914   }
915 
916   @Test
fullDuplexCallShouldSucceed()917   public void fullDuplexCallShouldSucceed() throws Exception {
918     // Build the request.
919     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
920     StreamingOutputCallRequest.Builder streamingOutputBuilder =
921         StreamingOutputCallRequest.newBuilder();
922     for (Integer size : responseSizes) {
923       streamingOutputBuilder.addResponseParameters(
924           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
925     }
926     final StreamingOutputCallRequest request = streamingOutputBuilder.build();
927 
928     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
929     StreamObserver<StreamingOutputCallRequest> requestStream =
930         asyncStub.fullDuplexCall(recorder);
931 
932     final int numRequests = 10;
933     List<StreamingOutputCallRequest> requests =
934         new ArrayList<>(numRequests);
935     for (int ix = numRequests; ix > 0; --ix) {
936       requests.add(request);
937       requestStream.onNext(request);
938     }
939     requestStream.onCompleted();
940     recorder.awaitCompletion();
941     assertSuccess(recorder);
942     assertEquals(responseSizes.size() * (long) numRequests, recorder.getValues().size());
943     for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
944       StreamingOutputCallResponse response = recorder.getValues().get(ix);
945       int length = response.getPayload().getBody().size();
946       int expectedSize = responseSizes.get(ix % responseSizes.size());
947       assertEquals("comparison failed at index " + ix, expectedSize, length);
948     }
949 
950     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, requests,
951         recorder.getValues());
952   }
953 
954   @Test
halfDuplexCallShouldSucceed()955   public void halfDuplexCallShouldSucceed() throws Exception {
956     // Build the request.
957     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
958     StreamingOutputCallRequest.Builder streamingOutputBuilder =
959         StreamingOutputCallRequest.newBuilder();
960     for (Integer size : responseSizes) {
961       streamingOutputBuilder.addResponseParameters(
962           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
963     }
964     final StreamingOutputCallRequest request = streamingOutputBuilder.build();
965 
966     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
967     StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder);
968 
969     final int numRequests = 10;
970     for (int ix = numRequests; ix > 0; --ix) {
971       requestStream.onNext(request);
972     }
973     requestStream.onCompleted();
974     recorder.awaitCompletion();
975     assertSuccess(recorder);
976     assertEquals(responseSizes.size() * (long) numRequests, recorder.getValues().size());
977     for (int ix = 0; ix < recorder.getValues().size(); ++ix) {
978       StreamingOutputCallResponse response = recorder.getValues().get(ix);
979       int length = response.getPayload().getBody().size();
980       int expectedSize = responseSizes.get(ix % responseSizes.size());
981       assertEquals("comparison failed at index " + ix, expectedSize, length);
982     }
983   }
984 
985   @Test
serverStreamingShouldBeFlowControlled()986   public void serverStreamingShouldBeFlowControlled() throws Exception {
987     final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
988         .addResponseParameters(ResponseParameters.newBuilder().setSize(100000))
989         .addResponseParameters(ResponseParameters.newBuilder().setSize(100001))
990         .build();
991     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
992         StreamingOutputCallResponse.newBuilder()
993             .setPayload(Payload.newBuilder()
994                 .setBody(ByteString.copyFrom(new byte[100000]))).build(),
995         StreamingOutputCallResponse.newBuilder()
996             .setPayload(Payload.newBuilder()
997                 .setBody(ByteString.copyFrom(new byte[100001]))).build());
998 
999     long start = System.nanoTime();
1000 
1001     final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);
1002     ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
1003         channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT);
1004     call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
1005       @Override
1006       public void onHeaders(Metadata headers) {}
1007 
1008       @Override
1009       public void onMessage(final StreamingOutputCallResponse message) {
1010         queue.add(message);
1011       }
1012 
1013       @Override
1014       public void onClose(Status status, Metadata trailers) {
1015         queue.add(status);
1016       }
1017     }, new Metadata());
1018     call.sendMessage(request);
1019     call.halfClose();
1020 
1021     // Time how long it takes to get the first response.
1022     call.request(1);
1023     Object response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
1024     assertTrue(response instanceof StreamingOutputCallResponse);
1025     assertResponse(goldenResponses.get(0), (StreamingOutputCallResponse) response);
1026     long firstCallDuration = System.nanoTime() - start;
1027 
1028     // Without giving additional flow control, make sure that we don't get another response. We wait
1029     // until we are comfortable the next message isn't coming. We may have very low nanoTime
1030     // resolution (like on Windows) or be using a testing, in-process transport where message
1031     // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up sleep time
1032     // to at least 1ms.
1033     assertNull(queue.poll(Math.max(firstCallDuration * 4, 1 * 1000 * 1000), TimeUnit.NANOSECONDS));
1034 
1035     // Make sure that everything still completes.
1036     call.request(1);
1037     response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
1038     assertTrue(response instanceof StreamingOutputCallResponse);
1039     assertResponse(goldenResponses.get(1), (StreamingOutputCallResponse) response);
1040     assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1041   }
1042 
1043   @Test
veryLargeRequest()1044   public void veryLargeRequest() throws Exception {
1045     assumeEnoughMemory();
1046     final SimpleRequest request = SimpleRequest.newBuilder()
1047         .setPayload(Payload.newBuilder()
1048             .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
1049         .setResponseSize(10)
1050         .build();
1051     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1052         .setPayload(Payload.newBuilder()
1053             .setBody(ByteString.copyFrom(new byte[10])))
1054         .build();
1055 
1056     assertResponse(goldenResponse, blockingStub.unaryCall(request));
1057   }
1058 
1059   @Test
veryLargeResponse()1060   public void veryLargeResponse() throws Exception {
1061     assumeEnoughMemory();
1062     final SimpleRequest request = SimpleRequest.newBuilder()
1063         .setResponseSize(unaryPayloadLength())
1064         .build();
1065     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1066         .setPayload(Payload.newBuilder()
1067             .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()])))
1068         .build();
1069 
1070     assertResponse(goldenResponse, blockingStub.unaryCall(request));
1071   }
1072 
1073   @Test
exchangeMetadataUnaryCall()1074   public void exchangeMetadataUnaryCall() throws Exception {
1075     // Capture the metadata exchange
1076     Metadata fixedHeaders = new Metadata();
1077     // Send a metadata proto
1078     StringValue metadataValue = StringValue.newBuilder().setValue("dog").build();
1079     fixedHeaders.put(Util.METADATA_KEY, metadataValue);
1080     // .. and expect it to be echoed back in trailers
1081     AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
1082     AtomicReference<Metadata> headersCapture = new AtomicReference<>();
1083     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub.withInterceptors(
1084         MetadataUtils.newAttachHeadersInterceptor(fixedHeaders),
1085         MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture));
1086 
1087     assertNotNull(stub.emptyCall(EMPTY));
1088 
1089     // Assert that our side channel object is echoed back in both headers and trailers
1090     Assert.assertEquals(metadataValue, headersCapture.get().get(Util.METADATA_KEY));
1091     Assert.assertEquals(metadataValue, trailersCapture.get().get(Util.METADATA_KEY));
1092   }
1093 
1094   @Test
exchangeMetadataStreamingCall()1095   public void exchangeMetadataStreamingCall() throws Exception {
1096     // Capture the metadata exchange
1097     Metadata fixedHeaders = new Metadata();
1098     // Send a metadata proto
1099     StringValue metadataValue = StringValue.newBuilder().setValue("dog").build();
1100     fixedHeaders.put(Util.METADATA_KEY, metadataValue);
1101     // .. and expect it to be echoed back in trailers
1102     AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
1103     AtomicReference<Metadata> headersCapture = new AtomicReference<>();
1104     TestServiceGrpc.TestServiceStub stub = asyncStub.withInterceptors(
1105         MetadataUtils.newAttachHeadersInterceptor(fixedHeaders),
1106         MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture));
1107 
1108     List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200);
1109     Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder =
1110         Messages.StreamingOutputCallRequest.newBuilder();
1111     for (Integer size : responseSizes) {
1112       streamingOutputBuilder.addResponseParameters(
1113           ResponseParameters.newBuilder().setSize(size).setIntervalUs(0));
1114     }
1115     final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build();
1116 
1117     StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
1118     StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
1119         stub.fullDuplexCall(recorder);
1120 
1121     final int numRequests = 10;
1122     for (int ix = numRequests; ix > 0; --ix) {
1123       requestStream.onNext(request);
1124     }
1125     requestStream.onCompleted();
1126     recorder.awaitCompletion();
1127     assertSuccess(recorder);
1128     org.junit.Assert.assertEquals(
1129         responseSizes.size() * (long) numRequests, recorder.getValues().size());
1130 
1131     // Assert that our side channel object is echoed back in both headers and trailers
1132     Assert.assertEquals(metadataValue, headersCapture.get().get(Util.METADATA_KEY));
1133     Assert.assertEquals(metadataValue, trailersCapture.get().get(Util.METADATA_KEY));
1134   }
1135 
1136   @Test
sendsTimeoutHeader()1137   public void sendsTimeoutHeader() {
1138     Assume.assumeTrue("can not capture request headers on server side", server != null);
1139     long configuredTimeoutMinutes = 100;
1140     TestServiceGrpc.TestServiceBlockingStub stub =
1141         blockingStub.withDeadlineAfter(configuredTimeoutMinutes, TimeUnit.MINUTES);
1142     stub.emptyCall(EMPTY);
1143     long transferredTimeoutMinutes = TimeUnit.NANOSECONDS.toMinutes(
1144         requestHeadersCapture.get().get(GrpcUtil.TIMEOUT_KEY));
1145     Assert.assertTrue(
1146         "configuredTimeoutMinutes=" + configuredTimeoutMinutes
1147             + ", transferredTimeoutMinutes=" + transferredTimeoutMinutes,
1148         configuredTimeoutMinutes - transferredTimeoutMinutes >= 0
1149             && configuredTimeoutMinutes - transferredTimeoutMinutes <= 1);
1150   }
1151 
1152   @Test
deadlineNotExceeded()1153   public void deadlineNotExceeded() {
1154     // warm up the channel and JVM
1155     blockingStub.emptyCall(Empty.getDefaultInstance());
1156     blockingStub
1157         .withDeadlineAfter(10, TimeUnit.SECONDS)
1158         .streamingOutputCall(StreamingOutputCallRequest.newBuilder()
1159             .addResponseParameters(ResponseParameters.newBuilder()
1160                 .setIntervalUs(0))
1161                 .build()).next();
1162   }
1163 
1164   @Test
deadlineExceeded()1165   public void deadlineExceeded() throws Exception {
1166     // warm up the channel and JVM
1167     blockingStub.emptyCall(Empty.getDefaultInstance());
1168     TestServiceGrpc.TestServiceBlockingStub stub =
1169         blockingStub.withDeadlineAfter(1, TimeUnit.SECONDS);
1170     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1171         .addResponseParameters(ResponseParameters.newBuilder()
1172             .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20)))
1173         .build();
1174     try {
1175       stub.streamingOutputCall(request).next();
1176       fail("Expected deadline to be exceeded");
1177     } catch (StatusRuntimeException ex) {
1178       assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode());
1179       String desc = ex.getStatus().getDescription();
1180       assertTrue(desc,
1181           // There is a race between client and server-side deadline expiration.
1182           // If client expires first, it'd generate this message
1183           Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc)
1184           // If server expires first, it'd reset the stream and client would generate a different
1185           // message
1186           || desc.startsWith("ClientCall was cancelled at or after deadline."));
1187     }
1188 
1189     assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
1190     if (metricsExpected()) {
1191       // Stream may not have been created before deadline is exceeded, thus we don't test the tracer
1192       // stats.
1193       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1194       checkStartTags(
1195           clientStartRecord, "grpc.testing.TestService/StreamingOutputCall", true);
1196       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1197       checkEndTags(
1198           clientEndRecord,
1199           "grpc.testing.TestService/StreamingOutputCall",
1200           Status.Code.DEADLINE_EXCEEDED, true);
1201       // Do not check server-side metrics, because the status on the server side is undetermined.
1202     }
1203   }
1204 
1205   @Test
deadlineExceededServerStreaming()1206   public void deadlineExceededServerStreaming() throws Exception {
1207     // warm up the channel and JVM
1208     blockingStub.emptyCall(Empty.getDefaultInstance());
1209     assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK);
1210     ResponseParameters.Builder responseParameters = ResponseParameters.newBuilder()
1211         .setSize(1)
1212         .setIntervalUs(10000);
1213     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1214         .addResponseParameters(responseParameters)
1215         .addResponseParameters(responseParameters)
1216         .addResponseParameters(responseParameters)
1217         .addResponseParameters(responseParameters)
1218         .build();
1219     StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
1220     asyncStub
1221         .withDeadlineAfter(30, TimeUnit.MILLISECONDS)
1222         .streamingOutputCall(request, recorder);
1223     recorder.awaitCompletion();
1224     assertEquals(Status.DEADLINE_EXCEEDED.getCode(),
1225         Status.fromThrowable(recorder.getError()).getCode());
1226     if (metricsExpected()) {
1227       // Stream may not have been created when deadline is exceeded, thus we don't check tracer
1228       // stats.
1229       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1230       checkStartTags(
1231           clientStartRecord, "grpc.testing.TestService/StreamingOutputCall", true);
1232       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1233       checkEndTags(
1234           clientEndRecord,
1235           "grpc.testing.TestService/StreamingOutputCall",
1236           Status.Code.DEADLINE_EXCEEDED, true);
1237       // Do not check server-side metrics, because the status on the server side is undetermined.
1238     }
1239   }
1240 
1241   @Test
deadlineInPast()1242   public void deadlineInPast() throws Exception {
1243     // Test once with idle channel and once with active channel
1244     try {
1245       blockingStub
1246           .withDeadlineAfter(-10, TimeUnit.SECONDS)
1247           .emptyCall(Empty.getDefaultInstance());
1248       fail("Should have thrown");
1249     } catch (StatusRuntimeException ex) {
1250       assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
1251       assertThat(ex.getStatus().getDescription())
1252         .startsWith("ClientCall started after CallOptions deadline was exceeded");
1253     }
1254 
1255     // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
1256     // recorded.  The tracer stats rely on the stream being created, which is not the case if
1257     // deadline is exceeded before the call is created. Therefore we don't check the tracer stats.
1258     if (metricsExpected()) {
1259       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1260       checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall", true);
1261       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1262       checkEndTags(
1263           clientEndRecord, "grpc.testing.TestService/EmptyCall",
1264           Status.DEADLINE_EXCEEDED.getCode(), true);
1265       assertZeroRetryRecorded();
1266     }
1267 
1268     // warm up the channel
1269     blockingStub.emptyCall(Empty.getDefaultInstance());
1270     if (metricsExpected()) {
1271       // clientStartRecord
1272       clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1273       // clientEndRecord
1274       clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1275       assertZeroRetryRecorded();
1276     }
1277     try {
1278       blockingStub
1279           .withDeadlineAfter(-10, TimeUnit.SECONDS)
1280           .emptyCall(Empty.getDefaultInstance());
1281       fail("Should have thrown");
1282     } catch (StatusRuntimeException ex) {
1283       assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode());
1284       assertThat(ex.getStatus().getDescription())
1285         .startsWith("ClientCall started after CallOptions deadline was exceeded");
1286     }
1287     if (metricsExpected()) {
1288       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1289       checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall", true);
1290       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1291       checkEndTags(
1292           clientEndRecord, "grpc.testing.TestService/EmptyCall",
1293           Status.DEADLINE_EXCEEDED.getCode(), true);
1294       assertZeroRetryRecorded();
1295     }
1296   }
1297 
1298   @Test
maxInboundSize_exact()1299   public void maxInboundSize_exact() {
1300     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1301         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1302         .build();
1303 
1304     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1305         TestServiceGrpc.getStreamingOutputCallMethod();
1306     ByteSizeMarshaller<StreamingOutputCallResponse> mar =
1307         new ByteSizeMarshaller<>(md.getResponseMarshaller());
1308     blockingServerStreamingCall(
1309         blockingStub.getChannel(),
1310         md.toBuilder(md.getRequestMarshaller(), mar).build(),
1311         blockingStub.getCallOptions(),
1312         request)
1313         .next();
1314 
1315     int size = mar.lastInSize;
1316 
1317     TestServiceGrpc.TestServiceBlockingStub stub =
1318         blockingStub.withMaxInboundMessageSize(size);
1319 
1320     stub.streamingOutputCall(request).next();
1321   }
1322 
1323   @Test
maxInboundSize_tooBig()1324   public void maxInboundSize_tooBig() {
1325     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1326         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1327         .build();
1328 
1329     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1330         TestServiceGrpc.getStreamingOutputCallMethod();
1331     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1332         new ByteSizeMarshaller<>(md.getRequestMarshaller());
1333     blockingServerStreamingCall(
1334         blockingStub.getChannel(),
1335         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1336         blockingStub.getCallOptions(),
1337         request)
1338         .next();
1339 
1340     int size = mar.lastOutSize;
1341 
1342     TestServiceGrpc.TestServiceBlockingStub stub =
1343         blockingStub.withMaxInboundMessageSize(size - 1);
1344 
1345     try {
1346       stub.streamingOutputCall(request).next();
1347       fail();
1348     } catch (StatusRuntimeException ex) {
1349       Status s = ex.getStatus();
1350       assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.RESOURCE_EXHAUSTED);
1351       assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum");
1352     }
1353   }
1354 
1355   @Test
maxOutboundSize_exact()1356   public void maxOutboundSize_exact() {
1357     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1358         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1359         .build();
1360 
1361     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1362         TestServiceGrpc.getStreamingOutputCallMethod();
1363     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1364         new ByteSizeMarshaller<>(md.getRequestMarshaller());
1365     blockingServerStreamingCall(
1366         blockingStub.getChannel(),
1367         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1368         blockingStub.getCallOptions(),
1369         request)
1370         .next();
1371 
1372     int size = mar.lastOutSize;
1373 
1374     TestServiceGrpc.TestServiceBlockingStub stub =
1375         blockingStub.withMaxOutboundMessageSize(size);
1376 
1377     stub.streamingOutputCall(request).next();
1378   }
1379 
1380   @Test
maxOutboundSize_tooBig()1381   public void maxOutboundSize_tooBig() {
1382     // set at least one field to ensure the size is non-zero.
1383     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1384         .addResponseParameters(ResponseParameters.newBuilder().setSize(1))
1385         .build();
1386 
1387 
1388     MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md =
1389         TestServiceGrpc.getStreamingOutputCallMethod();
1390     ByteSizeMarshaller<StreamingOutputCallRequest> mar =
1391         new ByteSizeMarshaller<>(md.getRequestMarshaller());
1392     blockingServerStreamingCall(
1393         blockingStub.getChannel(),
1394         md.toBuilder(mar, md.getResponseMarshaller()).build(),
1395         blockingStub.getCallOptions(),
1396         request)
1397         .next();
1398 
1399     TestServiceGrpc.TestServiceBlockingStub stub =
1400         blockingStub.withMaxOutboundMessageSize(mar.lastOutSize - 1);
1401     try {
1402       stub.streamingOutputCall(request).next();
1403       fail();
1404     } catch (StatusRuntimeException ex) {
1405       Status s = ex.getStatus();
1406       assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.CANCELLED);
1407       assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large");
1408     }
1409   }
1410 
unaryPayloadLength()1411   protected int unaryPayloadLength() {
1412     // 10MiB.
1413     return 10485760;
1414   }
1415 
1416   @Test
gracefulShutdown()1417   public void gracefulShutdown() throws Exception {
1418     final List<StreamingOutputCallRequest> requests = Arrays.asList(
1419         StreamingOutputCallRequest.newBuilder()
1420             .addResponseParameters(ResponseParameters.newBuilder()
1421                 .setSize(3))
1422             .setPayload(Payload.newBuilder()
1423                 .setBody(ByteString.copyFrom(new byte[2])))
1424             .build(),
1425         StreamingOutputCallRequest.newBuilder()
1426             .addResponseParameters(ResponseParameters.newBuilder()
1427                 .setSize(1))
1428             .setPayload(Payload.newBuilder()
1429                 .setBody(ByteString.copyFrom(new byte[7])))
1430             .build(),
1431         StreamingOutputCallRequest.newBuilder()
1432             .addResponseParameters(ResponseParameters.newBuilder()
1433                 .setSize(4))
1434             .setPayload(Payload.newBuilder()
1435                 .setBody(ByteString.copyFrom(new byte[1])))
1436             .build());
1437     final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
1438         StreamingOutputCallResponse.newBuilder()
1439             .setPayload(Payload.newBuilder()
1440                 .setBody(ByteString.copyFrom(new byte[3])))
1441             .build(),
1442         StreamingOutputCallResponse.newBuilder()
1443             .setPayload(Payload.newBuilder()
1444                 .setBody(ByteString.copyFrom(new byte[1])))
1445             .build(),
1446         StreamingOutputCallResponse.newBuilder()
1447             .setPayload(Payload.newBuilder()
1448                 .setBody(ByteString.copyFrom(new byte[4])))
1449             .build());
1450 
1451     final ArrayBlockingQueue<StreamingOutputCallResponse> responses =
1452         new ArrayBlockingQueue<>(3);
1453     final SettableFuture<Void> completed = SettableFuture.create();
1454     final SettableFuture<Void> errorSeen = SettableFuture.create();
1455     StreamObserver<StreamingOutputCallResponse> responseObserver =
1456         new StreamObserver<StreamingOutputCallResponse>() {
1457 
1458           @Override
1459           public void onNext(StreamingOutputCallResponse value) {
1460             responses.add(value);
1461           }
1462 
1463           @Override
1464           public void onError(Throwable t) {
1465             errorSeen.set(null);
1466           }
1467 
1468           @Override
1469           public void onCompleted() {
1470             completed.set(null);
1471           }
1472         };
1473     StreamObserver<StreamingOutputCallRequest> requestObserver
1474         = asyncStub.fullDuplexCall(responseObserver);
1475     requestObserver.onNext(requests.get(0));
1476     assertResponse(
1477         goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1478     // Initiate graceful shutdown.
1479     channel.shutdown();
1480     requestObserver.onNext(requests.get(1));
1481     assertResponse(
1482         goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1483     // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't.
1484     requestObserver.onNext(requests.get(2));
1485     assertResponse(
1486         goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1487     assertFalse(completed.isDone());
1488     requestObserver.onCompleted();
1489     completed.get(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
1490     assertFalse(errorSeen.isDone());
1491   }
1492 
1493   @Test
customMetadata()1494   public void customMetadata() throws Exception {
1495     final int responseSize = 314159;
1496     final int requestSize = 271828;
1497     final SimpleRequest request = SimpleRequest.newBuilder()
1498         .setResponseSize(responseSize)
1499         .setPayload(Payload.newBuilder()
1500             .setBody(ByteString.copyFrom(new byte[requestSize])))
1501         .build();
1502     final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
1503         .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize))
1504         .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize])))
1505         .build();
1506     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1507         .setPayload(Payload.newBuilder()
1508             .setBody(ByteString.copyFrom(new byte[responseSize])))
1509         .build();
1510     final StreamingOutputCallResponse goldenStreamingResponse =
1511         StreamingOutputCallResponse.newBuilder()
1512             .setPayload(Payload.newBuilder()
1513             .setBody(ByteString.copyFrom(new byte[responseSize])))
1514         .build();
1515     final byte[] trailingBytes =
1516         {(byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb};
1517 
1518     // Test UnaryCall
1519     Metadata metadata = new Metadata();
1520     metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value");
1521     metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
1522     AtomicReference<Metadata> headersCapture = new AtomicReference<>();
1523     AtomicReference<Metadata> trailersCapture = new AtomicReference<>();
1524     TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub.withInterceptors(
1525         MetadataUtils.newAttachHeadersInterceptor(metadata),
1526         MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture));
1527     SimpleResponse response = blockingStub.unaryCall(request);
1528 
1529     assertResponse(goldenResponse, response);
1530     assertEquals("test_initial_metadata_value",
1531         headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
1532     assertTrue(
1533         Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY)));
1534     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK,
1535         Collections.singleton(request), Collections.singleton(goldenResponse));
1536 
1537     // Test FullDuplexCall
1538     metadata = new Metadata();
1539     metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value");
1540     metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes);
1541     headersCapture = new AtomicReference<>();
1542     trailersCapture = new AtomicReference<>();
1543     TestServiceGrpc.TestServiceStub stub = asyncStub.withInterceptors(
1544         MetadataUtils.newAttachHeadersInterceptor(metadata),
1545         MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture));
1546 
1547     StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create();
1548     StreamObserver<Messages.StreamingOutputCallRequest> requestStream =
1549         stub.fullDuplexCall(recorder);
1550     requestStream.onNext(streamingRequest);
1551     requestStream.onCompleted();
1552     recorder.awaitCompletion();
1553 
1554     assertSuccess(recorder);
1555     assertResponse(goldenStreamingResponse, recorder.firstValue().get());
1556     assertEquals("test_initial_metadata_value",
1557         headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY));
1558     assertTrue(
1559         Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY)));
1560     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK,
1561         Collections.singleton(streamingRequest), Collections.singleton(goldenStreamingResponse));
1562   }
1563 
1564   @SuppressWarnings("deprecation")
1565   @Test(timeout = 10000)
censusContextsPropagated()1566   public void censusContextsPropagated() {
1567     Assume.assumeTrue("Skip the test because server is not in the same process.", server != null);
1568     Assume.assumeTrue(customCensusModulePresent());
1569     Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan();
1570     // A valid ID is guaranteed to be unique, so we can verify it is actually propagated.
1571     assertTrue(clientParentSpan.getContext().getTraceId().isValid());
1572     Context ctx =
1573         io.opencensus.tags.unsafe.ContextUtils.withValue(
1574             Context.ROOT,
1575             tagger
1576                 .emptyBuilder()
1577                 .putLocal(StatsTestUtils.EXTRA_TAG, TagValue.create("extra value"))
1578                 .build());
1579     ctx = io.opencensus.trace.unsafe.ContextUtils.withValue(ctx, clientParentSpan);
1580     Context origCtx = ctx.attach();
1581     try {
1582       blockingStub.unaryCall(SimpleRequest.getDefaultInstance());
1583       Context serverCtx = contextCapture.get();
1584       assertNotNull(serverCtx);
1585 
1586       FakeTagContext statsCtx =
1587           (FakeTagContext) io.opencensus.tags.unsafe.ContextUtils.getValue(serverCtx);
1588       assertNotNull(statsCtx);
1589       Map<TagKey, TagValue> tags = statsCtx.getTags();
1590       boolean tagFound = false;
1591       for (Map.Entry<TagKey, TagValue> tag : tags.entrySet()) {
1592         if (tag.getKey().equals(StatsTestUtils.EXTRA_TAG)) {
1593           assertEquals(TagValue.create("extra value"), tag.getValue());
1594           tagFound = true;
1595         }
1596       }
1597       assertTrue("tag not found", tagFound);
1598 
1599       Span span = io.opencensus.trace.unsafe.ContextUtils.getValue(serverCtx);
1600       assertNotNull(span);
1601       SpanContext spanContext = span.getContext();
1602       assertEquals(clientParentSpan.getContext().getTraceId(), spanContext.getTraceId());
1603     } finally {
1604       ctx.detach(origCtx);
1605     }
1606   }
1607 
1608   @Test
statusCodeAndMessage()1609   public void statusCodeAndMessage() throws Exception {
1610     int errorCode = 2;
1611     String errorMessage = "test status message";
1612     EchoStatus responseStatus = EchoStatus.newBuilder()
1613         .setCode(errorCode)
1614         .setMessage(errorMessage)
1615         .build();
1616     SimpleRequest simpleRequest = SimpleRequest.newBuilder()
1617         .setResponseStatus(responseStatus)
1618         .build();
1619     StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder()
1620         .setResponseStatus(responseStatus)
1621         .build();
1622 
1623     // Test UnaryCall
1624     try {
1625       blockingStub.unaryCall(simpleRequest);
1626       fail();
1627     } catch (StatusRuntimeException e) {
1628       assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode());
1629       assertEquals(errorMessage, e.getStatus().getDescription());
1630     }
1631     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN);
1632 
1633     // Test FullDuplexCall
1634     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
1635     StreamObserver<StreamingOutputCallRequest> requestObserver
1636         = asyncStub.fullDuplexCall(responseObserver);
1637     requestObserver.onNext(streamingRequest);
1638     requestObserver.onCompleted();
1639 
1640     assertThat(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS))
1641         .isTrue();
1642     assertThat(responseObserver.getError()).isNotNull();
1643     Status status = Status.fromThrowable(responseObserver.getError());
1644     assertEquals(Status.UNKNOWN.getCode(), status.getCode());
1645     assertEquals(errorMessage, status.getDescription());
1646     assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN);
1647   }
1648 
1649   @Test
specialStatusMessage()1650   public void specialStatusMessage() throws Exception {
1651     int errorCode = 2;
1652     String errorMessage = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP ��\t\n";
1653     SimpleRequest simpleRequest = SimpleRequest.newBuilder()
1654         .setResponseStatus(EchoStatus.newBuilder()
1655             .setCode(errorCode)
1656             .setMessage(errorMessage)
1657             .build())
1658         .build();
1659 
1660     try {
1661       blockingStub.unaryCall(simpleRequest);
1662       fail();
1663     } catch (StatusRuntimeException e) {
1664       assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode());
1665       assertEquals(errorMessage, e.getStatus().getDescription());
1666     }
1667     assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN);
1668   }
1669 
1670   /** Sends an rpc to an unimplemented method within TestService. */
1671   @Test
unimplementedMethod()1672   public void unimplementedMethod() {
1673     try {
1674       blockingStub.unimplementedCall(Empty.getDefaultInstance());
1675       fail();
1676     } catch (StatusRuntimeException e) {
1677       assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
1678     }
1679 
1680     assertClientStatsTrace("grpc.testing.TestService/UnimplementedCall",
1681         Status.Code.UNIMPLEMENTED);
1682   }
1683 
1684   /** Sends an rpc to an unimplemented service on the server. */
1685   @Test
unimplementedService()1686   public void unimplementedService() {
1687     UnimplementedServiceGrpc.UnimplementedServiceBlockingStub stub =
1688         UnimplementedServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor);
1689     try {
1690       stub.unimplementedCall(Empty.getDefaultInstance());
1691       fail();
1692     } catch (StatusRuntimeException e) {
1693       assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode());
1694     }
1695 
1696     assertStatsTrace("grpc.testing.UnimplementedService/UnimplementedCall",
1697         Status.Code.UNIMPLEMENTED);
1698   }
1699 
1700   /** Start a fullDuplexCall which the server will not respond, and verify the deadline expires. */
1701   @SuppressWarnings("MissingFail")
1702   @Test
timeoutOnSleepingServer()1703   public void timeoutOnSleepingServer() throws Exception {
1704     TestServiceGrpc.TestServiceStub stub =
1705         asyncStub.withDeadlineAfter(1, TimeUnit.MILLISECONDS);
1706 
1707     StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
1708     StreamObserver<StreamingOutputCallRequest> requestObserver
1709         = stub.fullDuplexCall(responseObserver);
1710 
1711     StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1712         .setPayload(Payload.newBuilder()
1713             .setBody(ByteString.copyFrom(new byte[27182])))
1714         .build();
1715     try {
1716       requestObserver.onNext(request);
1717     } catch (IllegalStateException expected) {
1718       // This can happen if the stream has already been terminated due to deadline exceeded.
1719     }
1720 
1721     assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS));
1722     assertEquals(0, responseObserver.getValues().size());
1723     assertEquals(Status.DEADLINE_EXCEEDED.getCode(),
1724                  Status.fromThrowable(responseObserver.getError()).getCode());
1725 
1726     if (metricsExpected()) {
1727       // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be
1728       // recorded.  The tracer stats rely on the stream being created, which is not always the case
1729       // in this test, thus we will not check that.
1730       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1731       checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall", true);
1732       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
1733       checkEndTags(
1734           clientEndRecord,
1735           "grpc.testing.TestService/FullDuplexCall",
1736           Status.DEADLINE_EXCEEDED.getCode(), true);
1737     }
1738   }
1739 
1740   /**
1741    * Verifies remote server address and local client address are available from ClientCall
1742    * Attributes via ClientInterceptor.
1743    */
1744   @Test
getServerAddressAndLocalAddressFromClient()1745   public void getServerAddressAndLocalAddressFromClient() {
1746     assertNotNull(obtainRemoteServerAddr());
1747     assertNotNull(obtainLocalClientAddr());
1748   }
1749 
1750   /**
1751    *  Test backend metrics per query reporting: expect the test client LB policy to receive load
1752    *  reports.
1753    */
testOrcaPerRpc()1754   public void testOrcaPerRpc() throws Exception {
1755     AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
1756     TestOrcaReport answer = TestOrcaReport.newBuilder()
1757         .setCpuUtilization(0.8210)
1758         .setMemoryUtilization(0.5847)
1759         .putRequestCost("cost", 3456.32)
1760         .putUtilization("util", 0.30499)
1761         .build();
1762     blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall(
1763         SimpleRequest.newBuilder().setOrcaPerQueryReport(answer).build());
1764     assertThat(reportHolder.get()).isEqualTo(answer);
1765   }
1766 
1767   /**
1768    *  Test backend metrics OOB reporting: expect the test client LB policy to receive load reports.
1769    */
testOrcaOob()1770   public void testOrcaOob() throws Exception {
1771     AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>();
1772     final TestOrcaReport answer = TestOrcaReport.newBuilder()
1773         .setCpuUtilization(0.8210)
1774         .setMemoryUtilization(0.5847)
1775         .putUtilization("util", 0.30499)
1776         .build();
1777     final TestOrcaReport answer2 = TestOrcaReport.newBuilder()
1778         .setCpuUtilization(0.29309)
1779         .setMemoryUtilization(0.2)
1780         .putUtilization("util", 0.2039)
1781         .build();
1782 
1783     final int retryLimit = 5;
1784     BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
1785     final Object lastItem = new Object();
1786     StreamObserver<StreamingOutputCallRequest> streamObserver =
1787         asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {
1788 
1789           @Override
1790           public void onNext(StreamingOutputCallResponse value) {
1791             queue.add(value);
1792           }
1793 
1794           @Override
1795           public void onError(Throwable t) {
1796             queue.add(t);
1797           }
1798 
1799           @Override
1800           public void onCompleted() {
1801             queue.add(lastItem);
1802           }
1803         });
1804 
1805     streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
1806         .setOrcaOobReport(answer)
1807         .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
1808     assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
1809     int i = 0;
1810     for (; i < retryLimit; i++) {
1811       Thread.sleep(1000);
1812       blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
1813       if (answer.equals(reportHolder.get())) {
1814         break;
1815       }
1816     }
1817     assertThat(i).isLessThan(retryLimit);
1818     streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
1819         .setOrcaOobReport(answer2)
1820         .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
1821     assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
1822 
1823     for (i = 0; i < retryLimit; i++) {
1824       Thread.sleep(1000);
1825       blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY);
1826       if (reportHolder.get().equals(answer2)) {
1827         break;
1828       }
1829     }
1830     assertThat(i).isLessThan(retryLimit);
1831     streamObserver.onCompleted();
1832     assertThat(queue.take()).isSameInstanceAs(lastItem);
1833   }
1834 
1835   /** Sends a large unary rpc with service account credentials. */
serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)1836   public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)
1837       throws Exception {
1838     // cast to ServiceAccountCredentials to double-check the right type of object was created.
1839     GoogleCredentials credentials =
1840         ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream));
1841     credentials = credentials.createScoped(Arrays.asList(authScope));
1842     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1843         .withCallCredentials(MoreCallCredentials.from(credentials));
1844     final SimpleRequest request = SimpleRequest.newBuilder()
1845         .setFillUsername(true)
1846         .setFillOauthScope(true)
1847         .setResponseSize(314159)
1848         .setPayload(Payload.newBuilder()
1849             .setBody(ByteString.copyFrom(new byte[271828])))
1850         .build();
1851 
1852     final SimpleResponse response = stub.unaryCall(request);
1853     assertFalse(response.getUsername().isEmpty());
1854     assertTrue("Received username: " + response.getUsername(),
1855         jsonKey.contains(response.getUsername()));
1856     assertFalse(response.getOauthScope().isEmpty());
1857     assertTrue("Received oauth scope: " + response.getOauthScope(),
1858         authScope.contains(response.getOauthScope()));
1859 
1860     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1861         .setOauthScope(response.getOauthScope())
1862         .setUsername(response.getUsername())
1863         .setPayload(Payload.newBuilder()
1864             .setBody(ByteString.copyFrom(new byte[314159])))
1865         .build();
1866     assertResponse(goldenResponse, response);
1867   }
1868 
1869   /** Sends a large unary rpc with compute engine credentials. */
computeEngineCreds(String serviceAccount, String oauthScope)1870   public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception {
1871     ComputeEngineCredentials credentials = ComputeEngineCredentials.create();
1872     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1873         .withCallCredentials(MoreCallCredentials.from(credentials));
1874     final SimpleRequest request = SimpleRequest.newBuilder()
1875         .setFillUsername(true)
1876         .setFillOauthScope(true)
1877         .setResponseSize(314159)
1878         .setPayload(Payload.newBuilder()
1879             .setBody(ByteString.copyFrom(new byte[271828])))
1880         .build();
1881 
1882     final SimpleResponse response = stub.unaryCall(request);
1883     assertEquals(serviceAccount, response.getUsername());
1884     assertFalse(response.getOauthScope().isEmpty());
1885     assertTrue("Received oauth scope: " + response.getOauthScope(),
1886         oauthScope.contains(response.getOauthScope()));
1887 
1888     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1889         .setOauthScope(response.getOauthScope())
1890         .setUsername(response.getUsername())
1891         .setPayload(Payload.newBuilder()
1892             .setBody(ByteString.copyFrom(new byte[314159])))
1893         .build();
1894     assertResponse(goldenResponse, response);
1895   }
1896 
1897   /** Sends an unary rpc with ComputeEngineChannelBuilder. */
computeEngineChannelCredentials( String defaultServiceAccount, TestServiceGrpc.TestServiceBlockingStub computeEngineStub)1898   public void computeEngineChannelCredentials(
1899       String defaultServiceAccount,
1900       TestServiceGrpc.TestServiceBlockingStub computeEngineStub) throws Exception {
1901     final SimpleRequest request = SimpleRequest.newBuilder()
1902         .setFillUsername(true)
1903         .setResponseSize(314159)
1904         .setPayload(Payload.newBuilder()
1905         .setBody(ByteString.copyFrom(new byte[271828])))
1906         .build();
1907     final SimpleResponse response = computeEngineStub.unaryCall(request);
1908     assertEquals(defaultServiceAccount, response.getUsername());
1909     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1910         .setUsername(defaultServiceAccount)
1911         .setPayload(Payload.newBuilder()
1912         .setBody(ByteString.copyFrom(new byte[314159])))
1913         .build();
1914     assertResponse(goldenResponse, response);
1915   }
1916 
1917   /** Test JWT-based auth. */
jwtTokenCreds(InputStream serviceAccountJson)1918   public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception {
1919     final SimpleRequest request = SimpleRequest.newBuilder()
1920         .setResponseSize(314159)
1921         .setPayload(Payload.newBuilder()
1922             .setBody(ByteString.copyFrom(new byte[271828])))
1923         .setFillUsername(true)
1924         .build();
1925 
1926     ServiceAccountCredentials credentials = (ServiceAccountCredentials)
1927         GoogleCredentials.fromStream(serviceAccountJson);
1928     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1929         .withCallCredentials(MoreCallCredentials.from(credentials));
1930     SimpleResponse response = stub.unaryCall(request);
1931     assertEquals(credentials.getClientEmail(), response.getUsername());
1932     assertEquals(314159, response.getPayload().getBody().size());
1933   }
1934 
1935   /** Sends a unary rpc with raw oauth2 access token credentials. */
oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)1936   public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)
1937       throws Exception {
1938     GoogleCredentials utilCredentials =
1939         GoogleCredentials.fromStream(credentialsStream);
1940     utilCredentials = utilCredentials.createScoped(Arrays.asList(authScope));
1941     AccessToken accessToken = utilCredentials.refreshAccessToken();
1942 
1943     OAuth2Credentials credentials = OAuth2Credentials.create(accessToken);
1944 
1945     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
1946         .withCallCredentials(MoreCallCredentials.from(credentials));
1947     final SimpleRequest request = SimpleRequest.newBuilder()
1948         .setFillUsername(true)
1949         .setFillOauthScope(true)
1950         .build();
1951 
1952     final SimpleResponse response = stub.unaryCall(request);
1953     assertFalse(response.getUsername().isEmpty());
1954     assertTrue("Received username: " + response.getUsername(),
1955         jsonKey.contains(response.getUsername()));
1956     assertFalse(response.getOauthScope().isEmpty());
1957     assertTrue("Received oauth scope: " + response.getOauthScope(),
1958         authScope.contains(response.getOauthScope()));
1959   }
1960 
1961   /** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */
perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)1962   public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)
1963       throws Exception {
1964     // In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once
1965     // for that purpose.
1966     // So, this test is identical to oauth2_auth_token test.
1967     oauth2AuthToken(jsonKey, credentialsStream, oauthScope);
1968   }
1969 
1970   /** Sends an unary rpc with "google default credentials". */
googleDefaultCredentials( String defaultServiceAccount, TestServiceGrpc.TestServiceBlockingStub googleDefaultStub)1971   public void googleDefaultCredentials(
1972       String defaultServiceAccount,
1973       TestServiceGrpc.TestServiceBlockingStub googleDefaultStub) throws Exception {
1974     final SimpleRequest request = SimpleRequest.newBuilder()
1975         .setFillUsername(true)
1976         .setResponseSize(314159)
1977         .setPayload(Payload.newBuilder()
1978             .setBody(ByteString.copyFrom(new byte[271828])))
1979         .build();
1980     final SimpleResponse response = googleDefaultStub.unaryCall(request);
1981     assertEquals(defaultServiceAccount, response.getUsername());
1982 
1983     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
1984         .setUsername(defaultServiceAccount)
1985         .setPayload(Payload.newBuilder()
1986             .setBody(ByteString.copyFrom(new byte[314159])))
1987         .build();
1988     assertResponse(goldenResponse, response);
1989   }
1990 
1991   private static class SoakIterationResult {
SoakIterationResult(long latencyMs, Status status)1992     public SoakIterationResult(long latencyMs, Status status) {
1993       this.latencyMs = latencyMs;
1994       this.status = status;
1995     }
1996 
getLatencyMs()1997     public long getLatencyMs() {
1998       return latencyMs;
1999     }
2000 
getStatus()2001     public Status getStatus() {
2002       return status;
2003     }
2004 
2005     private long latencyMs = -1;
2006     private Status status = Status.OK;
2007   }
2008 
performOneSoakIteration( TestServiceGrpc.TestServiceBlockingStub soakStub)2009   private SoakIterationResult performOneSoakIteration(
2010       TestServiceGrpc.TestServiceBlockingStub soakStub) throws Exception {
2011     long startNs = System.nanoTime();
2012     Status status = Status.OK;
2013     try {
2014       final SimpleRequest request =
2015           SimpleRequest.newBuilder()
2016               .setResponseSize(314159)
2017               .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])))
2018               .build();
2019       final SimpleResponse goldenResponse =
2020           SimpleResponse.newBuilder()
2021               .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])))
2022               .build();
2023       assertResponse(goldenResponse, soakStub.unaryCall(request));
2024     } catch (StatusRuntimeException e) {
2025       status = e.getStatus();
2026     }
2027     long elapsedNs = System.nanoTime() - startNs;
2028     return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status);
2029   }
2030 
2031   /**
2032     * Runs large unary RPCs in a loop with configurable failure thresholds
2033     * and channel creation behavior.
2034    */
performSoakTest( String serverUri, boolean resetChannelPerIteration, int soakIterations, int maxFailures, int maxAcceptablePerIterationLatencyMs, int minTimeMsBetweenRpcs, int overallTimeoutSeconds)2035   public void performSoakTest(
2036       String serverUri,
2037       boolean resetChannelPerIteration,
2038       int soakIterations,
2039       int maxFailures,
2040       int maxAcceptablePerIterationLatencyMs,
2041       int minTimeMsBetweenRpcs,
2042       int overallTimeoutSeconds)
2043       throws Exception {
2044     int iterationsDone = 0;
2045     int totalFailures = 0;
2046     Histogram latencies = new Histogram(4 /* number of significant value digits */);
2047     long startNs = System.nanoTime();
2048     ManagedChannel soakChannel = createChannel();
2049     TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc
2050         .newBlockingStub(soakChannel)
2051         .withInterceptors(recordClientCallInterceptor(clientCallCapture));
2052     for (int i = 0; i < soakIterations; i++) {
2053       if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
2054         break;
2055       }
2056       long earliestNextStartNs = System.nanoTime()
2057           + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
2058       if (resetChannelPerIteration) {
2059         soakChannel.shutdownNow();
2060         soakChannel.awaitTermination(10, TimeUnit.SECONDS);
2061         soakChannel = createChannel();
2062         soakStub = TestServiceGrpc
2063             .newBlockingStub(soakChannel)
2064             .withInterceptors(recordClientCallInterceptor(clientCallCapture));
2065       }
2066       SoakIterationResult result = performOneSoakIteration(soakStub);
2067       SocketAddress peer = clientCallCapture
2068           .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
2069       StringBuilder logStr = new StringBuilder(
2070           String.format(
2071               Locale.US,
2072               "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
2073               i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
2074       if (!result.getStatus().equals(Status.OK)) {
2075         totalFailures++;
2076         logStr.append(String.format(" failed: %s", result.getStatus()));
2077       } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
2078         totalFailures++;
2079         logStr.append(
2080             " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
2081       } else {
2082         logStr.append(" succeeded");
2083       }
2084       System.err.println(logStr.toString());
2085       iterationsDone++;
2086       latencies.recordValue(result.getLatencyMs());
2087       long remainingNs = earliestNextStartNs - System.nanoTime();
2088       if (remainingNs > 0) {
2089         TimeUnit.NANOSECONDS.sleep(remainingNs);
2090       }
2091     }
2092     soakChannel.shutdownNow();
2093     soakChannel.awaitTermination(10, TimeUnit.SECONDS);
2094     System.err.println(
2095         String.format(
2096             Locale.US,
2097             "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. "
2098                 + "p50: %d ms, p90: %d ms, p100: %d ms",
2099             serverUri,
2100             iterationsDone,
2101             soakIterations,
2102             totalFailures,
2103             latencies.getValueAtPercentile(50),
2104             latencies.getValueAtPercentile(90),
2105             latencies.getValueAtPercentile(100)));
2106     // check if we timed out
2107     String timeoutErrorMessage =
2108         String.format(
2109             Locale.US,
2110             "(server_uri: %s) soak test consumed all %d seconds of time and quit early, "
2111                 + "only having ran %d out of desired %d iterations.",
2112             serverUri,
2113             overallTimeoutSeconds,
2114             iterationsDone,
2115             soakIterations);
2116     assertEquals(timeoutErrorMessage, iterationsDone, soakIterations);
2117     // check if we had too many failures
2118     String tooManyFailuresErrorMessage =
2119         String.format(
2120             Locale.US,
2121             "(server_uri: %s) soak test total failures: %d exceeds max failures "
2122                 + "threshold: %d.",
2123             serverUri, totalFailures, maxFailures);
2124     assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
2125   }
2126 
assertSuccess(StreamRecorder<?> recorder)2127   protected static void assertSuccess(StreamRecorder<?> recorder) {
2128     if (recorder.getError() != null) {
2129       throw new AssertionError(recorder.getError());
2130     }
2131   }
2132 
2133   /** Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()}. */
obtainRemoteClientAddr()2134   protected SocketAddress obtainRemoteClientAddr() {
2135     TestServiceGrpc.TestServiceBlockingStub stub =
2136         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
2137 
2138     stub.unaryCall(SimpleRequest.getDefaultInstance());
2139 
2140     return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
2141   }
2142 
2143   /** Helper for getting remote address from {@link io.grpc.ClientCall#getAttributes()}. */
obtainRemoteServerAddr()2144   protected SocketAddress obtainRemoteServerAddr() {
2145     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
2146         .withInterceptors(recordClientCallInterceptor(clientCallCapture))
2147         .withDeadlineAfter(5, TimeUnit.SECONDS);
2148 
2149     stub.unaryCall(SimpleRequest.getDefaultInstance());
2150 
2151     return clientCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
2152   }
2153 
2154   /** Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()}. */
obtainLocalServerAddr()2155   protected SocketAddress obtainLocalServerAddr() {
2156     TestServiceGrpc.TestServiceBlockingStub stub =
2157         blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS);
2158 
2159     stub.unaryCall(SimpleRequest.getDefaultInstance());
2160 
2161     return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
2162   }
2163 
2164   /** Helper for getting local address from {@link io.grpc.ClientCall#getAttributes()}. */
obtainLocalClientAddr()2165   protected SocketAddress obtainLocalClientAddr() {
2166     TestServiceGrpc.TestServiceBlockingStub stub = blockingStub
2167         .withInterceptors(recordClientCallInterceptor(clientCallCapture))
2168         .withDeadlineAfter(5, TimeUnit.SECONDS);
2169 
2170     stub.unaryCall(SimpleRequest.getDefaultInstance());
2171 
2172     return clientCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
2173   }
2174 
2175   /** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()}. */
assertX500SubjectDn(String tlsInfo)2176   protected void assertX500SubjectDn(String tlsInfo) {
2177     TestServiceGrpc.TestServiceBlockingStub stub =
2178         blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS);
2179 
2180     stub.unaryCall(SimpleRequest.getDefaultInstance());
2181 
2182     List<Certificate> certificates;
2183     SSLSession sslSession =
2184         serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION);
2185     try {
2186       certificates = Arrays.asList(sslSession.getPeerCertificates());
2187     } catch (SSLPeerUnverifiedException e) {
2188       // Should never happen
2189       throw new AssertionError(e);
2190     }
2191 
2192     X509Certificate x509cert = (X509Certificate) certificates.get(0);
2193 
2194     assertEquals(1, certificates.size());
2195     assertEquals(tlsInfo, x509cert.getSubjectDN().toString());
2196   }
2197 
operationTimeoutMillis()2198   protected int operationTimeoutMillis() {
2199     return 5000;
2200   }
2201 
2202   /**
2203    * Some tests run on memory constrained environments.  Rather than OOM, just give up.  64 is
2204    * chosen as a maximum amount of memory a large test would need.
2205    */
assumeEnoughMemory()2206   protected static void assumeEnoughMemory() {
2207     Runtime r = Runtime.getRuntime();
2208     long usedMem = r.totalMemory() - r.freeMemory();
2209     long actuallyFreeMemory = r.maxMemory() - usedMem;
2210     Assume.assumeTrue(
2211         actuallyFreeMemory + " is not sufficient to run this test",
2212         actuallyFreeMemory >= 64 * 1024 * 1024);
2213   }
2214 
2215   /**
2216    * Poll the next metrics record and check it against the provided information, including the
2217    * message sizes.
2218    */
assertStatsTrace(String method, Status.Code status, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2219   private void assertStatsTrace(String method, Status.Code status,
2220       Collection<? extends MessageLite> requests,
2221       Collection<? extends MessageLite> responses) {
2222     assertClientStatsTrace(method, status, requests, responses);
2223     assertServerStatsTrace(method, status, requests, responses);
2224   }
2225 
2226   /**
2227    * Poll the next metrics record and check it against the provided information, without checking
2228    * the message sizes.
2229    */
assertStatsTrace(String method, Status.Code status)2230   private void assertStatsTrace(String method, Status.Code status) {
2231     assertStatsTrace(method, status, null, null);
2232   }
2233 
assertZeroRetryRecorded()2234   private void assertZeroRetryRecorded() {
2235     MetricsRecord retryRecord = clientStatsRecorder.pollRecord();
2236     assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0);
2237     assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0);
2238     assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D);
2239   }
2240 
assertClientStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2241   private void assertClientStatsTrace(String method, Status.Code code,
2242       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
2243     // Tracer-based stats
2244     TestClientStreamTracer tracer = clientStreamTracers.poll();
2245     assertNotNull(tracer);
2246     assertTrue(tracer.getOutboundHeaders());
2247     // assertClientStatsTrace() is called right after application receives status,
2248     // but streamClosed() may be called slightly later than that.  So we need a timeout.
2249     try {
2250       assertTrue(tracer.await(5, TimeUnit.SECONDS));
2251     } catch (InterruptedException e) {
2252       throw new AssertionError(e);
2253     }
2254     assertEquals(code, tracer.getStatus().getCode());
2255 
2256     if (requests != null && responses != null) {
2257       checkTracers(tracer, requests, responses);
2258     }
2259     if (metricsExpected()) {
2260       // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be
2261       // done before application receives status.
2262       MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord();
2263       checkStartTags(clientStartRecord, method, true);
2264       MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord();
2265       checkEndTags(clientEndRecord, method, code, true);
2266 
2267       if (requests != null && responses != null) {
2268         checkCensus(clientEndRecord, false, requests, responses);
2269       }
2270       assertZeroRetryRecorded();
2271     }
2272   }
2273 
assertClientStatsTrace(String method, Status.Code status)2274   private void assertClientStatsTrace(String method, Status.Code status) {
2275     assertClientStatsTrace(method, status, null, null);
2276   }
2277 
2278   @SuppressWarnings("AssertionFailureIgnored") // Failure is checked in the end by the passed flag.
assertServerStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2279   private void assertServerStatsTrace(String method, Status.Code code,
2280       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
2281     if (server == null) {
2282       // Server is not in the same process.  We can't check server-side stats.
2283       return;
2284     }
2285 
2286     if (metricsExpected()) {
2287       MetricsRecord serverStartRecord;
2288       MetricsRecord serverEndRecord;
2289       try {
2290         // On the server, the stats is finalized in ServerStreamListener.closed(), which can be
2291         // run after the client receives the final status.  So we use a timeout.
2292         serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
2293         serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS);
2294       } catch (InterruptedException e) {
2295         throw new RuntimeException(e);
2296       }
2297       assertNotNull(serverStartRecord);
2298       assertNotNull(serverEndRecord);
2299       checkStartTags(serverStartRecord, method, false);
2300       checkEndTags(serverEndRecord, method, code, false);
2301       if (requests != null && responses != null) {
2302         checkCensus(serverEndRecord, true, requests, responses);
2303       }
2304     }
2305 
2306     ServerStreamTracerInfo tracerInfo;
2307     tracerInfo = serverStreamTracers.poll();
2308     assertNotNull(tracerInfo);
2309     assertEquals(method, tracerInfo.fullMethodName);
2310     assertNotNull(tracerInfo.tracer.contextCapture);
2311     // On the server, streamClosed() may be called after the client receives the final status.
2312     // So we use a timeout.
2313     try {
2314       assertTrue(tracerInfo.tracer.await(1, TimeUnit.SECONDS));
2315     } catch (InterruptedException e) {
2316       throw new AssertionError(e);
2317     }
2318     assertEquals(code, tracerInfo.tracer.getStatus().getCode());
2319     if (requests != null && responses != null) {
2320       checkTracers(tracerInfo.tracer, responses, requests);
2321     }
2322   }
2323 
checkStartTags(MetricsRecord record, String methodName, boolean clientSide)2324   private static void checkStartTags(MetricsRecord record, String methodName, boolean clientSide) {
2325     assertNotNull("record is not null", record);
2326 
2327     TagKey methodNameTagKey = clientSide
2328         ? RpcMeasureConstants.GRPC_CLIENT_METHOD
2329         : RpcMeasureConstants.GRPC_SERVER_METHOD;
2330     TagValue methodNameTag = record.tags.get(methodNameTagKey);
2331     assertNotNull("method name tagged", methodNameTag);
2332     assertEquals("method names match", methodName, methodNameTag.asString());
2333   }
2334 
checkEndTags( MetricsRecord record, String methodName, Status.Code status, boolean clientSide)2335   private static void checkEndTags(
2336       MetricsRecord record, String methodName, Status.Code status, boolean clientSide) {
2337     assertNotNull("record is not null", record);
2338 
2339     TagKey methodNameTagKey = clientSide
2340         ? RpcMeasureConstants.GRPC_CLIENT_METHOD
2341         : RpcMeasureConstants.GRPC_SERVER_METHOD;
2342     TagValue methodNameTag = record.tags.get(methodNameTagKey);
2343     assertNotNull("method name tagged", methodNameTag);
2344     assertEquals("method names match", methodName, methodNameTag.asString());
2345 
2346     TagKey statusTagKey = clientSide
2347         ? RpcMeasureConstants.GRPC_CLIENT_STATUS
2348         : RpcMeasureConstants.GRPC_SERVER_STATUS;
2349     TagValue statusTag = record.tags.get(statusTagKey);
2350     assertNotNull("status tagged", statusTag);
2351     assertEquals(status.toString(), statusTag.asString());
2352   }
2353 
2354   /**
2355    * Check information recorded by tracers.
2356    */
checkTracers( TestStreamTracer tracer, Collection<? extends MessageLite> sentMessages, Collection<? extends MessageLite> receivedMessages)2357   private void checkTracers(
2358       TestStreamTracer tracer,
2359       Collection<? extends MessageLite> sentMessages,
2360       Collection<? extends MessageLite> receivedMessages) {
2361     long uncompressedSentSize = 0;
2362     int seqNo = 0;
2363     for (MessageLite msg : sentMessages) {
2364       assertThat(tracer.nextOutboundEvent())
2365           .isEqualTo(String.format(Locale.US, "outboundMessage(%d)", seqNo));
2366       assertThat(tracer.nextOutboundEvent()).matches(
2367           String.format(Locale.US, "outboundMessageSent\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
2368       seqNo++;
2369       uncompressedSentSize += msg.getSerializedSize();
2370     }
2371     assertNull(tracer.nextOutboundEvent());
2372     long uncompressedReceivedSize = 0;
2373     seqNo = 0;
2374     for (MessageLite msg : receivedMessages) {
2375       assertThat(tracer.nextInboundEvent())
2376           .isEqualTo(String.format(Locale.US, "inboundMessage(%d)", seqNo));
2377       assertThat(tracer.nextInboundEvent()).matches(
2378           String.format(Locale.US, "inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo));
2379       uncompressedReceivedSize += msg.getSerializedSize();
2380       seqNo++;
2381     }
2382     assertNull(tracer.nextInboundEvent());
2383     if (metricsExpected()) {
2384       assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize());
2385       assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize());
2386     }
2387   }
2388 
2389   /**
2390    * Check information recorded by Census.
2391    */
checkCensus(MetricsRecord record, boolean isServer, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2392   private void checkCensus(MetricsRecord record, boolean isServer,
2393       Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) {
2394     int uncompressedRequestsSize = 0;
2395     for (MessageLite request : requests) {
2396       uncompressedRequestsSize += request.getSerializedSize();
2397     }
2398     int uncompressedResponsesSize = 0;
2399     for (MessageLite response : responses) {
2400       uncompressedResponsesSize += response.getSerializedSize();
2401     }
2402     if (isServer) {
2403       assertEquals(
2404           requests.size(),
2405           record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_SERVER_RECEIVED_MESSAGES_PER_RPC));
2406       assertEquals(
2407           responses.size(),
2408           record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_SERVER_SENT_MESSAGES_PER_RPC));
2409       assertEquals(
2410           uncompressedRequestsSize,
2411           record.getMetricAsLongOrFail(
2412               DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES));
2413       assertEquals(
2414           uncompressedResponsesSize,
2415           record.getMetricAsLongOrFail(
2416               DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES));
2417       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_SERVER_LATENCY));
2418       // It's impossible to get the expected wire sizes because it may be compressed, so we just
2419       // check if they are recorded.
2420       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_RECEIVED_BYTES_PER_RPC));
2421       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_RPC));
2422     } else {
2423       assertEquals(
2424           requests.size(),
2425           record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_RPC));
2426       assertEquals(
2427           responses.size(),
2428           record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_RPC));
2429       assertEquals(
2430           uncompressedRequestsSize,
2431           record.getMetricAsLongOrFail(
2432               DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES));
2433       assertEquals(
2434           uncompressedResponsesSize,
2435           record.getMetricAsLongOrFail(
2436               DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES));
2437       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY));
2438       // It's impossible to get the expected wire sizes because it may be compressed, so we just
2439       // check if they are recorded.
2440       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC));
2441       assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_RPC));
2442     }
2443   }
2444 
2445   // Helper methods for responses containing Payload since proto equals does not ignore deprecated
2446   // fields (PayloadType).
assertResponses( Collection<StreamingOutputCallResponse> expected, Collection<StreamingOutputCallResponse> actual)2447   private void assertResponses(
2448       Collection<StreamingOutputCallResponse> expected,
2449       Collection<StreamingOutputCallResponse> actual) {
2450     assertSame(expected.size(), actual.size());
2451     Iterator<StreamingOutputCallResponse> expectedIter = expected.iterator();
2452     Iterator<StreamingOutputCallResponse> actualIter = actual.iterator();
2453 
2454     while (expectedIter.hasNext()) {
2455       assertResponse(expectedIter.next(), actualIter.next());
2456     }
2457   }
2458 
assertResponse( StreamingOutputCallResponse expected, StreamingOutputCallResponse actual)2459   private void assertResponse(
2460       StreamingOutputCallResponse expected, StreamingOutputCallResponse actual) {
2461     if (expected == null || actual == null) {
2462       assertEquals(expected, actual);
2463     } else {
2464       assertPayload(expected.getPayload(), actual.getPayload());
2465     }
2466   }
2467 
assertResponse(SimpleResponse expected, SimpleResponse actual)2468   private void assertResponse(SimpleResponse expected, SimpleResponse actual) {
2469     assertPayload(expected.getPayload(), actual.getPayload());
2470     assertEquals(expected.getUsername(), actual.getUsername());
2471     assertEquals(expected.getOauthScope(), actual.getOauthScope());
2472   }
2473 
assertPayload(Payload expected, Payload actual)2474   private void assertPayload(Payload expected, Payload actual) {
2475     // Compare non deprecated fields in Payload, to make this test forward compatible.
2476     if (expected == null || actual == null) {
2477       assertEquals(expected, actual);
2478     } else {
2479       assertEquals(expected.getBody(), actual.getBody());
2480     }
2481   }
2482 
2483   /**
2484    * Captures the request attributes. Useful for testing ServerCalls.
2485    * {@link ServerCall#getAttributes()}
2486    */
recordServerCallInterceptor( final AtomicReference<ServerCall<?, ?>> serverCallCapture)2487   private static ServerInterceptor recordServerCallInterceptor(
2488       final AtomicReference<ServerCall<?, ?>> serverCallCapture) {
2489     return new ServerInterceptor() {
2490       @Override
2491       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
2492           ServerCall<ReqT, RespT> call,
2493           Metadata requestHeaders,
2494           ServerCallHandler<ReqT, RespT> next) {
2495         serverCallCapture.set(call);
2496         return next.startCall(call, requestHeaders);
2497       }
2498     };
2499   }
2500 
2501   /**
2502    * Captures the request attributes. Useful for testing ClientCalls.
2503    * {@link ClientCall#getAttributes()}
2504    */
2505   private static ClientInterceptor recordClientCallInterceptor(
2506       final AtomicReference<ClientCall<?, ?>> clientCallCapture) {
2507     return new ClientInterceptor() {
2508       @Override
2509       public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
2510           MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
2511         ClientCall<ReqT, RespT> clientCall = next.newCall(method,callOptions);
2512         clientCallCapture.set(clientCall);
2513         return clientCall;
2514       }
2515     };
2516   }
2517 
2518   private static ServerInterceptor recordContextInterceptor(
2519       final AtomicReference<Context> contextCapture) {
2520     return new ServerInterceptor() {
2521       @Override
2522       public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
2523           ServerCall<ReqT, RespT> call,
2524           Metadata requestHeaders,
2525           ServerCallHandler<ReqT, RespT> next) {
2526         contextCapture.set(Context.current());
2527         return next.startCall(call, requestHeaders);
2528       }
2529     };
2530   }
2531 
2532   /**
2533    * A marshaller that record input and output sizes.
2534    */
2535   private static final class ByteSizeMarshaller<T> implements MethodDescriptor.Marshaller<T> {
2536 
2537     private final MethodDescriptor.Marshaller<T> delegate;
2538     volatile int lastOutSize;
2539     volatile int lastInSize;
2540 
2541     ByteSizeMarshaller(MethodDescriptor.Marshaller<T> delegate) {
2542       this.delegate = delegate;
2543     }
2544 
2545     @Override
2546     public InputStream stream(T value) {
2547       InputStream is = delegate.stream(value);
2548       ByteArrayOutputStream baos = new ByteArrayOutputStream();
2549       try {
2550         lastOutSize = (int) ByteStreams.copy(is, baos);
2551       } catch (IOException e) {
2552         throw new RuntimeException(e);
2553       }
2554       return new ByteArrayInputStream(baos.toByteArray());
2555     }
2556 
2557     @Override
2558     public T parse(InputStream stream) {
2559       ByteArrayOutputStream baos = new ByteArrayOutputStream();
2560       try {
2561         lastInSize = (int) ByteStreams.copy(stream, baos);
2562       } catch (IOException e) {
2563         throw new RuntimeException(e);
2564       }
2565       return delegate.parse(new ByteArrayInputStream(baos.toByteArray()));
2566     }
2567   }
2568 }
2569