1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static com.google.common.truth.Truth.assertWithMessage; 21 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; 22 import static org.junit.Assert.assertEquals; 23 import static org.junit.Assert.assertFalse; 24 import static org.junit.Assert.assertNotEquals; 25 import static org.junit.Assert.assertNotNull; 26 import static org.junit.Assert.assertNull; 27 import static org.junit.Assert.assertSame; 28 import static org.junit.Assert.assertTrue; 29 import static org.junit.Assert.fail; 30 31 import com.google.auth.oauth2.AccessToken; 32 import com.google.auth.oauth2.ComputeEngineCredentials; 33 import com.google.auth.oauth2.GoogleCredentials; 34 import com.google.auth.oauth2.OAuth2Credentials; 35 import com.google.auth.oauth2.ServiceAccountCredentials; 36 import com.google.common.annotations.VisibleForTesting; 37 import com.google.common.base.Throwables; 38 import com.google.common.collect.ImmutableList; 39 import com.google.common.io.ByteStreams; 40 import com.google.common.util.concurrent.SettableFuture; 41 import com.google.protobuf.ByteString; 42 import com.google.protobuf.MessageLite; 43 import com.google.protobuf.StringValue; 44 import io.grpc.CallOptions; 45 import io.grpc.Channel; 46 import io.grpc.ClientCall; 47 import io.grpc.ClientInterceptor; 48 import io.grpc.ClientInterceptors; 49 import io.grpc.ClientStreamTracer; 50 import io.grpc.Context; 51 import io.grpc.Grpc; 52 import io.grpc.ManagedChannel; 53 import io.grpc.ManagedChannelBuilder; 54 import io.grpc.Metadata; 55 import io.grpc.MethodDescriptor; 56 import io.grpc.Server; 57 import io.grpc.ServerBuilder; 58 import io.grpc.ServerCall; 59 import io.grpc.ServerCallHandler; 60 import io.grpc.ServerInterceptor; 61 import io.grpc.ServerInterceptors; 62 import io.grpc.ServerStreamTracer; 63 import io.grpc.Status; 64 import io.grpc.StatusRuntimeException; 65 import io.grpc.auth.MoreCallCredentials; 66 import io.grpc.census.InternalCensusStatsAccessor; 67 import io.grpc.census.internal.DeprecatedCensusConstants; 68 import io.grpc.internal.GrpcUtil; 69 import io.grpc.internal.testing.StatsTestUtils; 70 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; 71 import io.grpc.internal.testing.StatsTestUtils.FakeTagContext; 72 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; 73 import io.grpc.internal.testing.StatsTestUtils.FakeTagger; 74 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; 75 import io.grpc.internal.testing.StreamRecorder; 76 import io.grpc.internal.testing.TestClientStreamTracer; 77 import io.grpc.internal.testing.TestServerStreamTracer; 78 import io.grpc.internal.testing.TestStreamTracer; 79 import io.grpc.stub.ClientCallStreamObserver; 80 import io.grpc.stub.ClientCalls; 81 import io.grpc.stub.MetadataUtils; 82 import io.grpc.stub.StreamObserver; 83 import io.grpc.testing.TestUtils; 84 import io.grpc.testing.integration.EmptyProtos.Empty; 85 import io.grpc.testing.integration.Messages.BoolValue; 86 import io.grpc.testing.integration.Messages.EchoStatus; 87 import io.grpc.testing.integration.Messages.Payload; 88 import io.grpc.testing.integration.Messages.ResponseParameters; 89 import io.grpc.testing.integration.Messages.SimpleRequest; 90 import io.grpc.testing.integration.Messages.SimpleResponse; 91 import io.grpc.testing.integration.Messages.StreamingInputCallRequest; 92 import io.grpc.testing.integration.Messages.StreamingInputCallResponse; 93 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; 94 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; 95 import io.grpc.testing.integration.Messages.TestOrcaReport; 96 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; 97 import io.opencensus.stats.Measure; 98 import io.opencensus.stats.Measure.MeasureDouble; 99 import io.opencensus.stats.Measure.MeasureLong; 100 import io.opencensus.tags.TagKey; 101 import io.opencensus.tags.TagValue; 102 import io.opencensus.trace.Span; 103 import io.opencensus.trace.SpanContext; 104 import io.opencensus.trace.Tracing; 105 import java.io.ByteArrayInputStream; 106 import java.io.ByteArrayOutputStream; 107 import java.io.IOException; 108 import java.io.InputStream; 109 import java.net.SocketAddress; 110 import java.security.cert.Certificate; 111 import java.security.cert.X509Certificate; 112 import java.util.ArrayList; 113 import java.util.Arrays; 114 import java.util.Collection; 115 import java.util.Collections; 116 import java.util.Iterator; 117 import java.util.List; 118 import java.util.Locale; 119 import java.util.Map; 120 import java.util.concurrent.ArrayBlockingQueue; 121 import java.util.concurrent.BlockingQueue; 122 import java.util.concurrent.Executors; 123 import java.util.concurrent.LinkedBlockingQueue; 124 import java.util.concurrent.ScheduledExecutorService; 125 import java.util.concurrent.TimeUnit; 126 import java.util.concurrent.atomic.AtomicReference; 127 import java.util.logging.Level; 128 import java.util.logging.Logger; 129 import java.util.regex.Pattern; 130 import javax.annotation.Nullable; 131 import javax.net.ssl.SSLPeerUnverifiedException; 132 import javax.net.ssl.SSLSession; 133 import org.HdrHistogram.Histogram; 134 import org.junit.After; 135 import org.junit.Assert; 136 import org.junit.Assume; 137 import org.junit.Before; 138 import org.junit.Rule; 139 import org.junit.Test; 140 import org.junit.rules.DisableOnDebug; 141 import org.junit.rules.TestRule; 142 import org.junit.rules.Timeout; 143 144 /** 145 * Abstract base class for all GRPC transport tests. 146 * 147 * <p> New tests should avoid using Mockito to support running on AppEngine.</p> 148 */ 149 public abstract class AbstractInteropTest { 150 private static Logger logger = Logger.getLogger(AbstractInteropTest.class.getName()); 151 152 @Rule public final TestRule globalTimeout; 153 154 /** Must be at least {@link #unaryPayloadLength()}, plus some to account for encoding overhead. */ 155 public static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024; 156 157 /** 158 * Use a small flow control to help detect flow control bugs. Don't use 64KiB to test 159 * SETTINGS/WINDOW_UPDATE exchange. 160 */ 161 public static final int TEST_FLOW_CONTROL_WINDOW = 65 * 1024; 162 private static final MeasureLong RETRIES_PER_CALL = 163 Measure.MeasureLong.create( 164 "grpc.io/client/retries_per_call", "Number of retries per call", "1"); 165 private static final MeasureLong TRANSPARENT_RETRIES_PER_CALL = 166 Measure.MeasureLong.create( 167 "grpc.io/client/transparent_retries_per_call", "Transparent retries per call", "1"); 168 private static final MeasureDouble RETRY_DELAY_PER_CALL = 169 Measure.MeasureDouble.create( 170 "grpc.io/client/retry_delay_per_call", "Retry delay per call", "ms"); 171 172 private static final FakeTagger tagger = new FakeTagger(); 173 private static final FakeTagContextBinarySerializer tagContextBinarySerializer = 174 new FakeTagContextBinarySerializer(); 175 176 private final AtomicReference<ServerCall<?, ?>> serverCallCapture = 177 new AtomicReference<>(); 178 private final AtomicReference<ClientCall<?, ?>> clientCallCapture = 179 new AtomicReference<>(); 180 private final AtomicReference<Metadata> requestHeadersCapture = 181 new AtomicReference<>(); 182 private final AtomicReference<Context> contextCapture = 183 new AtomicReference<>(); 184 private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); 185 private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder(); 186 187 private ScheduledExecutorService testServiceExecutor; 188 private Server server; 189 private Server handshakerServer; 190 191 private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers = 192 new LinkedBlockingQueue<>(); 193 194 static final CallOptions.Key<AtomicReference<TestOrcaReport>> 195 ORCA_RPC_REPORT_KEY = CallOptions.Key.create("orca-rpc-report"); 196 static final CallOptions.Key<AtomicReference<TestOrcaReport>> 197 ORCA_OOB_REPORT_KEY = CallOptions.Key.create("orca-oob-report"); 198 199 private static final class ServerStreamTracerInfo { 200 final String fullMethodName; 201 final InteropServerStreamTracer tracer; 202 ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer)203 ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer) { 204 this.fullMethodName = fullMethodName; 205 this.tracer = tracer; 206 } 207 208 private static final class InteropServerStreamTracer extends TestServerStreamTracer { 209 private volatile Context contextCapture; 210 211 @Override filterContext(Context context)212 public Context filterContext(Context context) { 213 contextCapture = context; 214 return super.filterContext(context); 215 } 216 } 217 } 218 219 /** 220 * Constructor for tests. 221 */ AbstractInteropTest()222 protected AbstractInteropTest() { 223 TestRule timeout = Timeout.seconds(60); 224 try { 225 timeout = new DisableOnDebug(timeout); 226 } catch (Throwable t) { 227 // This can happen on Android, which lacks some standard Java class. 228 // Seen at https://github.com/grpc/grpc-java/pull/5832#issuecomment-499698086 229 logger.log(Level.FINE, "Debugging not disabled.", t); 230 } 231 globalTimeout = timeout; 232 } 233 234 private final ServerStreamTracer.Factory serverStreamTracerFactory = 235 new ServerStreamTracer.Factory() { 236 @Override 237 public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { 238 ServerStreamTracerInfo.InteropServerStreamTracer tracer 239 = new ServerStreamTracerInfo.InteropServerStreamTracer(); 240 serverStreamTracers.add(new ServerStreamTracerInfo(fullMethodName, tracer)); 241 return tracer; 242 } 243 }; 244 245 protected static final Empty EMPTY = Empty.getDefaultInstance(); 246 configBuilder(@ullable ServerBuilder<?> builder)247 private void configBuilder(@Nullable ServerBuilder<?> builder) { 248 if (builder == null) { 249 return; 250 } 251 testServiceExecutor = Executors.newScheduledThreadPool(2); 252 253 List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder() 254 .add(recordServerCallInterceptor(serverCallCapture)) 255 .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture)) 256 .add(recordContextInterceptor(contextCapture)) 257 .addAll(TestServiceImpl.interceptors()) 258 .build(); 259 260 builder 261 .addService( 262 ServerInterceptors.intercept( 263 new TestServiceImpl(testServiceExecutor), 264 allInterceptors)) 265 .addStreamTracerFactory(serverStreamTracerFactory); 266 } 267 startServer(@ullable ServerBuilder<?> builder)268 protected void startServer(@Nullable ServerBuilder<?> builder) { 269 maybeStartHandshakerServer(); 270 if (builder == null) { 271 server = null; 272 return; 273 } 274 275 try { 276 server = builder.build().start(); 277 } catch (IOException ex) { 278 throw new RuntimeException(ex); 279 } 280 } 281 maybeStartHandshakerServer()282 private void maybeStartHandshakerServer() { 283 ServerBuilder<?> handshakerServerBuilder = getHandshakerServerBuilder(); 284 if (handshakerServerBuilder != null) { 285 try { 286 handshakerServer = handshakerServerBuilder.build().start(); 287 } catch (IOException ex) { 288 throw new RuntimeException(ex); 289 } 290 } 291 } 292 stopServer()293 private void stopServer() { 294 if (server != null) { 295 server.shutdownNow(); 296 } 297 if (testServiceExecutor != null) { 298 testServiceExecutor.shutdown(); 299 } 300 if (handshakerServer != null) { 301 handshakerServer.shutdownNow(); 302 } 303 } 304 305 @VisibleForTesting getListenAddress()306 final SocketAddress getListenAddress() { 307 return server.getListenSockets().iterator().next(); 308 } 309 310 protected ManagedChannel channel; 311 protected TestServiceGrpc.TestServiceBlockingStub blockingStub; 312 protected TestServiceGrpc.TestServiceStub asyncStub; 313 314 private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers = 315 new LinkedBlockingQueue<>(); 316 317 private final ClientStreamTracer.Factory clientStreamTracerFactory = 318 new ClientStreamTracer.Factory() { 319 @Override 320 public ClientStreamTracer newClientStreamTracer( 321 ClientStreamTracer.StreamInfo info, Metadata headers) { 322 TestClientStreamTracer tracer = new TestClientStreamTracer(); 323 clientStreamTracers.add(tracer); 324 return tracer; 325 } 326 }; 327 private final ClientInterceptor tracerSetupInterceptor = new ClientInterceptor() { 328 @Override 329 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 330 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 331 return next.newCall( 332 method, callOptions.withStreamTracerFactory(clientStreamTracerFactory)); 333 } 334 }; 335 336 /** 337 * Must be called by the subclass setup method if overridden. 338 */ 339 @Before setUp()340 public void setUp() { 341 ServerBuilder<?> serverBuilder = getServerBuilder(); 342 configBuilder(serverBuilder); 343 startServer(serverBuilder); 344 channel = createChannel(); 345 346 blockingStub = 347 TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); 348 asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor); 349 350 ClientInterceptor[] additionalInterceptors = getAdditionalInterceptors(); 351 if (additionalInterceptors != null) { 352 blockingStub = blockingStub.withInterceptors(additionalInterceptors); 353 asyncStub = asyncStub.withInterceptors(additionalInterceptors); 354 } 355 356 requestHeadersCapture.set(null); 357 } 358 359 /** Clean up. */ 360 @After tearDown()361 public void tearDown() { 362 if (channel != null) { 363 channel.shutdownNow(); 364 try { 365 channel.awaitTermination(1, TimeUnit.SECONDS); 366 } catch (InterruptedException ie) { 367 logger.log(Level.FINE, "Interrupted while waiting for channel termination", ie); 368 // Best effort. If there is an interruption, we want to continue cleaning up, but quickly 369 Thread.currentThread().interrupt(); 370 } 371 } 372 stopServer(); 373 } 374 createChannel()375 protected ManagedChannel createChannel() { 376 return createChannelBuilder().build(); 377 } 378 createChannelBuilder()379 protected abstract ManagedChannelBuilder<?> createChannelBuilder(); 380 381 @Nullable getAdditionalInterceptors()382 protected ClientInterceptor[] getAdditionalInterceptors() { 383 return null; 384 } 385 386 /** 387 * Returns the server builder used to create server for each test run. Return {@code null} if 388 * it shouldn't start a server in the same process. 389 */ 390 @Nullable getServerBuilder()391 protected ServerBuilder<?> getServerBuilder() { 392 return null; 393 } 394 395 @Nullable getHandshakerServerBuilder()396 protected ServerBuilder<?> getHandshakerServerBuilder() { 397 return null; 398 } 399 createCensusStatsClientInterceptor()400 protected final ClientInterceptor createCensusStatsClientInterceptor() { 401 return 402 InternalCensusStatsAccessor 403 .getClientInterceptor( 404 tagger, tagContextBinarySerializer, clientStatsRecorder, 405 GrpcUtil.STOPWATCH_SUPPLIER, 406 true, true, true, 407 /* recordRealTimeMetrics= */ false, 408 /* recordRetryMetrics= */ true); 409 } 410 createCustomCensusTracerFactory()411 protected final ServerStreamTracer.Factory createCustomCensusTracerFactory() { 412 return InternalCensusStatsAccessor.getServerStreamTracerFactory( 413 tagger, tagContextBinarySerializer, serverStatsRecorder, 414 GrpcUtil.STOPWATCH_SUPPLIER, 415 true, true, true, false /* real-time metrics */); 416 } 417 418 /** 419 * Override this when custom census module presence is different from {@link #metricsExpected()}. 420 */ customCensusModulePresent()421 protected boolean customCensusModulePresent() { 422 return metricsExpected(); 423 } 424 425 /** 426 * Return true if exact metric values should be checked. 427 */ metricsExpected()428 protected boolean metricsExpected() { 429 return true; 430 } 431 432 @Test emptyUnary()433 public void emptyUnary() throws Exception { 434 assertEquals(EMPTY, blockingStub.emptyCall(EMPTY)); 435 } 436 437 @Test emptyUnaryWithRetriableStream()438 public void emptyUnaryWithRetriableStream() throws Exception { 439 channel.shutdown(); 440 channel = createChannelBuilder().enableRetry().build(); 441 assertEquals(EMPTY, TestServiceGrpc.newBlockingStub(channel).emptyCall(EMPTY)); 442 } 443 444 /** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */ cacheableUnary()445 public void cacheableUnary() { 446 // THIS TEST IS BROKEN. Enabling safe just on the MethodDescriptor does nothing by itself. This 447 // test would need to enable GET on the channel. 448 // Set safe to true. 449 MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod = 450 TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build(); 451 // Set fake user IP since some proxies (GFE) won't cache requests from localhost. 452 Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER); 453 Metadata metadata = new Metadata(); 454 metadata.put(userIpKey, "1.2.3.4"); 455 Channel channelWithUserIpKey = 456 ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata)); 457 SimpleRequest requests1And2 = 458 SimpleRequest.newBuilder() 459 .setPayload( 460 Payload.newBuilder() 461 .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime())))) 462 .build(); 463 SimpleRequest request3 = 464 SimpleRequest.newBuilder() 465 .setPayload( 466 Payload.newBuilder() 467 .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime())))) 468 .build(); 469 470 SimpleResponse response1 = 471 ClientCalls.blockingUnaryCall( 472 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2); 473 SimpleResponse response2 = 474 ClientCalls.blockingUnaryCall( 475 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2); 476 SimpleResponse response3 = 477 ClientCalls.blockingUnaryCall( 478 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3); 479 480 assertEquals(response1, response2); 481 assertNotEquals(response1, response3); 482 // THIS TEST IS BROKEN. See comment at start of method. 483 } 484 485 @Test largeUnary()486 public void largeUnary() throws Exception { 487 assumeEnoughMemory(); 488 final SimpleRequest request = SimpleRequest.newBuilder() 489 .setResponseSize(314159) 490 .setPayload(Payload.newBuilder() 491 .setBody(ByteString.copyFrom(new byte[271828]))) 492 .build(); 493 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 494 .setPayload(Payload.newBuilder() 495 .setBody(ByteString.copyFrom(new byte[314159]))) 496 .build(); 497 498 assertResponse(goldenResponse, blockingStub.unaryCall(request)); 499 500 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK, 501 Collections.singleton(request), Collections.singleton(goldenResponse)); 502 } 503 504 /** 505 * Tests client per-message compression for unary calls. The Java API does not support inspecting 506 * a message's compression level, so this is primarily intended to run against a gRPC C++ server. 507 */ clientCompressedUnary(boolean probe)508 public void clientCompressedUnary(boolean probe) throws Exception { 509 assumeEnoughMemory(); 510 final SimpleRequest expectCompressedRequest = 511 SimpleRequest.newBuilder() 512 .setExpectCompressed(BoolValue.newBuilder().setValue(true)) 513 .setResponseSize(314159) 514 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 515 .build(); 516 final SimpleRequest expectUncompressedRequest = 517 SimpleRequest.newBuilder() 518 .setExpectCompressed(BoolValue.newBuilder().setValue(false)) 519 .setResponseSize(314159) 520 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 521 .build(); 522 final SimpleResponse goldenResponse = 523 SimpleResponse.newBuilder() 524 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) 525 .build(); 526 527 if (probe) { 528 // Send a non-compressed message with expectCompress=true. Servers supporting this test case 529 // should return INVALID_ARGUMENT. 530 try { 531 blockingStub.unaryCall(expectCompressedRequest); 532 fail("expected INVALID_ARGUMENT"); 533 } catch (StatusRuntimeException e) { 534 assertEquals(Status.INVALID_ARGUMENT.getCode(), e.getStatus().getCode()); 535 } 536 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT); 537 } 538 539 assertResponse( 540 goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest)); 541 assertStatsTrace( 542 "grpc.testing.TestService/UnaryCall", 543 Status.Code.OK, 544 Collections.singleton(expectCompressedRequest), 545 Collections.singleton(goldenResponse)); 546 547 assertResponse(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest)); 548 assertStatsTrace( 549 "grpc.testing.TestService/UnaryCall", 550 Status.Code.OK, 551 Collections.singleton(expectUncompressedRequest), 552 Collections.singleton(goldenResponse)); 553 } 554 555 /** 556 * Tests if the server can send a compressed unary response. Ideally we would assert that the 557 * responses have the requested compression, but this is not supported by the API. Given a 558 * compliant server, this test will exercise the code path for receiving a compressed response but 559 * cannot itself verify that the response was compressed. 560 */ 561 @Test serverCompressedUnary()562 public void serverCompressedUnary() throws Exception { 563 assumeEnoughMemory(); 564 final SimpleRequest responseShouldBeCompressed = 565 SimpleRequest.newBuilder() 566 .setResponseCompressed(BoolValue.newBuilder().setValue(true)) 567 .setResponseSize(314159) 568 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 569 .build(); 570 final SimpleRequest responseShouldBeUncompressed = 571 SimpleRequest.newBuilder() 572 .setResponseCompressed(BoolValue.newBuilder().setValue(false)) 573 .setResponseSize(314159) 574 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 575 .build(); 576 final SimpleResponse goldenResponse = 577 SimpleResponse.newBuilder() 578 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) 579 .build(); 580 581 assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed)); 582 assertStatsTrace( 583 "grpc.testing.TestService/UnaryCall", 584 Status.Code.OK, 585 Collections.singleton(responseShouldBeCompressed), 586 Collections.singleton(goldenResponse)); 587 588 assertResponse(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed)); 589 assertStatsTrace( 590 "grpc.testing.TestService/UnaryCall", 591 Status.Code.OK, 592 Collections.singleton(responseShouldBeUncompressed), 593 Collections.singleton(goldenResponse)); 594 } 595 596 /** 597 * Assuming "pick_first" policy is used, tests that all requests are sent to the same server. 598 */ pickFirstUnary()599 public void pickFirstUnary() throws Exception { 600 SimpleRequest request = SimpleRequest.newBuilder() 601 .setResponseSize(1) 602 .setFillServerId(true) 603 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[1]))) 604 .build(); 605 606 SimpleResponse firstResponse = blockingStub.unaryCall(request); 607 // Increase the chance of all servers are connected, in case the channel should be doing 608 // round_robin instead. 609 Thread.sleep(5000); 610 for (int i = 0; i < 100; i++) { 611 SimpleResponse response = blockingStub.unaryCall(request); 612 assertThat(response.getServerId()).isEqualTo(firstResponse.getServerId()); 613 } 614 } 615 616 @Test serverStreaming()617 public void serverStreaming() throws Exception { 618 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 619 .addResponseParameters(ResponseParameters.newBuilder() 620 .setSize(31415)) 621 .addResponseParameters(ResponseParameters.newBuilder() 622 .setSize(9)) 623 .addResponseParameters(ResponseParameters.newBuilder() 624 .setSize(2653)) 625 .addResponseParameters(ResponseParameters.newBuilder() 626 .setSize(58979)) 627 .build(); 628 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 629 StreamingOutputCallResponse.newBuilder() 630 .setPayload(Payload.newBuilder() 631 .setBody(ByteString.copyFrom(new byte[31415]))) 632 .build(), 633 StreamingOutputCallResponse.newBuilder() 634 .setPayload(Payload.newBuilder() 635 .setBody(ByteString.copyFrom(new byte[9]))) 636 .build(), 637 StreamingOutputCallResponse.newBuilder() 638 .setPayload(Payload.newBuilder() 639 .setBody(ByteString.copyFrom(new byte[2653]))) 640 .build(), 641 StreamingOutputCallResponse.newBuilder() 642 .setPayload(Payload.newBuilder() 643 .setBody(ByteString.copyFrom(new byte[58979]))) 644 .build()); 645 646 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 647 asyncStub.streamingOutputCall(request, recorder); 648 recorder.awaitCompletion(); 649 assertSuccess(recorder); 650 assertResponses(goldenResponses, recorder.getValues()); 651 } 652 653 @Test clientStreaming()654 public void clientStreaming() throws Exception { 655 final List<StreamingInputCallRequest> requests = Arrays.asList( 656 StreamingInputCallRequest.newBuilder() 657 .setPayload(Payload.newBuilder() 658 .setBody(ByteString.copyFrom(new byte[27182]))) 659 .build(), 660 StreamingInputCallRequest.newBuilder() 661 .setPayload(Payload.newBuilder() 662 .setBody(ByteString.copyFrom(new byte[8]))) 663 .build(), 664 StreamingInputCallRequest.newBuilder() 665 .setPayload(Payload.newBuilder() 666 .setBody(ByteString.copyFrom(new byte[1828]))) 667 .build(), 668 StreamingInputCallRequest.newBuilder() 669 .setPayload(Payload.newBuilder() 670 .setBody(ByteString.copyFrom(new byte[45904]))) 671 .build()); 672 final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder() 673 .setAggregatedPayloadSize(74922) 674 .build(); 675 676 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 677 StreamObserver<StreamingInputCallRequest> requestObserver = 678 asyncStub.streamingInputCall(responseObserver); 679 for (StreamingInputCallRequest request : requests) { 680 requestObserver.onNext(request); 681 } 682 requestObserver.onCompleted(); 683 684 assertEquals(goldenResponse, responseObserver.firstValue().get()); 685 responseObserver.awaitCompletion(); 686 assertThat(responseObserver.getValues()).hasSize(1); 687 Throwable t = responseObserver.getError(); 688 if (t != null) { 689 throw new AssertionError(t); 690 } 691 } 692 693 /** 694 * Tests client per-message compression for streaming calls. The Java API does not support 695 * inspecting a message's compression level, so this is primarily intended to run against a gRPC 696 * C++ server. 697 */ clientCompressedStreaming(boolean probe)698 public void clientCompressedStreaming(boolean probe) throws Exception { 699 final StreamingInputCallRequest expectCompressedRequest = 700 StreamingInputCallRequest.newBuilder() 701 .setExpectCompressed(BoolValue.newBuilder().setValue(true)) 702 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) 703 .build(); 704 final StreamingInputCallRequest expectUncompressedRequest = 705 StreamingInputCallRequest.newBuilder() 706 .setExpectCompressed(BoolValue.newBuilder().setValue(false)) 707 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) 708 .build(); 709 final StreamingInputCallResponse goldenResponse = 710 StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build(); 711 712 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 713 StreamObserver<StreamingInputCallRequest> requestObserver = 714 asyncStub.streamingInputCall(responseObserver); 715 716 if (probe) { 717 // Send a non-compressed message with expectCompress=true. Servers supporting this test case 718 // should return INVALID_ARGUMENT. 719 requestObserver.onNext(expectCompressedRequest); 720 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 721 Throwable e = responseObserver.getError(); 722 assertNotNull("expected INVALID_ARGUMENT", e); 723 assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode()); 724 } 725 726 // Start a new stream 727 responseObserver = StreamRecorder.create(); 728 @SuppressWarnings("unchecked") 729 ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver = 730 (ClientCallStreamObserver) 731 asyncStub.withCompression("gzip").streamingInputCall(responseObserver); 732 clientCallStreamObserver.setMessageCompression(true); 733 clientCallStreamObserver.onNext(expectCompressedRequest); 734 clientCallStreamObserver.setMessageCompression(false); 735 clientCallStreamObserver.onNext(expectUncompressedRequest); 736 clientCallStreamObserver.onCompleted(); 737 responseObserver.awaitCompletion(); 738 assertSuccess(responseObserver); 739 assertEquals(goldenResponse, responseObserver.firstValue().get()); 740 } 741 742 /** 743 * Tests server per-message compression in a streaming response. Ideally we would assert that the 744 * responses have the requested compression, but this is not supported by the API. Given a 745 * compliant server, this test will exercise the code path for receiving a compressed response but 746 * cannot itself verify that the response was compressed. 747 */ serverCompressedStreaming()748 public void serverCompressedStreaming() throws Exception { 749 final StreamingOutputCallRequest request = 750 StreamingOutputCallRequest.newBuilder() 751 .addResponseParameters( 752 ResponseParameters.newBuilder() 753 .setCompressed(BoolValue.newBuilder().setValue(true)) 754 .setSize(31415)) 755 .addResponseParameters( 756 ResponseParameters.newBuilder() 757 .setCompressed(BoolValue.newBuilder().setValue(false)) 758 .setSize(92653)) 759 .build(); 760 final List<StreamingOutputCallResponse> goldenResponses = 761 Arrays.asList( 762 StreamingOutputCallResponse.newBuilder() 763 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) 764 .build(), 765 StreamingOutputCallResponse.newBuilder() 766 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653]))) 767 .build()); 768 769 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 770 asyncStub.streamingOutputCall(request, recorder); 771 recorder.awaitCompletion(); 772 assertSuccess(recorder); 773 assertResponses(goldenResponses, recorder.getValues()); 774 } 775 776 @Test pingPong()777 public void pingPong() throws Exception { 778 final List<StreamingOutputCallRequest> requests = Arrays.asList( 779 StreamingOutputCallRequest.newBuilder() 780 .addResponseParameters(ResponseParameters.newBuilder() 781 .setSize(31415)) 782 .setPayload(Payload.newBuilder() 783 .setBody(ByteString.copyFrom(new byte[27182]))) 784 .build(), 785 StreamingOutputCallRequest.newBuilder() 786 .addResponseParameters(ResponseParameters.newBuilder() 787 .setSize(9)) 788 .setPayload(Payload.newBuilder() 789 .setBody(ByteString.copyFrom(new byte[8]))) 790 .build(), 791 StreamingOutputCallRequest.newBuilder() 792 .addResponseParameters(ResponseParameters.newBuilder() 793 .setSize(2653)) 794 .setPayload(Payload.newBuilder() 795 .setBody(ByteString.copyFrom(new byte[1828]))) 796 .build(), 797 StreamingOutputCallRequest.newBuilder() 798 .addResponseParameters(ResponseParameters.newBuilder() 799 .setSize(58979)) 800 .setPayload(Payload.newBuilder() 801 .setBody(ByteString.copyFrom(new byte[45904]))) 802 .build()); 803 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 804 StreamingOutputCallResponse.newBuilder() 805 .setPayload(Payload.newBuilder() 806 .setBody(ByteString.copyFrom(new byte[31415]))) 807 .build(), 808 StreamingOutputCallResponse.newBuilder() 809 .setPayload(Payload.newBuilder() 810 .setBody(ByteString.copyFrom(new byte[9]))) 811 .build(), 812 StreamingOutputCallResponse.newBuilder() 813 .setPayload(Payload.newBuilder() 814 .setBody(ByteString.copyFrom(new byte[2653]))) 815 .build(), 816 StreamingOutputCallResponse.newBuilder() 817 .setPayload(Payload.newBuilder() 818 .setBody(ByteString.copyFrom(new byte[58979]))) 819 .build()); 820 821 final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(5); 822 StreamObserver<StreamingOutputCallRequest> requestObserver 823 = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() { 824 @Override 825 public void onNext(StreamingOutputCallResponse response) { 826 queue.add(response); 827 } 828 829 @Override 830 public void onError(Throwable t) { 831 queue.add(t); 832 } 833 834 @Override 835 public void onCompleted() { 836 queue.add("Completed"); 837 } 838 }); 839 for (int i = 0; i < requests.size(); i++) { 840 assertNull(queue.peek()); 841 requestObserver.onNext(requests.get(i)); 842 Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 843 assertNotNull("Timed out waiting for response", actualResponse); 844 if (actualResponse instanceof Throwable) { 845 throw new AssertionError(actualResponse); 846 } 847 assertResponse(goldenResponses.get(i), (StreamingOutputCallResponse) actualResponse); 848 } 849 requestObserver.onCompleted(); 850 assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 851 } 852 853 @Test emptyStream()854 public void emptyStream() throws Exception { 855 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 856 StreamObserver<StreamingOutputCallRequest> requestObserver 857 = asyncStub.fullDuplexCall(responseObserver); 858 requestObserver.onCompleted(); 859 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 860 assertSuccess(responseObserver); 861 assertTrue("Expected an empty stream", responseObserver.getValues().isEmpty()); 862 } 863 864 @Test cancelAfterBegin()865 public void cancelAfterBegin() throws Exception { 866 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 867 StreamObserver<StreamingInputCallRequest> requestObserver = 868 asyncStub.streamingInputCall(responseObserver); 869 requestObserver.onError(new RuntimeException()); 870 responseObserver.awaitCompletion(); 871 assertEquals(Arrays.<StreamingInputCallResponse>asList(), responseObserver.getValues()); 872 assertEquals(Status.Code.CANCELLED, 873 Status.fromThrowable(responseObserver.getError()).getCode()); 874 875 if (metricsExpected()) { 876 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 877 checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall", true); 878 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 879 // recorded. The tracer stats rely on the stream being created, which is not always the case 880 // in this test. Therefore we don't check the tracer stats. 881 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 882 checkEndTags( 883 clientEndRecord, "grpc.testing.TestService/StreamingInputCall", 884 Status.CANCELLED.getCode(), true); 885 // Do not check server-side metrics, because the status on the server side is undetermined. 886 } 887 } 888 889 @Test cancelAfterFirstResponse()890 public void cancelAfterFirstResponse() throws Exception { 891 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 892 .addResponseParameters(ResponseParameters.newBuilder() 893 .setSize(31415)) 894 .setPayload(Payload.newBuilder() 895 .setBody(ByteString.copyFrom(new byte[27182]))) 896 .build(); 897 final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder() 898 .setPayload(Payload.newBuilder() 899 .setBody(ByteString.copyFrom(new byte[31415]))) 900 .build(); 901 902 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 903 StreamObserver<StreamingOutputCallRequest> requestObserver 904 = asyncStub.fullDuplexCall(responseObserver); 905 requestObserver.onNext(request); 906 assertResponse(goldenResponse, responseObserver.firstValue().get()); 907 requestObserver.onError(new RuntimeException()); 908 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 909 assertEquals(1, responseObserver.getValues().size()); 910 assertEquals(Status.Code.CANCELLED, 911 Status.fromThrowable(responseObserver.getError()).getCode()); 912 913 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED); 914 } 915 916 @Test fullDuplexCallShouldSucceed()917 public void fullDuplexCallShouldSucceed() throws Exception { 918 // Build the request. 919 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 920 StreamingOutputCallRequest.Builder streamingOutputBuilder = 921 StreamingOutputCallRequest.newBuilder(); 922 for (Integer size : responseSizes) { 923 streamingOutputBuilder.addResponseParameters( 924 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 925 } 926 final StreamingOutputCallRequest request = streamingOutputBuilder.build(); 927 928 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 929 StreamObserver<StreamingOutputCallRequest> requestStream = 930 asyncStub.fullDuplexCall(recorder); 931 932 final int numRequests = 10; 933 List<StreamingOutputCallRequest> requests = 934 new ArrayList<>(numRequests); 935 for (int ix = numRequests; ix > 0; --ix) { 936 requests.add(request); 937 requestStream.onNext(request); 938 } 939 requestStream.onCompleted(); 940 recorder.awaitCompletion(); 941 assertSuccess(recorder); 942 assertEquals(responseSizes.size() * (long) numRequests, recorder.getValues().size()); 943 for (int ix = 0; ix < recorder.getValues().size(); ++ix) { 944 StreamingOutputCallResponse response = recorder.getValues().get(ix); 945 int length = response.getPayload().getBody().size(); 946 int expectedSize = responseSizes.get(ix % responseSizes.size()); 947 assertEquals("comparison failed at index " + ix, expectedSize, length); 948 } 949 950 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, requests, 951 recorder.getValues()); 952 } 953 954 @Test halfDuplexCallShouldSucceed()955 public void halfDuplexCallShouldSucceed() throws Exception { 956 // Build the request. 957 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 958 StreamingOutputCallRequest.Builder streamingOutputBuilder = 959 StreamingOutputCallRequest.newBuilder(); 960 for (Integer size : responseSizes) { 961 streamingOutputBuilder.addResponseParameters( 962 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 963 } 964 final StreamingOutputCallRequest request = streamingOutputBuilder.build(); 965 966 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 967 StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder); 968 969 final int numRequests = 10; 970 for (int ix = numRequests; ix > 0; --ix) { 971 requestStream.onNext(request); 972 } 973 requestStream.onCompleted(); 974 recorder.awaitCompletion(); 975 assertSuccess(recorder); 976 assertEquals(responseSizes.size() * (long) numRequests, recorder.getValues().size()); 977 for (int ix = 0; ix < recorder.getValues().size(); ++ix) { 978 StreamingOutputCallResponse response = recorder.getValues().get(ix); 979 int length = response.getPayload().getBody().size(); 980 int expectedSize = responseSizes.get(ix % responseSizes.size()); 981 assertEquals("comparison failed at index " + ix, expectedSize, length); 982 } 983 } 984 985 @Test serverStreamingShouldBeFlowControlled()986 public void serverStreamingShouldBeFlowControlled() throws Exception { 987 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 988 .addResponseParameters(ResponseParameters.newBuilder().setSize(100000)) 989 .addResponseParameters(ResponseParameters.newBuilder().setSize(100001)) 990 .build(); 991 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 992 StreamingOutputCallResponse.newBuilder() 993 .setPayload(Payload.newBuilder() 994 .setBody(ByteString.copyFrom(new byte[100000]))).build(), 995 StreamingOutputCallResponse.newBuilder() 996 .setPayload(Payload.newBuilder() 997 .setBody(ByteString.copyFrom(new byte[100001]))).build()); 998 999 long start = System.nanoTime(); 1000 1001 final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<>(10); 1002 ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call = 1003 channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT); 1004 call.start(new ClientCall.Listener<StreamingOutputCallResponse>() { 1005 @Override 1006 public void onHeaders(Metadata headers) {} 1007 1008 @Override 1009 public void onMessage(final StreamingOutputCallResponse message) { 1010 queue.add(message); 1011 } 1012 1013 @Override 1014 public void onClose(Status status, Metadata trailers) { 1015 queue.add(status); 1016 } 1017 }, new Metadata()); 1018 call.sendMessage(request); 1019 call.halfClose(); 1020 1021 // Time how long it takes to get the first response. 1022 call.request(1); 1023 Object response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 1024 assertTrue(response instanceof StreamingOutputCallResponse); 1025 assertResponse(goldenResponses.get(0), (StreamingOutputCallResponse) response); 1026 long firstCallDuration = System.nanoTime() - start; 1027 1028 // Without giving additional flow control, make sure that we don't get another response. We wait 1029 // until we are comfortable the next message isn't coming. We may have very low nanoTime 1030 // resolution (like on Windows) or be using a testing, in-process transport where message 1031 // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up sleep time 1032 // to at least 1ms. 1033 assertNull(queue.poll(Math.max(firstCallDuration * 4, 1 * 1000 * 1000), TimeUnit.NANOSECONDS)); 1034 1035 // Make sure that everything still completes. 1036 call.request(1); 1037 response = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 1038 assertTrue(response instanceof StreamingOutputCallResponse); 1039 assertResponse(goldenResponses.get(1), (StreamingOutputCallResponse) response); 1040 assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1041 } 1042 1043 @Test veryLargeRequest()1044 public void veryLargeRequest() throws Exception { 1045 assumeEnoughMemory(); 1046 final SimpleRequest request = SimpleRequest.newBuilder() 1047 .setPayload(Payload.newBuilder() 1048 .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()]))) 1049 .setResponseSize(10) 1050 .build(); 1051 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1052 .setPayload(Payload.newBuilder() 1053 .setBody(ByteString.copyFrom(new byte[10]))) 1054 .build(); 1055 1056 assertResponse(goldenResponse, blockingStub.unaryCall(request)); 1057 } 1058 1059 @Test veryLargeResponse()1060 public void veryLargeResponse() throws Exception { 1061 assumeEnoughMemory(); 1062 final SimpleRequest request = SimpleRequest.newBuilder() 1063 .setResponseSize(unaryPayloadLength()) 1064 .build(); 1065 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1066 .setPayload(Payload.newBuilder() 1067 .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()]))) 1068 .build(); 1069 1070 assertResponse(goldenResponse, blockingStub.unaryCall(request)); 1071 } 1072 1073 @Test exchangeMetadataUnaryCall()1074 public void exchangeMetadataUnaryCall() throws Exception { 1075 // Capture the metadata exchange 1076 Metadata fixedHeaders = new Metadata(); 1077 // Send a metadata proto 1078 StringValue metadataValue = StringValue.newBuilder().setValue("dog").build(); 1079 fixedHeaders.put(Util.METADATA_KEY, metadataValue); 1080 // .. and expect it to be echoed back in trailers 1081 AtomicReference<Metadata> trailersCapture = new AtomicReference<>(); 1082 AtomicReference<Metadata> headersCapture = new AtomicReference<>(); 1083 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub.withInterceptors( 1084 MetadataUtils.newAttachHeadersInterceptor(fixedHeaders), 1085 MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); 1086 1087 assertNotNull(stub.emptyCall(EMPTY)); 1088 1089 // Assert that our side channel object is echoed back in both headers and trailers 1090 Assert.assertEquals(metadataValue, headersCapture.get().get(Util.METADATA_KEY)); 1091 Assert.assertEquals(metadataValue, trailersCapture.get().get(Util.METADATA_KEY)); 1092 } 1093 1094 @Test exchangeMetadataStreamingCall()1095 public void exchangeMetadataStreamingCall() throws Exception { 1096 // Capture the metadata exchange 1097 Metadata fixedHeaders = new Metadata(); 1098 // Send a metadata proto 1099 StringValue metadataValue = StringValue.newBuilder().setValue("dog").build(); 1100 fixedHeaders.put(Util.METADATA_KEY, metadataValue); 1101 // .. and expect it to be echoed back in trailers 1102 AtomicReference<Metadata> trailersCapture = new AtomicReference<>(); 1103 AtomicReference<Metadata> headersCapture = new AtomicReference<>(); 1104 TestServiceGrpc.TestServiceStub stub = asyncStub.withInterceptors( 1105 MetadataUtils.newAttachHeadersInterceptor(fixedHeaders), 1106 MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); 1107 1108 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 1109 Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder = 1110 Messages.StreamingOutputCallRequest.newBuilder(); 1111 for (Integer size : responseSizes) { 1112 streamingOutputBuilder.addResponseParameters( 1113 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 1114 } 1115 final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build(); 1116 1117 StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1118 StreamObserver<Messages.StreamingOutputCallRequest> requestStream = 1119 stub.fullDuplexCall(recorder); 1120 1121 final int numRequests = 10; 1122 for (int ix = numRequests; ix > 0; --ix) { 1123 requestStream.onNext(request); 1124 } 1125 requestStream.onCompleted(); 1126 recorder.awaitCompletion(); 1127 assertSuccess(recorder); 1128 org.junit.Assert.assertEquals( 1129 responseSizes.size() * (long) numRequests, recorder.getValues().size()); 1130 1131 // Assert that our side channel object is echoed back in both headers and trailers 1132 Assert.assertEquals(metadataValue, headersCapture.get().get(Util.METADATA_KEY)); 1133 Assert.assertEquals(metadataValue, trailersCapture.get().get(Util.METADATA_KEY)); 1134 } 1135 1136 @Test sendsTimeoutHeader()1137 public void sendsTimeoutHeader() { 1138 Assume.assumeTrue("can not capture request headers on server side", server != null); 1139 long configuredTimeoutMinutes = 100; 1140 TestServiceGrpc.TestServiceBlockingStub stub = 1141 blockingStub.withDeadlineAfter(configuredTimeoutMinutes, TimeUnit.MINUTES); 1142 stub.emptyCall(EMPTY); 1143 long transferredTimeoutMinutes = TimeUnit.NANOSECONDS.toMinutes( 1144 requestHeadersCapture.get().get(GrpcUtil.TIMEOUT_KEY)); 1145 Assert.assertTrue( 1146 "configuredTimeoutMinutes=" + configuredTimeoutMinutes 1147 + ", transferredTimeoutMinutes=" + transferredTimeoutMinutes, 1148 configuredTimeoutMinutes - transferredTimeoutMinutes >= 0 1149 && configuredTimeoutMinutes - transferredTimeoutMinutes <= 1); 1150 } 1151 1152 @Test deadlineNotExceeded()1153 public void deadlineNotExceeded() { 1154 // warm up the channel and JVM 1155 blockingStub.emptyCall(Empty.getDefaultInstance()); 1156 blockingStub 1157 .withDeadlineAfter(10, TimeUnit.SECONDS) 1158 .streamingOutputCall(StreamingOutputCallRequest.newBuilder() 1159 .addResponseParameters(ResponseParameters.newBuilder() 1160 .setIntervalUs(0)) 1161 .build()).next(); 1162 } 1163 1164 @Test deadlineExceeded()1165 public void deadlineExceeded() throws Exception { 1166 // warm up the channel and JVM 1167 blockingStub.emptyCall(Empty.getDefaultInstance()); 1168 TestServiceGrpc.TestServiceBlockingStub stub = 1169 blockingStub.withDeadlineAfter(1, TimeUnit.SECONDS); 1170 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1171 .addResponseParameters(ResponseParameters.newBuilder() 1172 .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20))) 1173 .build(); 1174 try { 1175 stub.streamingOutputCall(request).next(); 1176 fail("Expected deadline to be exceeded"); 1177 } catch (StatusRuntimeException ex) { 1178 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode()); 1179 String desc = ex.getStatus().getDescription(); 1180 assertTrue(desc, 1181 // There is a race between client and server-side deadline expiration. 1182 // If client expires first, it'd generate this message 1183 Pattern.matches("deadline exceeded after .*s. \\[.*\\]", desc) 1184 // If server expires first, it'd reset the stream and client would generate a different 1185 // message 1186 || desc.startsWith("ClientCall was cancelled at or after deadline.")); 1187 } 1188 1189 assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); 1190 if (metricsExpected()) { 1191 // Stream may not have been created before deadline is exceeded, thus we don't test the tracer 1192 // stats. 1193 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1194 checkStartTags( 1195 clientStartRecord, "grpc.testing.TestService/StreamingOutputCall", true); 1196 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1197 checkEndTags( 1198 clientEndRecord, 1199 "grpc.testing.TestService/StreamingOutputCall", 1200 Status.Code.DEADLINE_EXCEEDED, true); 1201 // Do not check server-side metrics, because the status on the server side is undetermined. 1202 } 1203 } 1204 1205 @Test deadlineExceededServerStreaming()1206 public void deadlineExceededServerStreaming() throws Exception { 1207 // warm up the channel and JVM 1208 blockingStub.emptyCall(Empty.getDefaultInstance()); 1209 assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); 1210 ResponseParameters.Builder responseParameters = ResponseParameters.newBuilder() 1211 .setSize(1) 1212 .setIntervalUs(10000); 1213 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1214 .addResponseParameters(responseParameters) 1215 .addResponseParameters(responseParameters) 1216 .addResponseParameters(responseParameters) 1217 .addResponseParameters(responseParameters) 1218 .build(); 1219 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1220 asyncStub 1221 .withDeadlineAfter(30, TimeUnit.MILLISECONDS) 1222 .streamingOutputCall(request, recorder); 1223 recorder.awaitCompletion(); 1224 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), 1225 Status.fromThrowable(recorder.getError()).getCode()); 1226 if (metricsExpected()) { 1227 // Stream may not have been created when deadline is exceeded, thus we don't check tracer 1228 // stats. 1229 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1230 checkStartTags( 1231 clientStartRecord, "grpc.testing.TestService/StreamingOutputCall", true); 1232 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1233 checkEndTags( 1234 clientEndRecord, 1235 "grpc.testing.TestService/StreamingOutputCall", 1236 Status.Code.DEADLINE_EXCEEDED, true); 1237 // Do not check server-side metrics, because the status on the server side is undetermined. 1238 } 1239 } 1240 1241 @Test deadlineInPast()1242 public void deadlineInPast() throws Exception { 1243 // Test once with idle channel and once with active channel 1244 try { 1245 blockingStub 1246 .withDeadlineAfter(-10, TimeUnit.SECONDS) 1247 .emptyCall(Empty.getDefaultInstance()); 1248 fail("Should have thrown"); 1249 } catch (StatusRuntimeException ex) { 1250 assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); 1251 assertThat(ex.getStatus().getDescription()) 1252 .startsWith("ClientCall started after CallOptions deadline was exceeded"); 1253 } 1254 1255 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 1256 // recorded. The tracer stats rely on the stream being created, which is not the case if 1257 // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. 1258 if (metricsExpected()) { 1259 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1260 checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall", true); 1261 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1262 checkEndTags( 1263 clientEndRecord, "grpc.testing.TestService/EmptyCall", 1264 Status.DEADLINE_EXCEEDED.getCode(), true); 1265 assertZeroRetryRecorded(); 1266 } 1267 1268 // warm up the channel 1269 blockingStub.emptyCall(Empty.getDefaultInstance()); 1270 if (metricsExpected()) { 1271 // clientStartRecord 1272 clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1273 // clientEndRecord 1274 clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1275 assertZeroRetryRecorded(); 1276 } 1277 try { 1278 blockingStub 1279 .withDeadlineAfter(-10, TimeUnit.SECONDS) 1280 .emptyCall(Empty.getDefaultInstance()); 1281 fail("Should have thrown"); 1282 } catch (StatusRuntimeException ex) { 1283 assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); 1284 assertThat(ex.getStatus().getDescription()) 1285 .startsWith("ClientCall started after CallOptions deadline was exceeded"); 1286 } 1287 if (metricsExpected()) { 1288 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1289 checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall", true); 1290 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1291 checkEndTags( 1292 clientEndRecord, "grpc.testing.TestService/EmptyCall", 1293 Status.DEADLINE_EXCEEDED.getCode(), true); 1294 assertZeroRetryRecorded(); 1295 } 1296 } 1297 1298 @Test maxInboundSize_exact()1299 public void maxInboundSize_exact() { 1300 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1301 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1302 .build(); 1303 1304 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1305 TestServiceGrpc.getStreamingOutputCallMethod(); 1306 ByteSizeMarshaller<StreamingOutputCallResponse> mar = 1307 new ByteSizeMarshaller<>(md.getResponseMarshaller()); 1308 blockingServerStreamingCall( 1309 blockingStub.getChannel(), 1310 md.toBuilder(md.getRequestMarshaller(), mar).build(), 1311 blockingStub.getCallOptions(), 1312 request) 1313 .next(); 1314 1315 int size = mar.lastInSize; 1316 1317 TestServiceGrpc.TestServiceBlockingStub stub = 1318 blockingStub.withMaxInboundMessageSize(size); 1319 1320 stub.streamingOutputCall(request).next(); 1321 } 1322 1323 @Test maxInboundSize_tooBig()1324 public void maxInboundSize_tooBig() { 1325 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1326 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1327 .build(); 1328 1329 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1330 TestServiceGrpc.getStreamingOutputCallMethod(); 1331 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1332 new ByteSizeMarshaller<>(md.getRequestMarshaller()); 1333 blockingServerStreamingCall( 1334 blockingStub.getChannel(), 1335 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1336 blockingStub.getCallOptions(), 1337 request) 1338 .next(); 1339 1340 int size = mar.lastOutSize; 1341 1342 TestServiceGrpc.TestServiceBlockingStub stub = 1343 blockingStub.withMaxInboundMessageSize(size - 1); 1344 1345 try { 1346 stub.streamingOutputCall(request).next(); 1347 fail(); 1348 } catch (StatusRuntimeException ex) { 1349 Status s = ex.getStatus(); 1350 assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.RESOURCE_EXHAUSTED); 1351 assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum"); 1352 } 1353 } 1354 1355 @Test maxOutboundSize_exact()1356 public void maxOutboundSize_exact() { 1357 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1358 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1359 .build(); 1360 1361 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1362 TestServiceGrpc.getStreamingOutputCallMethod(); 1363 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1364 new ByteSizeMarshaller<>(md.getRequestMarshaller()); 1365 blockingServerStreamingCall( 1366 blockingStub.getChannel(), 1367 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1368 blockingStub.getCallOptions(), 1369 request) 1370 .next(); 1371 1372 int size = mar.lastOutSize; 1373 1374 TestServiceGrpc.TestServiceBlockingStub stub = 1375 blockingStub.withMaxOutboundMessageSize(size); 1376 1377 stub.streamingOutputCall(request).next(); 1378 } 1379 1380 @Test maxOutboundSize_tooBig()1381 public void maxOutboundSize_tooBig() { 1382 // set at least one field to ensure the size is non-zero. 1383 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1384 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1385 .build(); 1386 1387 1388 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1389 TestServiceGrpc.getStreamingOutputCallMethod(); 1390 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1391 new ByteSizeMarshaller<>(md.getRequestMarshaller()); 1392 blockingServerStreamingCall( 1393 blockingStub.getChannel(), 1394 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1395 blockingStub.getCallOptions(), 1396 request) 1397 .next(); 1398 1399 TestServiceGrpc.TestServiceBlockingStub stub = 1400 blockingStub.withMaxOutboundMessageSize(mar.lastOutSize - 1); 1401 try { 1402 stub.streamingOutputCall(request).next(); 1403 fail(); 1404 } catch (StatusRuntimeException ex) { 1405 Status s = ex.getStatus(); 1406 assertWithMessage(s.toString()).that(s.getCode()).isEqualTo(Status.Code.CANCELLED); 1407 assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large"); 1408 } 1409 } 1410 unaryPayloadLength()1411 protected int unaryPayloadLength() { 1412 // 10MiB. 1413 return 10485760; 1414 } 1415 1416 @Test gracefulShutdown()1417 public void gracefulShutdown() throws Exception { 1418 final List<StreamingOutputCallRequest> requests = Arrays.asList( 1419 StreamingOutputCallRequest.newBuilder() 1420 .addResponseParameters(ResponseParameters.newBuilder() 1421 .setSize(3)) 1422 .setPayload(Payload.newBuilder() 1423 .setBody(ByteString.copyFrom(new byte[2]))) 1424 .build(), 1425 StreamingOutputCallRequest.newBuilder() 1426 .addResponseParameters(ResponseParameters.newBuilder() 1427 .setSize(1)) 1428 .setPayload(Payload.newBuilder() 1429 .setBody(ByteString.copyFrom(new byte[7]))) 1430 .build(), 1431 StreamingOutputCallRequest.newBuilder() 1432 .addResponseParameters(ResponseParameters.newBuilder() 1433 .setSize(4)) 1434 .setPayload(Payload.newBuilder() 1435 .setBody(ByteString.copyFrom(new byte[1]))) 1436 .build()); 1437 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 1438 StreamingOutputCallResponse.newBuilder() 1439 .setPayload(Payload.newBuilder() 1440 .setBody(ByteString.copyFrom(new byte[3]))) 1441 .build(), 1442 StreamingOutputCallResponse.newBuilder() 1443 .setPayload(Payload.newBuilder() 1444 .setBody(ByteString.copyFrom(new byte[1]))) 1445 .build(), 1446 StreamingOutputCallResponse.newBuilder() 1447 .setPayload(Payload.newBuilder() 1448 .setBody(ByteString.copyFrom(new byte[4]))) 1449 .build()); 1450 1451 final ArrayBlockingQueue<StreamingOutputCallResponse> responses = 1452 new ArrayBlockingQueue<>(3); 1453 final SettableFuture<Void> completed = SettableFuture.create(); 1454 final SettableFuture<Void> errorSeen = SettableFuture.create(); 1455 StreamObserver<StreamingOutputCallResponse> responseObserver = 1456 new StreamObserver<StreamingOutputCallResponse>() { 1457 1458 @Override 1459 public void onNext(StreamingOutputCallResponse value) { 1460 responses.add(value); 1461 } 1462 1463 @Override 1464 public void onError(Throwable t) { 1465 errorSeen.set(null); 1466 } 1467 1468 @Override 1469 public void onCompleted() { 1470 completed.set(null); 1471 } 1472 }; 1473 StreamObserver<StreamingOutputCallRequest> requestObserver 1474 = asyncStub.fullDuplexCall(responseObserver); 1475 requestObserver.onNext(requests.get(0)); 1476 assertResponse( 1477 goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1478 // Initiate graceful shutdown. 1479 channel.shutdown(); 1480 requestObserver.onNext(requests.get(1)); 1481 assertResponse( 1482 goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1483 // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't. 1484 requestObserver.onNext(requests.get(2)); 1485 assertResponse( 1486 goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1487 assertFalse(completed.isDone()); 1488 requestObserver.onCompleted(); 1489 completed.get(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 1490 assertFalse(errorSeen.isDone()); 1491 } 1492 1493 @Test customMetadata()1494 public void customMetadata() throws Exception { 1495 final int responseSize = 314159; 1496 final int requestSize = 271828; 1497 final SimpleRequest request = SimpleRequest.newBuilder() 1498 .setResponseSize(responseSize) 1499 .setPayload(Payload.newBuilder() 1500 .setBody(ByteString.copyFrom(new byte[requestSize]))) 1501 .build(); 1502 final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder() 1503 .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize)) 1504 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize]))) 1505 .build(); 1506 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1507 .setPayload(Payload.newBuilder() 1508 .setBody(ByteString.copyFrom(new byte[responseSize]))) 1509 .build(); 1510 final StreamingOutputCallResponse goldenStreamingResponse = 1511 StreamingOutputCallResponse.newBuilder() 1512 .setPayload(Payload.newBuilder() 1513 .setBody(ByteString.copyFrom(new byte[responseSize]))) 1514 .build(); 1515 final byte[] trailingBytes = 1516 {(byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb}; 1517 1518 // Test UnaryCall 1519 Metadata metadata = new Metadata(); 1520 metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value"); 1521 metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes); 1522 AtomicReference<Metadata> headersCapture = new AtomicReference<>(); 1523 AtomicReference<Metadata> trailersCapture = new AtomicReference<>(); 1524 TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub.withInterceptors( 1525 MetadataUtils.newAttachHeadersInterceptor(metadata), 1526 MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); 1527 SimpleResponse response = blockingStub.unaryCall(request); 1528 1529 assertResponse(goldenResponse, response); 1530 assertEquals("test_initial_metadata_value", 1531 headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY)); 1532 assertTrue( 1533 Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY))); 1534 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK, 1535 Collections.singleton(request), Collections.singleton(goldenResponse)); 1536 1537 // Test FullDuplexCall 1538 metadata = new Metadata(); 1539 metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value"); 1540 metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes); 1541 headersCapture = new AtomicReference<>(); 1542 trailersCapture = new AtomicReference<>(); 1543 TestServiceGrpc.TestServiceStub stub = asyncStub.withInterceptors( 1544 MetadataUtils.newAttachHeadersInterceptor(metadata), 1545 MetadataUtils.newCaptureMetadataInterceptor(headersCapture, trailersCapture)); 1546 1547 StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1548 StreamObserver<Messages.StreamingOutputCallRequest> requestStream = 1549 stub.fullDuplexCall(recorder); 1550 requestStream.onNext(streamingRequest); 1551 requestStream.onCompleted(); 1552 recorder.awaitCompletion(); 1553 1554 assertSuccess(recorder); 1555 assertResponse(goldenStreamingResponse, recorder.firstValue().get()); 1556 assertEquals("test_initial_metadata_value", 1557 headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY)); 1558 assertTrue( 1559 Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY))); 1560 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, 1561 Collections.singleton(streamingRequest), Collections.singleton(goldenStreamingResponse)); 1562 } 1563 1564 @SuppressWarnings("deprecation") 1565 @Test(timeout = 10000) censusContextsPropagated()1566 public void censusContextsPropagated() { 1567 Assume.assumeTrue("Skip the test because server is not in the same process.", server != null); 1568 Assume.assumeTrue(customCensusModulePresent()); 1569 Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan(); 1570 // A valid ID is guaranteed to be unique, so we can verify it is actually propagated. 1571 assertTrue(clientParentSpan.getContext().getTraceId().isValid()); 1572 Context ctx = 1573 io.opencensus.tags.unsafe.ContextUtils.withValue( 1574 Context.ROOT, 1575 tagger 1576 .emptyBuilder() 1577 .putLocal(StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")) 1578 .build()); 1579 ctx = io.opencensus.trace.unsafe.ContextUtils.withValue(ctx, clientParentSpan); 1580 Context origCtx = ctx.attach(); 1581 try { 1582 blockingStub.unaryCall(SimpleRequest.getDefaultInstance()); 1583 Context serverCtx = contextCapture.get(); 1584 assertNotNull(serverCtx); 1585 1586 FakeTagContext statsCtx = 1587 (FakeTagContext) io.opencensus.tags.unsafe.ContextUtils.getValue(serverCtx); 1588 assertNotNull(statsCtx); 1589 Map<TagKey, TagValue> tags = statsCtx.getTags(); 1590 boolean tagFound = false; 1591 for (Map.Entry<TagKey, TagValue> tag : tags.entrySet()) { 1592 if (tag.getKey().equals(StatsTestUtils.EXTRA_TAG)) { 1593 assertEquals(TagValue.create("extra value"), tag.getValue()); 1594 tagFound = true; 1595 } 1596 } 1597 assertTrue("tag not found", tagFound); 1598 1599 Span span = io.opencensus.trace.unsafe.ContextUtils.getValue(serverCtx); 1600 assertNotNull(span); 1601 SpanContext spanContext = span.getContext(); 1602 assertEquals(clientParentSpan.getContext().getTraceId(), spanContext.getTraceId()); 1603 } finally { 1604 ctx.detach(origCtx); 1605 } 1606 } 1607 1608 @Test statusCodeAndMessage()1609 public void statusCodeAndMessage() throws Exception { 1610 int errorCode = 2; 1611 String errorMessage = "test status message"; 1612 EchoStatus responseStatus = EchoStatus.newBuilder() 1613 .setCode(errorCode) 1614 .setMessage(errorMessage) 1615 .build(); 1616 SimpleRequest simpleRequest = SimpleRequest.newBuilder() 1617 .setResponseStatus(responseStatus) 1618 .build(); 1619 StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder() 1620 .setResponseStatus(responseStatus) 1621 .build(); 1622 1623 // Test UnaryCall 1624 try { 1625 blockingStub.unaryCall(simpleRequest); 1626 fail(); 1627 } catch (StatusRuntimeException e) { 1628 assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); 1629 assertEquals(errorMessage, e.getStatus().getDescription()); 1630 } 1631 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN); 1632 1633 // Test FullDuplexCall 1634 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 1635 StreamObserver<StreamingOutputCallRequest> requestObserver 1636 = asyncStub.fullDuplexCall(responseObserver); 1637 requestObserver.onNext(streamingRequest); 1638 requestObserver.onCompleted(); 1639 1640 assertThat(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS)) 1641 .isTrue(); 1642 assertThat(responseObserver.getError()).isNotNull(); 1643 Status status = Status.fromThrowable(responseObserver.getError()); 1644 assertEquals(Status.UNKNOWN.getCode(), status.getCode()); 1645 assertEquals(errorMessage, status.getDescription()); 1646 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN); 1647 } 1648 1649 @Test specialStatusMessage()1650 public void specialStatusMessage() throws Exception { 1651 int errorCode = 2; 1652 String errorMessage = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n"; 1653 SimpleRequest simpleRequest = SimpleRequest.newBuilder() 1654 .setResponseStatus(EchoStatus.newBuilder() 1655 .setCode(errorCode) 1656 .setMessage(errorMessage) 1657 .build()) 1658 .build(); 1659 1660 try { 1661 blockingStub.unaryCall(simpleRequest); 1662 fail(); 1663 } catch (StatusRuntimeException e) { 1664 assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); 1665 assertEquals(errorMessage, e.getStatus().getDescription()); 1666 } 1667 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN); 1668 } 1669 1670 /** Sends an rpc to an unimplemented method within TestService. */ 1671 @Test unimplementedMethod()1672 public void unimplementedMethod() { 1673 try { 1674 blockingStub.unimplementedCall(Empty.getDefaultInstance()); 1675 fail(); 1676 } catch (StatusRuntimeException e) { 1677 assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); 1678 } 1679 1680 assertClientStatsTrace("grpc.testing.TestService/UnimplementedCall", 1681 Status.Code.UNIMPLEMENTED); 1682 } 1683 1684 /** Sends an rpc to an unimplemented service on the server. */ 1685 @Test unimplementedService()1686 public void unimplementedService() { 1687 UnimplementedServiceGrpc.UnimplementedServiceBlockingStub stub = 1688 UnimplementedServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); 1689 try { 1690 stub.unimplementedCall(Empty.getDefaultInstance()); 1691 fail(); 1692 } catch (StatusRuntimeException e) { 1693 assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); 1694 } 1695 1696 assertStatsTrace("grpc.testing.UnimplementedService/UnimplementedCall", 1697 Status.Code.UNIMPLEMENTED); 1698 } 1699 1700 /** Start a fullDuplexCall which the server will not respond, and verify the deadline expires. */ 1701 @SuppressWarnings("MissingFail") 1702 @Test timeoutOnSleepingServer()1703 public void timeoutOnSleepingServer() throws Exception { 1704 TestServiceGrpc.TestServiceStub stub = 1705 asyncStub.withDeadlineAfter(1, TimeUnit.MILLISECONDS); 1706 1707 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 1708 StreamObserver<StreamingOutputCallRequest> requestObserver 1709 = stub.fullDuplexCall(responseObserver); 1710 1711 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1712 .setPayload(Payload.newBuilder() 1713 .setBody(ByteString.copyFrom(new byte[27182]))) 1714 .build(); 1715 try { 1716 requestObserver.onNext(request); 1717 } catch (IllegalStateException expected) { 1718 // This can happen if the stream has already been terminated due to deadline exceeded. 1719 } 1720 1721 assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1722 assertEquals(0, responseObserver.getValues().size()); 1723 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), 1724 Status.fromThrowable(responseObserver.getError()).getCode()); 1725 1726 if (metricsExpected()) { 1727 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 1728 // recorded. The tracer stats rely on the stream being created, which is not always the case 1729 // in this test, thus we will not check that. 1730 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1731 checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall", true); 1732 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1733 checkEndTags( 1734 clientEndRecord, 1735 "grpc.testing.TestService/FullDuplexCall", 1736 Status.DEADLINE_EXCEEDED.getCode(), true); 1737 } 1738 } 1739 1740 /** 1741 * Verifies remote server address and local client address are available from ClientCall 1742 * Attributes via ClientInterceptor. 1743 */ 1744 @Test getServerAddressAndLocalAddressFromClient()1745 public void getServerAddressAndLocalAddressFromClient() { 1746 assertNotNull(obtainRemoteServerAddr()); 1747 assertNotNull(obtainLocalClientAddr()); 1748 } 1749 1750 /** 1751 * Test backend metrics per query reporting: expect the test client LB policy to receive load 1752 * reports. 1753 */ testOrcaPerRpc()1754 public void testOrcaPerRpc() throws Exception { 1755 AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>(); 1756 TestOrcaReport answer = TestOrcaReport.newBuilder() 1757 .setCpuUtilization(0.8210) 1758 .setMemoryUtilization(0.5847) 1759 .putRequestCost("cost", 3456.32) 1760 .putUtilization("util", 0.30499) 1761 .build(); 1762 blockingStub.withOption(ORCA_RPC_REPORT_KEY, reportHolder).unaryCall( 1763 SimpleRequest.newBuilder().setOrcaPerQueryReport(answer).build()); 1764 assertThat(reportHolder.get()).isEqualTo(answer); 1765 } 1766 1767 /** 1768 * Test backend metrics OOB reporting: expect the test client LB policy to receive load reports. 1769 */ testOrcaOob()1770 public void testOrcaOob() throws Exception { 1771 AtomicReference<TestOrcaReport> reportHolder = new AtomicReference<>(); 1772 final TestOrcaReport answer = TestOrcaReport.newBuilder() 1773 .setCpuUtilization(0.8210) 1774 .setMemoryUtilization(0.5847) 1775 .putUtilization("util", 0.30499) 1776 .build(); 1777 final TestOrcaReport answer2 = TestOrcaReport.newBuilder() 1778 .setCpuUtilization(0.29309) 1779 .setMemoryUtilization(0.2) 1780 .putUtilization("util", 0.2039) 1781 .build(); 1782 1783 final int retryLimit = 5; 1784 BlockingQueue<Object> queue = new LinkedBlockingQueue<>(); 1785 final Object lastItem = new Object(); 1786 StreamObserver<StreamingOutputCallRequest> streamObserver = 1787 asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() { 1788 1789 @Override 1790 public void onNext(StreamingOutputCallResponse value) { 1791 queue.add(value); 1792 } 1793 1794 @Override 1795 public void onError(Throwable t) { 1796 queue.add(t); 1797 } 1798 1799 @Override 1800 public void onCompleted() { 1801 queue.add(lastItem); 1802 } 1803 }); 1804 1805 streamObserver.onNext(StreamingOutputCallRequest.newBuilder() 1806 .setOrcaOobReport(answer) 1807 .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); 1808 assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); 1809 int i = 0; 1810 for (; i < retryLimit; i++) { 1811 Thread.sleep(1000); 1812 blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); 1813 if (answer.equals(reportHolder.get())) { 1814 break; 1815 } 1816 } 1817 assertThat(i).isLessThan(retryLimit); 1818 streamObserver.onNext(StreamingOutputCallRequest.newBuilder() 1819 .setOrcaOobReport(answer2) 1820 .addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build()); 1821 assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class); 1822 1823 for (i = 0; i < retryLimit; i++) { 1824 Thread.sleep(1000); 1825 blockingStub.withOption(ORCA_OOB_REPORT_KEY, reportHolder).emptyCall(EMPTY); 1826 if (reportHolder.get().equals(answer2)) { 1827 break; 1828 } 1829 } 1830 assertThat(i).isLessThan(retryLimit); 1831 streamObserver.onCompleted(); 1832 assertThat(queue.take()).isSameInstanceAs(lastItem); 1833 } 1834 1835 /** Sends a large unary rpc with service account credentials. */ serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)1836 public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) 1837 throws Exception { 1838 // cast to ServiceAccountCredentials to double-check the right type of object was created. 1839 GoogleCredentials credentials = 1840 ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream)); 1841 credentials = credentials.createScoped(Arrays.asList(authScope)); 1842 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1843 .withCallCredentials(MoreCallCredentials.from(credentials)); 1844 final SimpleRequest request = SimpleRequest.newBuilder() 1845 .setFillUsername(true) 1846 .setFillOauthScope(true) 1847 .setResponseSize(314159) 1848 .setPayload(Payload.newBuilder() 1849 .setBody(ByteString.copyFrom(new byte[271828]))) 1850 .build(); 1851 1852 final SimpleResponse response = stub.unaryCall(request); 1853 assertFalse(response.getUsername().isEmpty()); 1854 assertTrue("Received username: " + response.getUsername(), 1855 jsonKey.contains(response.getUsername())); 1856 assertFalse(response.getOauthScope().isEmpty()); 1857 assertTrue("Received oauth scope: " + response.getOauthScope(), 1858 authScope.contains(response.getOauthScope())); 1859 1860 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1861 .setOauthScope(response.getOauthScope()) 1862 .setUsername(response.getUsername()) 1863 .setPayload(Payload.newBuilder() 1864 .setBody(ByteString.copyFrom(new byte[314159]))) 1865 .build(); 1866 assertResponse(goldenResponse, response); 1867 } 1868 1869 /** Sends a large unary rpc with compute engine credentials. */ computeEngineCreds(String serviceAccount, String oauthScope)1870 public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception { 1871 ComputeEngineCredentials credentials = ComputeEngineCredentials.create(); 1872 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1873 .withCallCredentials(MoreCallCredentials.from(credentials)); 1874 final SimpleRequest request = SimpleRequest.newBuilder() 1875 .setFillUsername(true) 1876 .setFillOauthScope(true) 1877 .setResponseSize(314159) 1878 .setPayload(Payload.newBuilder() 1879 .setBody(ByteString.copyFrom(new byte[271828]))) 1880 .build(); 1881 1882 final SimpleResponse response = stub.unaryCall(request); 1883 assertEquals(serviceAccount, response.getUsername()); 1884 assertFalse(response.getOauthScope().isEmpty()); 1885 assertTrue("Received oauth scope: " + response.getOauthScope(), 1886 oauthScope.contains(response.getOauthScope())); 1887 1888 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1889 .setOauthScope(response.getOauthScope()) 1890 .setUsername(response.getUsername()) 1891 .setPayload(Payload.newBuilder() 1892 .setBody(ByteString.copyFrom(new byte[314159]))) 1893 .build(); 1894 assertResponse(goldenResponse, response); 1895 } 1896 1897 /** Sends an unary rpc with ComputeEngineChannelBuilder. */ computeEngineChannelCredentials( String defaultServiceAccount, TestServiceGrpc.TestServiceBlockingStub computeEngineStub)1898 public void computeEngineChannelCredentials( 1899 String defaultServiceAccount, 1900 TestServiceGrpc.TestServiceBlockingStub computeEngineStub) throws Exception { 1901 final SimpleRequest request = SimpleRequest.newBuilder() 1902 .setFillUsername(true) 1903 .setResponseSize(314159) 1904 .setPayload(Payload.newBuilder() 1905 .setBody(ByteString.copyFrom(new byte[271828]))) 1906 .build(); 1907 final SimpleResponse response = computeEngineStub.unaryCall(request); 1908 assertEquals(defaultServiceAccount, response.getUsername()); 1909 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1910 .setUsername(defaultServiceAccount) 1911 .setPayload(Payload.newBuilder() 1912 .setBody(ByteString.copyFrom(new byte[314159]))) 1913 .build(); 1914 assertResponse(goldenResponse, response); 1915 } 1916 1917 /** Test JWT-based auth. */ jwtTokenCreds(InputStream serviceAccountJson)1918 public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception { 1919 final SimpleRequest request = SimpleRequest.newBuilder() 1920 .setResponseSize(314159) 1921 .setPayload(Payload.newBuilder() 1922 .setBody(ByteString.copyFrom(new byte[271828]))) 1923 .setFillUsername(true) 1924 .build(); 1925 1926 ServiceAccountCredentials credentials = (ServiceAccountCredentials) 1927 GoogleCredentials.fromStream(serviceAccountJson); 1928 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1929 .withCallCredentials(MoreCallCredentials.from(credentials)); 1930 SimpleResponse response = stub.unaryCall(request); 1931 assertEquals(credentials.getClientEmail(), response.getUsername()); 1932 assertEquals(314159, response.getPayload().getBody().size()); 1933 } 1934 1935 /** Sends a unary rpc with raw oauth2 access token credentials. */ oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)1936 public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) 1937 throws Exception { 1938 GoogleCredentials utilCredentials = 1939 GoogleCredentials.fromStream(credentialsStream); 1940 utilCredentials = utilCredentials.createScoped(Arrays.asList(authScope)); 1941 AccessToken accessToken = utilCredentials.refreshAccessToken(); 1942 1943 OAuth2Credentials credentials = OAuth2Credentials.create(accessToken); 1944 1945 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1946 .withCallCredentials(MoreCallCredentials.from(credentials)); 1947 final SimpleRequest request = SimpleRequest.newBuilder() 1948 .setFillUsername(true) 1949 .setFillOauthScope(true) 1950 .build(); 1951 1952 final SimpleResponse response = stub.unaryCall(request); 1953 assertFalse(response.getUsername().isEmpty()); 1954 assertTrue("Received username: " + response.getUsername(), 1955 jsonKey.contains(response.getUsername())); 1956 assertFalse(response.getOauthScope().isEmpty()); 1957 assertTrue("Received oauth scope: " + response.getOauthScope(), 1958 authScope.contains(response.getOauthScope())); 1959 } 1960 1961 /** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */ perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)1962 public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) 1963 throws Exception { 1964 // In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once 1965 // for that purpose. 1966 // So, this test is identical to oauth2_auth_token test. 1967 oauth2AuthToken(jsonKey, credentialsStream, oauthScope); 1968 } 1969 1970 /** Sends an unary rpc with "google default credentials". */ googleDefaultCredentials( String defaultServiceAccount, TestServiceGrpc.TestServiceBlockingStub googleDefaultStub)1971 public void googleDefaultCredentials( 1972 String defaultServiceAccount, 1973 TestServiceGrpc.TestServiceBlockingStub googleDefaultStub) throws Exception { 1974 final SimpleRequest request = SimpleRequest.newBuilder() 1975 .setFillUsername(true) 1976 .setResponseSize(314159) 1977 .setPayload(Payload.newBuilder() 1978 .setBody(ByteString.copyFrom(new byte[271828]))) 1979 .build(); 1980 final SimpleResponse response = googleDefaultStub.unaryCall(request); 1981 assertEquals(defaultServiceAccount, response.getUsername()); 1982 1983 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1984 .setUsername(defaultServiceAccount) 1985 .setPayload(Payload.newBuilder() 1986 .setBody(ByteString.copyFrom(new byte[314159]))) 1987 .build(); 1988 assertResponse(goldenResponse, response); 1989 } 1990 1991 private static class SoakIterationResult { SoakIterationResult(long latencyMs, Status status)1992 public SoakIterationResult(long latencyMs, Status status) { 1993 this.latencyMs = latencyMs; 1994 this.status = status; 1995 } 1996 getLatencyMs()1997 public long getLatencyMs() { 1998 return latencyMs; 1999 } 2000 getStatus()2001 public Status getStatus() { 2002 return status; 2003 } 2004 2005 private long latencyMs = -1; 2006 private Status status = Status.OK; 2007 } 2008 performOneSoakIteration( TestServiceGrpc.TestServiceBlockingStub soakStub)2009 private SoakIterationResult performOneSoakIteration( 2010 TestServiceGrpc.TestServiceBlockingStub soakStub) throws Exception { 2011 long startNs = System.nanoTime(); 2012 Status status = Status.OK; 2013 try { 2014 final SimpleRequest request = 2015 SimpleRequest.newBuilder() 2016 .setResponseSize(314159) 2017 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 2018 .build(); 2019 final SimpleResponse goldenResponse = 2020 SimpleResponse.newBuilder() 2021 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) 2022 .build(); 2023 assertResponse(goldenResponse, soakStub.unaryCall(request)); 2024 } catch (StatusRuntimeException e) { 2025 status = e.getStatus(); 2026 } 2027 long elapsedNs = System.nanoTime() - startNs; 2028 return new SoakIterationResult(TimeUnit.NANOSECONDS.toMillis(elapsedNs), status); 2029 } 2030 2031 /** 2032 * Runs large unary RPCs in a loop with configurable failure thresholds 2033 * and channel creation behavior. 2034 */ performSoakTest( String serverUri, boolean resetChannelPerIteration, int soakIterations, int maxFailures, int maxAcceptablePerIterationLatencyMs, int minTimeMsBetweenRpcs, int overallTimeoutSeconds)2035 public void performSoakTest( 2036 String serverUri, 2037 boolean resetChannelPerIteration, 2038 int soakIterations, 2039 int maxFailures, 2040 int maxAcceptablePerIterationLatencyMs, 2041 int minTimeMsBetweenRpcs, 2042 int overallTimeoutSeconds) 2043 throws Exception { 2044 int iterationsDone = 0; 2045 int totalFailures = 0; 2046 Histogram latencies = new Histogram(4 /* number of significant value digits */); 2047 long startNs = System.nanoTime(); 2048 ManagedChannel soakChannel = createChannel(); 2049 TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc 2050 .newBlockingStub(soakChannel) 2051 .withInterceptors(recordClientCallInterceptor(clientCallCapture)); 2052 for (int i = 0; i < soakIterations; i++) { 2053 if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) { 2054 break; 2055 } 2056 long earliestNextStartNs = System.nanoTime() 2057 + TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs); 2058 if (resetChannelPerIteration) { 2059 soakChannel.shutdownNow(); 2060 soakChannel.awaitTermination(10, TimeUnit.SECONDS); 2061 soakChannel = createChannel(); 2062 soakStub = TestServiceGrpc 2063 .newBlockingStub(soakChannel) 2064 .withInterceptors(recordClientCallInterceptor(clientCallCapture)); 2065 } 2066 SoakIterationResult result = performOneSoakIteration(soakStub); 2067 SocketAddress peer = clientCallCapture 2068 .get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 2069 StringBuilder logStr = new StringBuilder( 2070 String.format( 2071 Locale.US, 2072 "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s", 2073 i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri)); 2074 if (!result.getStatus().equals(Status.OK)) { 2075 totalFailures++; 2076 logStr.append(String.format(" failed: %s", result.getStatus())); 2077 } else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) { 2078 totalFailures++; 2079 logStr.append( 2080 " exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs); 2081 } else { 2082 logStr.append(" succeeded"); 2083 } 2084 System.err.println(logStr.toString()); 2085 iterationsDone++; 2086 latencies.recordValue(result.getLatencyMs()); 2087 long remainingNs = earliestNextStartNs - System.nanoTime(); 2088 if (remainingNs > 0) { 2089 TimeUnit.NANOSECONDS.sleep(remainingNs); 2090 } 2091 } 2092 soakChannel.shutdownNow(); 2093 soakChannel.awaitTermination(10, TimeUnit.SECONDS); 2094 System.err.println( 2095 String.format( 2096 Locale.US, 2097 "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. " 2098 + "p50: %d ms, p90: %d ms, p100: %d ms", 2099 serverUri, 2100 iterationsDone, 2101 soakIterations, 2102 totalFailures, 2103 latencies.getValueAtPercentile(50), 2104 latencies.getValueAtPercentile(90), 2105 latencies.getValueAtPercentile(100))); 2106 // check if we timed out 2107 String timeoutErrorMessage = 2108 String.format( 2109 Locale.US, 2110 "(server_uri: %s) soak test consumed all %d seconds of time and quit early, " 2111 + "only having ran %d out of desired %d iterations.", 2112 serverUri, 2113 overallTimeoutSeconds, 2114 iterationsDone, 2115 soakIterations); 2116 assertEquals(timeoutErrorMessage, iterationsDone, soakIterations); 2117 // check if we had too many failures 2118 String tooManyFailuresErrorMessage = 2119 String.format( 2120 Locale.US, 2121 "(server_uri: %s) soak test total failures: %d exceeds max failures " 2122 + "threshold: %d.", 2123 serverUri, totalFailures, maxFailures); 2124 assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures); 2125 } 2126 assertSuccess(StreamRecorder<?> recorder)2127 protected static void assertSuccess(StreamRecorder<?> recorder) { 2128 if (recorder.getError() != null) { 2129 throw new AssertionError(recorder.getError()); 2130 } 2131 } 2132 2133 /** Helper for getting remote address from {@link io.grpc.ServerCall#getAttributes()}. */ obtainRemoteClientAddr()2134 protected SocketAddress obtainRemoteClientAddr() { 2135 TestServiceGrpc.TestServiceBlockingStub stub = 2136 blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS); 2137 2138 stub.unaryCall(SimpleRequest.getDefaultInstance()); 2139 2140 return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 2141 } 2142 2143 /** Helper for getting remote address from {@link io.grpc.ClientCall#getAttributes()}. */ obtainRemoteServerAddr()2144 protected SocketAddress obtainRemoteServerAddr() { 2145 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 2146 .withInterceptors(recordClientCallInterceptor(clientCallCapture)) 2147 .withDeadlineAfter(5, TimeUnit.SECONDS); 2148 2149 stub.unaryCall(SimpleRequest.getDefaultInstance()); 2150 2151 return clientCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 2152 } 2153 2154 /** Helper for getting local address from {@link io.grpc.ServerCall#getAttributes()}. */ obtainLocalServerAddr()2155 protected SocketAddress obtainLocalServerAddr() { 2156 TestServiceGrpc.TestServiceBlockingStub stub = 2157 blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS); 2158 2159 stub.unaryCall(SimpleRequest.getDefaultInstance()); 2160 2161 return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); 2162 } 2163 2164 /** Helper for getting local address from {@link io.grpc.ClientCall#getAttributes()}. */ obtainLocalClientAddr()2165 protected SocketAddress obtainLocalClientAddr() { 2166 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 2167 .withInterceptors(recordClientCallInterceptor(clientCallCapture)) 2168 .withDeadlineAfter(5, TimeUnit.SECONDS); 2169 2170 stub.unaryCall(SimpleRequest.getDefaultInstance()); 2171 2172 return clientCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR); 2173 } 2174 2175 /** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()}. */ assertX500SubjectDn(String tlsInfo)2176 protected void assertX500SubjectDn(String tlsInfo) { 2177 TestServiceGrpc.TestServiceBlockingStub stub = 2178 blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS); 2179 2180 stub.unaryCall(SimpleRequest.getDefaultInstance()); 2181 2182 List<Certificate> certificates; 2183 SSLSession sslSession = 2184 serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION); 2185 try { 2186 certificates = Arrays.asList(sslSession.getPeerCertificates()); 2187 } catch (SSLPeerUnverifiedException e) { 2188 // Should never happen 2189 throw new AssertionError(e); 2190 } 2191 2192 X509Certificate x509cert = (X509Certificate) certificates.get(0); 2193 2194 assertEquals(1, certificates.size()); 2195 assertEquals(tlsInfo, x509cert.getSubjectDN().toString()); 2196 } 2197 operationTimeoutMillis()2198 protected int operationTimeoutMillis() { 2199 return 5000; 2200 } 2201 2202 /** 2203 * Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is 2204 * chosen as a maximum amount of memory a large test would need. 2205 */ assumeEnoughMemory()2206 protected static void assumeEnoughMemory() { 2207 Runtime r = Runtime.getRuntime(); 2208 long usedMem = r.totalMemory() - r.freeMemory(); 2209 long actuallyFreeMemory = r.maxMemory() - usedMem; 2210 Assume.assumeTrue( 2211 actuallyFreeMemory + " is not sufficient to run this test", 2212 actuallyFreeMemory >= 64 * 1024 * 1024); 2213 } 2214 2215 /** 2216 * Poll the next metrics record and check it against the provided information, including the 2217 * message sizes. 2218 */ assertStatsTrace(String method, Status.Code status, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2219 private void assertStatsTrace(String method, Status.Code status, 2220 Collection<? extends MessageLite> requests, 2221 Collection<? extends MessageLite> responses) { 2222 assertClientStatsTrace(method, status, requests, responses); 2223 assertServerStatsTrace(method, status, requests, responses); 2224 } 2225 2226 /** 2227 * Poll the next metrics record and check it against the provided information, without checking 2228 * the message sizes. 2229 */ assertStatsTrace(String method, Status.Code status)2230 private void assertStatsTrace(String method, Status.Code status) { 2231 assertStatsTrace(method, status, null, null); 2232 } 2233 assertZeroRetryRecorded()2234 private void assertZeroRetryRecorded() { 2235 MetricsRecord retryRecord = clientStatsRecorder.pollRecord(); 2236 assertThat(retryRecord.getMetric(RETRIES_PER_CALL)).isEqualTo(0); 2237 assertThat(retryRecord.getMetric(TRANSPARENT_RETRIES_PER_CALL)).isEqualTo(0); 2238 assertThat(retryRecord.getMetric(RETRY_DELAY_PER_CALL)).isEqualTo(0D); 2239 } 2240 assertClientStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2241 private void assertClientStatsTrace(String method, Status.Code code, 2242 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 2243 // Tracer-based stats 2244 TestClientStreamTracer tracer = clientStreamTracers.poll(); 2245 assertNotNull(tracer); 2246 assertTrue(tracer.getOutboundHeaders()); 2247 // assertClientStatsTrace() is called right after application receives status, 2248 // but streamClosed() may be called slightly later than that. So we need a timeout. 2249 try { 2250 assertTrue(tracer.await(5, TimeUnit.SECONDS)); 2251 } catch (InterruptedException e) { 2252 throw new AssertionError(e); 2253 } 2254 assertEquals(code, tracer.getStatus().getCode()); 2255 2256 if (requests != null && responses != null) { 2257 checkTracers(tracer, requests, responses); 2258 } 2259 if (metricsExpected()) { 2260 // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be 2261 // done before application receives status. 2262 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(); 2263 checkStartTags(clientStartRecord, method, true); 2264 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(); 2265 checkEndTags(clientEndRecord, method, code, true); 2266 2267 if (requests != null && responses != null) { 2268 checkCensus(clientEndRecord, false, requests, responses); 2269 } 2270 assertZeroRetryRecorded(); 2271 } 2272 } 2273 assertClientStatsTrace(String method, Status.Code status)2274 private void assertClientStatsTrace(String method, Status.Code status) { 2275 assertClientStatsTrace(method, status, null, null); 2276 } 2277 2278 @SuppressWarnings("AssertionFailureIgnored") // Failure is checked in the end by the passed flag. assertServerStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2279 private void assertServerStatsTrace(String method, Status.Code code, 2280 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 2281 if (server == null) { 2282 // Server is not in the same process. We can't check server-side stats. 2283 return; 2284 } 2285 2286 if (metricsExpected()) { 2287 MetricsRecord serverStartRecord; 2288 MetricsRecord serverEndRecord; 2289 try { 2290 // On the server, the stats is finalized in ServerStreamListener.closed(), which can be 2291 // run after the client receives the final status. So we use a timeout. 2292 serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 2293 serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 2294 } catch (InterruptedException e) { 2295 throw new RuntimeException(e); 2296 } 2297 assertNotNull(serverStartRecord); 2298 assertNotNull(serverEndRecord); 2299 checkStartTags(serverStartRecord, method, false); 2300 checkEndTags(serverEndRecord, method, code, false); 2301 if (requests != null && responses != null) { 2302 checkCensus(serverEndRecord, true, requests, responses); 2303 } 2304 } 2305 2306 ServerStreamTracerInfo tracerInfo; 2307 tracerInfo = serverStreamTracers.poll(); 2308 assertNotNull(tracerInfo); 2309 assertEquals(method, tracerInfo.fullMethodName); 2310 assertNotNull(tracerInfo.tracer.contextCapture); 2311 // On the server, streamClosed() may be called after the client receives the final status. 2312 // So we use a timeout. 2313 try { 2314 assertTrue(tracerInfo.tracer.await(1, TimeUnit.SECONDS)); 2315 } catch (InterruptedException e) { 2316 throw new AssertionError(e); 2317 } 2318 assertEquals(code, tracerInfo.tracer.getStatus().getCode()); 2319 if (requests != null && responses != null) { 2320 checkTracers(tracerInfo.tracer, responses, requests); 2321 } 2322 } 2323 checkStartTags(MetricsRecord record, String methodName, boolean clientSide)2324 private static void checkStartTags(MetricsRecord record, String methodName, boolean clientSide) { 2325 assertNotNull("record is not null", record); 2326 2327 TagKey methodNameTagKey = clientSide 2328 ? RpcMeasureConstants.GRPC_CLIENT_METHOD 2329 : RpcMeasureConstants.GRPC_SERVER_METHOD; 2330 TagValue methodNameTag = record.tags.get(methodNameTagKey); 2331 assertNotNull("method name tagged", methodNameTag); 2332 assertEquals("method names match", methodName, methodNameTag.asString()); 2333 } 2334 checkEndTags( MetricsRecord record, String methodName, Status.Code status, boolean clientSide)2335 private static void checkEndTags( 2336 MetricsRecord record, String methodName, Status.Code status, boolean clientSide) { 2337 assertNotNull("record is not null", record); 2338 2339 TagKey methodNameTagKey = clientSide 2340 ? RpcMeasureConstants.GRPC_CLIENT_METHOD 2341 : RpcMeasureConstants.GRPC_SERVER_METHOD; 2342 TagValue methodNameTag = record.tags.get(methodNameTagKey); 2343 assertNotNull("method name tagged", methodNameTag); 2344 assertEquals("method names match", methodName, methodNameTag.asString()); 2345 2346 TagKey statusTagKey = clientSide 2347 ? RpcMeasureConstants.GRPC_CLIENT_STATUS 2348 : RpcMeasureConstants.GRPC_SERVER_STATUS; 2349 TagValue statusTag = record.tags.get(statusTagKey); 2350 assertNotNull("status tagged", statusTag); 2351 assertEquals(status.toString(), statusTag.asString()); 2352 } 2353 2354 /** 2355 * Check information recorded by tracers. 2356 */ checkTracers( TestStreamTracer tracer, Collection<? extends MessageLite> sentMessages, Collection<? extends MessageLite> receivedMessages)2357 private void checkTracers( 2358 TestStreamTracer tracer, 2359 Collection<? extends MessageLite> sentMessages, 2360 Collection<? extends MessageLite> receivedMessages) { 2361 long uncompressedSentSize = 0; 2362 int seqNo = 0; 2363 for (MessageLite msg : sentMessages) { 2364 assertThat(tracer.nextOutboundEvent()) 2365 .isEqualTo(String.format(Locale.US, "outboundMessage(%d)", seqNo)); 2366 assertThat(tracer.nextOutboundEvent()).matches( 2367 String.format(Locale.US, "outboundMessageSent\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); 2368 seqNo++; 2369 uncompressedSentSize += msg.getSerializedSize(); 2370 } 2371 assertNull(tracer.nextOutboundEvent()); 2372 long uncompressedReceivedSize = 0; 2373 seqNo = 0; 2374 for (MessageLite msg : receivedMessages) { 2375 assertThat(tracer.nextInboundEvent()) 2376 .isEqualTo(String.format(Locale.US, "inboundMessage(%d)", seqNo)); 2377 assertThat(tracer.nextInboundEvent()).matches( 2378 String.format(Locale.US, "inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); 2379 uncompressedReceivedSize += msg.getSerializedSize(); 2380 seqNo++; 2381 } 2382 assertNull(tracer.nextInboundEvent()); 2383 if (metricsExpected()) { 2384 assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize()); 2385 assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize()); 2386 } 2387 } 2388 2389 /** 2390 * Check information recorded by Census. 2391 */ checkCensus(MetricsRecord record, boolean isServer, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2392 private void checkCensus(MetricsRecord record, boolean isServer, 2393 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 2394 int uncompressedRequestsSize = 0; 2395 for (MessageLite request : requests) { 2396 uncompressedRequestsSize += request.getSerializedSize(); 2397 } 2398 int uncompressedResponsesSize = 0; 2399 for (MessageLite response : responses) { 2400 uncompressedResponsesSize += response.getSerializedSize(); 2401 } 2402 if (isServer) { 2403 assertEquals( 2404 requests.size(), 2405 record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_SERVER_RECEIVED_MESSAGES_PER_RPC)); 2406 assertEquals( 2407 responses.size(), 2408 record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_SERVER_SENT_MESSAGES_PER_RPC)); 2409 assertEquals( 2410 uncompressedRequestsSize, 2411 record.getMetricAsLongOrFail( 2412 DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); 2413 assertEquals( 2414 uncompressedResponsesSize, 2415 record.getMetricAsLongOrFail( 2416 DeprecatedCensusConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); 2417 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_SERVER_LATENCY)); 2418 // It's impossible to get the expected wire sizes because it may be compressed, so we just 2419 // check if they are recorded. 2420 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_RECEIVED_BYTES_PER_RPC)); 2421 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_SERVER_SENT_BYTES_PER_RPC)); 2422 } else { 2423 assertEquals( 2424 requests.size(), 2425 record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_SENT_MESSAGES_PER_RPC)); 2426 assertEquals( 2427 responses.size(), 2428 record.getMetricAsLongOrFail(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_MESSAGES_PER_RPC)); 2429 assertEquals( 2430 uncompressedRequestsSize, 2431 record.getMetricAsLongOrFail( 2432 DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); 2433 assertEquals( 2434 uncompressedResponsesSize, 2435 record.getMetricAsLongOrFail( 2436 DeprecatedCensusConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); 2437 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY)); 2438 // It's impossible to get the expected wire sizes because it may be compressed, so we just 2439 // check if they are recorded. 2440 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_SENT_BYTES_PER_RPC)); 2441 assertNotNull(record.getMetric(RpcMeasureConstants.GRPC_CLIENT_RECEIVED_BYTES_PER_RPC)); 2442 } 2443 } 2444 2445 // Helper methods for responses containing Payload since proto equals does not ignore deprecated 2446 // fields (PayloadType). assertResponses( Collection<StreamingOutputCallResponse> expected, Collection<StreamingOutputCallResponse> actual)2447 private void assertResponses( 2448 Collection<StreamingOutputCallResponse> expected, 2449 Collection<StreamingOutputCallResponse> actual) { 2450 assertSame(expected.size(), actual.size()); 2451 Iterator<StreamingOutputCallResponse> expectedIter = expected.iterator(); 2452 Iterator<StreamingOutputCallResponse> actualIter = actual.iterator(); 2453 2454 while (expectedIter.hasNext()) { 2455 assertResponse(expectedIter.next(), actualIter.next()); 2456 } 2457 } 2458 assertResponse( StreamingOutputCallResponse expected, StreamingOutputCallResponse actual)2459 private void assertResponse( 2460 StreamingOutputCallResponse expected, StreamingOutputCallResponse actual) { 2461 if (expected == null || actual == null) { 2462 assertEquals(expected, actual); 2463 } else { 2464 assertPayload(expected.getPayload(), actual.getPayload()); 2465 } 2466 } 2467 assertResponse(SimpleResponse expected, SimpleResponse actual)2468 private void assertResponse(SimpleResponse expected, SimpleResponse actual) { 2469 assertPayload(expected.getPayload(), actual.getPayload()); 2470 assertEquals(expected.getUsername(), actual.getUsername()); 2471 assertEquals(expected.getOauthScope(), actual.getOauthScope()); 2472 } 2473 assertPayload(Payload expected, Payload actual)2474 private void assertPayload(Payload expected, Payload actual) { 2475 // Compare non deprecated fields in Payload, to make this test forward compatible. 2476 if (expected == null || actual == null) { 2477 assertEquals(expected, actual); 2478 } else { 2479 assertEquals(expected.getBody(), actual.getBody()); 2480 } 2481 } 2482 2483 /** 2484 * Captures the request attributes. Useful for testing ServerCalls. 2485 * {@link ServerCall#getAttributes()} 2486 */ recordServerCallInterceptor( final AtomicReference<ServerCall<?, ?>> serverCallCapture)2487 private static ServerInterceptor recordServerCallInterceptor( 2488 final AtomicReference<ServerCall<?, ?>> serverCallCapture) { 2489 return new ServerInterceptor() { 2490 @Override 2491 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 2492 ServerCall<ReqT, RespT> call, 2493 Metadata requestHeaders, 2494 ServerCallHandler<ReqT, RespT> next) { 2495 serverCallCapture.set(call); 2496 return next.startCall(call, requestHeaders); 2497 } 2498 }; 2499 } 2500 2501 /** 2502 * Captures the request attributes. Useful for testing ClientCalls. 2503 * {@link ClientCall#getAttributes()} 2504 */ 2505 private static ClientInterceptor recordClientCallInterceptor( 2506 final AtomicReference<ClientCall<?, ?>> clientCallCapture) { 2507 return new ClientInterceptor() { 2508 @Override 2509 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 2510 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 2511 ClientCall<ReqT, RespT> clientCall = next.newCall(method,callOptions); 2512 clientCallCapture.set(clientCall); 2513 return clientCall; 2514 } 2515 }; 2516 } 2517 2518 private static ServerInterceptor recordContextInterceptor( 2519 final AtomicReference<Context> contextCapture) { 2520 return new ServerInterceptor() { 2521 @Override 2522 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 2523 ServerCall<ReqT, RespT> call, 2524 Metadata requestHeaders, 2525 ServerCallHandler<ReqT, RespT> next) { 2526 contextCapture.set(Context.current()); 2527 return next.startCall(call, requestHeaders); 2528 } 2529 }; 2530 } 2531 2532 /** 2533 * A marshaller that record input and output sizes. 2534 */ 2535 private static final class ByteSizeMarshaller<T> implements MethodDescriptor.Marshaller<T> { 2536 2537 private final MethodDescriptor.Marshaller<T> delegate; 2538 volatile int lastOutSize; 2539 volatile int lastInSize; 2540 2541 ByteSizeMarshaller(MethodDescriptor.Marshaller<T> delegate) { 2542 this.delegate = delegate; 2543 } 2544 2545 @Override 2546 public InputStream stream(T value) { 2547 InputStream is = delegate.stream(value); 2548 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2549 try { 2550 lastOutSize = (int) ByteStreams.copy(is, baos); 2551 } catch (IOException e) { 2552 throw new RuntimeException(e); 2553 } 2554 return new ByteArrayInputStream(baos.toByteArray()); 2555 } 2556 2557 @Override 2558 public T parse(InputStream stream) { 2559 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2560 try { 2561 lastInSize = (int) ByteStreams.copy(stream, baos); 2562 } catch (IOException e) { 2563 throw new RuntimeException(e); 2564 } 2565 return delegate.parse(new ByteArrayInputStream(baos.toByteArray())); 2566 } 2567 } 2568 } 2569