1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.stub.ClientCalls.blockingServerStreamingCall; 21 import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE; 22 import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; 23 import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; 24 import static org.junit.Assert.assertEquals; 25 import static org.junit.Assert.assertFalse; 26 import static org.junit.Assert.assertNotEquals; 27 import static org.junit.Assert.assertNotNull; 28 import static org.junit.Assert.assertNull; 29 import static org.junit.Assert.assertTrue; 30 import static org.junit.Assert.fail; 31 import static org.mockito.Mockito.mock; 32 import static org.mockito.Mockito.timeout; 33 34 import com.google.auth.oauth2.AccessToken; 35 import com.google.auth.oauth2.ComputeEngineCredentials; 36 import com.google.auth.oauth2.GoogleCredentials; 37 import com.google.auth.oauth2.OAuth2Credentials; 38 import com.google.auth.oauth2.ServiceAccountCredentials; 39 import com.google.common.annotations.VisibleForTesting; 40 import com.google.common.base.Throwables; 41 import com.google.common.collect.ImmutableList; 42 import com.google.common.collect.Lists; 43 import com.google.common.io.ByteStreams; 44 import com.google.common.util.concurrent.SettableFuture; 45 import com.google.protobuf.BoolValue; 46 import com.google.protobuf.ByteString; 47 import com.google.protobuf.MessageLite; 48 import io.grpc.CallOptions; 49 import io.grpc.Channel; 50 import io.grpc.ClientCall; 51 import io.grpc.ClientInterceptor; 52 import io.grpc.ClientInterceptors; 53 import io.grpc.ClientStreamTracer; 54 import io.grpc.Context; 55 import io.grpc.Grpc; 56 import io.grpc.ManagedChannel; 57 import io.grpc.Metadata; 58 import io.grpc.MethodDescriptor; 59 import io.grpc.Server; 60 import io.grpc.ServerCall; 61 import io.grpc.ServerCallHandler; 62 import io.grpc.ServerInterceptor; 63 import io.grpc.ServerInterceptors; 64 import io.grpc.ServerStreamTracer; 65 import io.grpc.Status; 66 import io.grpc.StatusRuntimeException; 67 import io.grpc.auth.MoreCallCredentials; 68 import io.grpc.internal.AbstractServerImplBuilder; 69 import io.grpc.internal.CensusStatsModule; 70 import io.grpc.internal.GrpcUtil; 71 import io.grpc.internal.testing.StatsTestUtils; 72 import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; 73 import io.grpc.internal.testing.StatsTestUtils.FakeTagContext; 74 import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; 75 import io.grpc.internal.testing.StatsTestUtils.FakeTagger; 76 import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; 77 import io.grpc.internal.testing.StreamRecorder; 78 import io.grpc.internal.testing.TestClientStreamTracer; 79 import io.grpc.internal.testing.TestServerStreamTracer; 80 import io.grpc.internal.testing.TestStreamTracer; 81 import io.grpc.stub.ClientCallStreamObserver; 82 import io.grpc.stub.ClientCalls; 83 import io.grpc.stub.MetadataUtils; 84 import io.grpc.stub.StreamObserver; 85 import io.grpc.testing.TestUtils; 86 import io.grpc.testing.integration.EmptyProtos.Empty; 87 import io.grpc.testing.integration.Messages.EchoStatus; 88 import io.grpc.testing.integration.Messages.Payload; 89 import io.grpc.testing.integration.Messages.PayloadType; 90 import io.grpc.testing.integration.Messages.ResponseParameters; 91 import io.grpc.testing.integration.Messages.SimpleRequest; 92 import io.grpc.testing.integration.Messages.SimpleResponse; 93 import io.grpc.testing.integration.Messages.StreamingInputCallRequest; 94 import io.grpc.testing.integration.Messages.StreamingInputCallResponse; 95 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; 96 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; 97 import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; 98 import io.opencensus.tags.TagKey; 99 import io.opencensus.tags.TagValue; 100 import io.opencensus.trace.Span; 101 import io.opencensus.trace.SpanContext; 102 import io.opencensus.trace.Tracing; 103 import io.opencensus.trace.unsafe.ContextUtils; 104 import java.io.ByteArrayInputStream; 105 import java.io.ByteArrayOutputStream; 106 import java.io.IOException; 107 import java.io.InputStream; 108 import java.net.SocketAddress; 109 import java.security.cert.Certificate; 110 import java.security.cert.X509Certificate; 111 import java.util.ArrayList; 112 import java.util.Arrays; 113 import java.util.Collection; 114 import java.util.Collections; 115 import java.util.List; 116 import java.util.Map; 117 import java.util.concurrent.ArrayBlockingQueue; 118 import java.util.concurrent.Executors; 119 import java.util.concurrent.LinkedBlockingQueue; 120 import java.util.concurrent.ScheduledExecutorService; 121 import java.util.concurrent.TimeUnit; 122 import java.util.concurrent.atomic.AtomicReference; 123 import java.util.logging.Level; 124 import java.util.logging.Logger; 125 import javax.annotation.Nullable; 126 import javax.net.ssl.SSLPeerUnverifiedException; 127 import javax.net.ssl.SSLSession; 128 import org.junit.After; 129 import org.junit.Assert; 130 import org.junit.Assume; 131 import org.junit.Before; 132 import org.junit.Rule; 133 import org.junit.Test; 134 import org.junit.rules.Timeout; 135 import org.mockito.ArgumentCaptor; 136 import org.mockito.Mockito; 137 import org.mockito.verification.VerificationMode; 138 139 /** 140 * Abstract base class for all GRPC transport tests. 141 * 142 * <p> New tests should avoid using Mockito to support running on AppEngine.</p> 143 */ 144 public abstract class AbstractInteropTest { 145 private static Logger logger = Logger.getLogger(AbstractInteropTest.class.getName()); 146 147 @Rule public final Timeout globalTimeout = Timeout.seconds(30); 148 149 /** Must be at least {@link #unaryPayloadLength()}, plus some to account for encoding overhead. */ 150 public static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024; 151 152 private static final FakeTagger tagger = new FakeTagger(); 153 private static final FakeTagContextBinarySerializer tagContextBinarySerializer = 154 new FakeTagContextBinarySerializer(); 155 156 private final AtomicReference<ServerCall<?, ?>> serverCallCapture = 157 new AtomicReference<ServerCall<?, ?>>(); 158 private final AtomicReference<Metadata> requestHeadersCapture = 159 new AtomicReference<Metadata>(); 160 private final AtomicReference<Context> contextCapture = 161 new AtomicReference<Context>(); 162 private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); 163 private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder(); 164 165 private ScheduledExecutorService testServiceExecutor; 166 private Server server; 167 168 private final LinkedBlockingQueue<ServerStreamTracerInfo> serverStreamTracers = 169 new LinkedBlockingQueue<ServerStreamTracerInfo>(); 170 171 private static final class ServerStreamTracerInfo { 172 final String fullMethodName; 173 final InteropServerStreamTracer tracer; 174 ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer)175 ServerStreamTracerInfo(String fullMethodName, InteropServerStreamTracer tracer) { 176 this.fullMethodName = fullMethodName; 177 this.tracer = tracer; 178 } 179 180 private static final class InteropServerStreamTracer extends TestServerStreamTracer { 181 private volatile Context contextCapture; 182 183 @Override filterContext(Context context)184 public Context filterContext(Context context) { 185 contextCapture = context; 186 return super.filterContext(context); 187 } 188 } 189 } 190 191 private final ServerStreamTracer.Factory serverStreamTracerFactory = 192 new ServerStreamTracer.Factory() { 193 @Override 194 public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { 195 ServerStreamTracerInfo.InteropServerStreamTracer tracer 196 = new ServerStreamTracerInfo.InteropServerStreamTracer(); 197 serverStreamTracers.add(new ServerStreamTracerInfo(fullMethodName, tracer)); 198 return tracer; 199 } 200 }; 201 202 protected static final Empty EMPTY = Empty.getDefaultInstance(); 203 startServer()204 private void startServer() { 205 AbstractServerImplBuilder<?> builder = getServerBuilder(); 206 if (builder == null) { 207 server = null; 208 return; 209 } 210 testServiceExecutor = Executors.newScheduledThreadPool(2); 211 212 List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder() 213 .add(recordServerCallInterceptor(serverCallCapture)) 214 .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture)) 215 .add(recordContextInterceptor(contextCapture)) 216 .addAll(TestServiceImpl.interceptors()) 217 .build(); 218 219 builder 220 .addService( 221 ServerInterceptors.intercept( 222 new TestServiceImpl(testServiceExecutor), 223 allInterceptors)) 224 .addStreamTracerFactory(serverStreamTracerFactory); 225 io.grpc.internal.TestingAccessor.setStatsImplementation( 226 builder, 227 new CensusStatsModule( 228 tagger, 229 tagContextBinarySerializer, 230 serverStatsRecorder, 231 GrpcUtil.STOPWATCH_SUPPLIER, 232 true)); 233 try { 234 server = builder.build().start(); 235 } catch (IOException ex) { 236 throw new RuntimeException(ex); 237 } 238 } 239 stopServer()240 private void stopServer() { 241 if (server != null) { 242 server.shutdownNow(); 243 } 244 if (testServiceExecutor != null) { 245 testServiceExecutor.shutdown(); 246 } 247 } 248 249 @VisibleForTesting getPort()250 final int getPort() { 251 return server.getPort(); 252 } 253 254 protected ManagedChannel channel; 255 protected TestServiceGrpc.TestServiceBlockingStub blockingStub; 256 protected TestServiceGrpc.TestServiceStub asyncStub; 257 258 private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers = 259 new LinkedBlockingQueue<TestClientStreamTracer>(); 260 261 private final ClientStreamTracer.Factory clientStreamTracerFactory = 262 new ClientStreamTracer.Factory() { 263 @Override 264 public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { 265 TestClientStreamTracer tracer = new TestClientStreamTracer(); 266 clientStreamTracers.add(tracer); 267 return tracer; 268 } 269 }; 270 private final ClientInterceptor tracerSetupInterceptor = new ClientInterceptor() { 271 @Override 272 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 273 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 274 return next.newCall( 275 method, callOptions.withStreamTracerFactory(clientStreamTracerFactory)); 276 } 277 }; 278 279 /** 280 * Must be called by the subclass setup method if overridden. 281 */ 282 @Before setUp()283 public void setUp() { 284 startServer(); 285 channel = createChannel(); 286 287 blockingStub = 288 TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); 289 asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor); 290 291 ClientInterceptor[] additionalInterceptors = getAdditionalInterceptors(); 292 if (additionalInterceptors != null) { 293 blockingStub = blockingStub.withInterceptors(additionalInterceptors); 294 asyncStub = asyncStub.withInterceptors(additionalInterceptors); 295 } 296 297 requestHeadersCapture.set(null); 298 } 299 300 /** Clean up. */ 301 @After tearDown()302 public void tearDown() { 303 if (channel != null) { 304 channel.shutdownNow(); 305 try { 306 channel.awaitTermination(1, TimeUnit.SECONDS); 307 } catch (InterruptedException ie) { 308 logger.log(Level.FINE, "Interrupted while waiting for channel termination", ie); 309 // Best effort. If there is an interruption, we want to continue cleaning up, but quickly 310 Thread.currentThread().interrupt(); 311 } 312 } 313 stopServer(); 314 } 315 createChannel()316 protected abstract ManagedChannel createChannel(); 317 318 @Nullable getAdditionalInterceptors()319 protected ClientInterceptor[] getAdditionalInterceptors() { 320 return null; 321 } 322 323 /** 324 * Returns the server builder used to create server for each test run. Return {@code null} if 325 * it shouldn't start a server in the same process. 326 */ 327 @Nullable getServerBuilder()328 protected AbstractServerImplBuilder<?> getServerBuilder() { 329 return null; 330 } 331 createClientCensusStatsModule()332 protected final CensusStatsModule createClientCensusStatsModule() { 333 return new CensusStatsModule( 334 tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true); 335 } 336 337 /** 338 * Return true if exact metric values should be checked. 339 */ metricsExpected()340 protected boolean metricsExpected() { 341 return true; 342 } 343 344 @Test emptyUnary()345 public void emptyUnary() throws Exception { 346 assertEquals(EMPTY, blockingStub.emptyCall(EMPTY)); 347 } 348 349 /** Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy. */ cacheableUnary()350 public void cacheableUnary() { 351 // Set safe to true. 352 MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod = 353 TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build(); 354 // Set fake user IP since some proxies (GFE) won't cache requests from localhost. 355 Metadata.Key<String> userIpKey = Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER); 356 Metadata metadata = new Metadata(); 357 metadata.put(userIpKey, "1.2.3.4"); 358 Channel channelWithUserIpKey = 359 ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata)); 360 SimpleRequest requests1And2 = 361 SimpleRequest.newBuilder() 362 .setPayload( 363 Payload.newBuilder() 364 .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime())))) 365 .build(); 366 SimpleRequest request3 = 367 SimpleRequest.newBuilder() 368 .setPayload( 369 Payload.newBuilder() 370 .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime())))) 371 .build(); 372 373 SimpleResponse response1 = 374 ClientCalls.blockingUnaryCall( 375 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2); 376 SimpleResponse response2 = 377 ClientCalls.blockingUnaryCall( 378 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2); 379 SimpleResponse response3 = 380 ClientCalls.blockingUnaryCall( 381 channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3); 382 383 assertEquals(response1, response2); 384 assertNotEquals(response1, response3); 385 } 386 387 @Test largeUnary()388 public void largeUnary() throws Exception { 389 assumeEnoughMemory(); 390 final SimpleRequest request = SimpleRequest.newBuilder() 391 .setResponseSize(314159) 392 .setResponseType(PayloadType.COMPRESSABLE) 393 .setPayload(Payload.newBuilder() 394 .setBody(ByteString.copyFrom(new byte[271828]))) 395 .build(); 396 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 397 .setPayload(Payload.newBuilder() 398 .setType(PayloadType.COMPRESSABLE) 399 .setBody(ByteString.copyFrom(new byte[314159]))) 400 .build(); 401 402 assertEquals(goldenResponse, blockingStub.unaryCall(request)); 403 404 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK, 405 Collections.singleton(request), Collections.singleton(goldenResponse)); 406 } 407 408 /** 409 * Tests client per-message compression for unary calls. The Java API does not support inspecting 410 * a message's compression level, so this is primarily intended to run against a gRPC C++ server. 411 */ clientCompressedUnary(boolean probe)412 public void clientCompressedUnary(boolean probe) throws Exception { 413 assumeEnoughMemory(); 414 final SimpleRequest expectCompressedRequest = 415 SimpleRequest.newBuilder() 416 .setExpectCompressed(BoolValue.newBuilder().setValue(true)) 417 .setResponseSize(314159) 418 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 419 .build(); 420 final SimpleRequest expectUncompressedRequest = 421 SimpleRequest.newBuilder() 422 .setExpectCompressed(BoolValue.newBuilder().setValue(false)) 423 .setResponseSize(314159) 424 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 425 .build(); 426 final SimpleResponse goldenResponse = 427 SimpleResponse.newBuilder() 428 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) 429 .build(); 430 431 if (probe) { 432 // Send a non-compressed message with expectCompress=true. Servers supporting this test case 433 // should return INVALID_ARGUMENT. 434 try { 435 blockingStub.unaryCall(expectCompressedRequest); 436 fail("expected INVALID_ARGUMENT"); 437 } catch (StatusRuntimeException e) { 438 assertEquals(Status.INVALID_ARGUMENT.getCode(), e.getStatus().getCode()); 439 } 440 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT); 441 } 442 443 assertEquals( 444 goldenResponse, blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest)); 445 assertStatsTrace( 446 "grpc.testing.TestService/UnaryCall", 447 Status.Code.OK, 448 Collections.singleton(expectCompressedRequest), 449 Collections.singleton(goldenResponse)); 450 451 assertEquals(goldenResponse, blockingStub.unaryCall(expectUncompressedRequest)); 452 assertStatsTrace( 453 "grpc.testing.TestService/UnaryCall", 454 Status.Code.OK, 455 Collections.singleton(expectUncompressedRequest), 456 Collections.singleton(goldenResponse)); 457 } 458 459 /** 460 * Tests if the server can send a compressed unary response. Ideally we would assert that the 461 * responses have the requested compression, but this is not supported by the API. Given a 462 * compliant server, this test will exercise the code path for receiving a compressed response but 463 * cannot itself verify that the response was compressed. 464 */ 465 @Test serverCompressedUnary()466 public void serverCompressedUnary() throws Exception { 467 assumeEnoughMemory(); 468 final SimpleRequest responseShouldBeCompressed = 469 SimpleRequest.newBuilder() 470 .setResponseCompressed(BoolValue.newBuilder().setValue(true)) 471 .setResponseSize(314159) 472 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 473 .build(); 474 final SimpleRequest responseShouldBeUncompressed = 475 SimpleRequest.newBuilder() 476 .setResponseCompressed(BoolValue.newBuilder().setValue(false)) 477 .setResponseSize(314159) 478 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828]))) 479 .build(); 480 final SimpleResponse goldenResponse = 481 SimpleResponse.newBuilder() 482 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159]))) 483 .build(); 484 485 assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeCompressed)); 486 assertStatsTrace( 487 "grpc.testing.TestService/UnaryCall", 488 Status.Code.OK, 489 Collections.singleton(responseShouldBeCompressed), 490 Collections.singleton(goldenResponse)); 491 492 assertEquals(goldenResponse, blockingStub.unaryCall(responseShouldBeUncompressed)); 493 assertStatsTrace( 494 "grpc.testing.TestService/UnaryCall", 495 Status.Code.OK, 496 Collections.singleton(responseShouldBeUncompressed), 497 Collections.singleton(goldenResponse)); 498 } 499 500 @Test serverStreaming()501 public void serverStreaming() throws Exception { 502 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 503 .setResponseType(PayloadType.COMPRESSABLE) 504 .addResponseParameters(ResponseParameters.newBuilder() 505 .setSize(31415)) 506 .addResponseParameters(ResponseParameters.newBuilder() 507 .setSize(9)) 508 .addResponseParameters(ResponseParameters.newBuilder() 509 .setSize(2653)) 510 .addResponseParameters(ResponseParameters.newBuilder() 511 .setSize(58979)) 512 .build(); 513 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 514 StreamingOutputCallResponse.newBuilder() 515 .setPayload(Payload.newBuilder() 516 .setType(PayloadType.COMPRESSABLE) 517 .setBody(ByteString.copyFrom(new byte[31415]))) 518 .build(), 519 StreamingOutputCallResponse.newBuilder() 520 .setPayload(Payload.newBuilder() 521 .setType(PayloadType.COMPRESSABLE) 522 .setBody(ByteString.copyFrom(new byte[9]))) 523 .build(), 524 StreamingOutputCallResponse.newBuilder() 525 .setPayload(Payload.newBuilder() 526 .setType(PayloadType.COMPRESSABLE) 527 .setBody(ByteString.copyFrom(new byte[2653]))) 528 .build(), 529 StreamingOutputCallResponse.newBuilder() 530 .setPayload(Payload.newBuilder() 531 .setType(PayloadType.COMPRESSABLE) 532 .setBody(ByteString.copyFrom(new byte[58979]))) 533 .build()); 534 535 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 536 asyncStub.streamingOutputCall(request, recorder); 537 recorder.awaitCompletion(); 538 assertSuccess(recorder); 539 assertEquals(goldenResponses, recorder.getValues()); 540 } 541 542 @Test clientStreaming()543 public void clientStreaming() throws Exception { 544 final List<StreamingInputCallRequest> requests = Arrays.asList( 545 StreamingInputCallRequest.newBuilder() 546 .setPayload(Payload.newBuilder() 547 .setBody(ByteString.copyFrom(new byte[27182]))) 548 .build(), 549 StreamingInputCallRequest.newBuilder() 550 .setPayload(Payload.newBuilder() 551 .setBody(ByteString.copyFrom(new byte[8]))) 552 .build(), 553 StreamingInputCallRequest.newBuilder() 554 .setPayload(Payload.newBuilder() 555 .setBody(ByteString.copyFrom(new byte[1828]))) 556 .build(), 557 StreamingInputCallRequest.newBuilder() 558 .setPayload(Payload.newBuilder() 559 .setBody(ByteString.copyFrom(new byte[45904]))) 560 .build()); 561 final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder() 562 .setAggregatedPayloadSize(74922) 563 .build(); 564 565 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 566 StreamObserver<StreamingInputCallRequest> requestObserver = 567 asyncStub.streamingInputCall(responseObserver); 568 for (StreamingInputCallRequest request : requests) { 569 requestObserver.onNext(request); 570 } 571 requestObserver.onCompleted(); 572 573 assertEquals(goldenResponse, responseObserver.firstValue().get()); 574 responseObserver.awaitCompletion(); 575 assertThat(responseObserver.getValues()).hasSize(1); 576 Throwable t = responseObserver.getError(); 577 if (t != null) { 578 throw new AssertionError(t); 579 } 580 } 581 582 /** 583 * Tests client per-message compression for streaming calls. The Java API does not support 584 * inspecting a message's compression level, so this is primarily intended to run against a gRPC 585 * C++ server. 586 */ clientCompressedStreaming(boolean probe)587 public void clientCompressedStreaming(boolean probe) throws Exception { 588 final StreamingInputCallRequest expectCompressedRequest = 589 StreamingInputCallRequest.newBuilder() 590 .setExpectCompressed(BoolValue.newBuilder().setValue(true)) 591 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182]))) 592 .build(); 593 final StreamingInputCallRequest expectUncompressedRequest = 594 StreamingInputCallRequest.newBuilder() 595 .setExpectCompressed(BoolValue.newBuilder().setValue(false)) 596 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904]))) 597 .build(); 598 final StreamingInputCallResponse goldenResponse = 599 StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build(); 600 601 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 602 StreamObserver<StreamingInputCallRequest> requestObserver = 603 asyncStub.streamingInputCall(responseObserver); 604 605 if (probe) { 606 // Send a non-compressed message with expectCompress=true. Servers supporting this test case 607 // should return INVALID_ARGUMENT. 608 requestObserver.onNext(expectCompressedRequest); 609 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 610 Throwable e = responseObserver.getError(); 611 assertNotNull("expected INVALID_ARGUMENT", e); 612 assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode()); 613 } 614 615 // Start a new stream 616 responseObserver = StreamRecorder.create(); 617 @SuppressWarnings("unchecked") 618 ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver = 619 (ClientCallStreamObserver) 620 asyncStub.withCompression("gzip").streamingInputCall(responseObserver); 621 clientCallStreamObserver.setMessageCompression(true); 622 clientCallStreamObserver.onNext(expectCompressedRequest); 623 clientCallStreamObserver.setMessageCompression(false); 624 clientCallStreamObserver.onNext(expectUncompressedRequest); 625 clientCallStreamObserver.onCompleted(); 626 responseObserver.awaitCompletion(); 627 assertSuccess(responseObserver); 628 assertEquals(goldenResponse, responseObserver.firstValue().get()); 629 } 630 631 /** 632 * Tests server per-message compression in a streaming response. Ideally we would assert that the 633 * responses have the requested compression, but this is not supported by the API. Given a 634 * compliant server, this test will exercise the code path for receiving a compressed response but 635 * cannot itself verify that the response was compressed. 636 */ serverCompressedStreaming()637 public void serverCompressedStreaming() throws Exception { 638 final StreamingOutputCallRequest request = 639 StreamingOutputCallRequest.newBuilder() 640 .addResponseParameters( 641 ResponseParameters.newBuilder() 642 .setCompressed(BoolValue.newBuilder().setValue(true)) 643 .setSize(31415)) 644 .addResponseParameters( 645 ResponseParameters.newBuilder() 646 .setCompressed(BoolValue.newBuilder().setValue(false)) 647 .setSize(92653)) 648 .build(); 649 final List<StreamingOutputCallResponse> goldenResponses = 650 Arrays.asList( 651 StreamingOutputCallResponse.newBuilder() 652 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415]))) 653 .build(), 654 StreamingOutputCallResponse.newBuilder() 655 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653]))) 656 .build()); 657 658 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 659 asyncStub.streamingOutputCall(request, recorder); 660 recorder.awaitCompletion(); 661 assertSuccess(recorder); 662 assertEquals(goldenResponses, recorder.getValues()); 663 } 664 665 @Test pingPong()666 public void pingPong() throws Exception { 667 final List<StreamingOutputCallRequest> requests = Arrays.asList( 668 StreamingOutputCallRequest.newBuilder() 669 .addResponseParameters(ResponseParameters.newBuilder() 670 .setSize(31415)) 671 .setPayload(Payload.newBuilder() 672 .setBody(ByteString.copyFrom(new byte[27182]))) 673 .build(), 674 StreamingOutputCallRequest.newBuilder() 675 .addResponseParameters(ResponseParameters.newBuilder() 676 .setSize(9)) 677 .setPayload(Payload.newBuilder() 678 .setBody(ByteString.copyFrom(new byte[8]))) 679 .build(), 680 StreamingOutputCallRequest.newBuilder() 681 .addResponseParameters(ResponseParameters.newBuilder() 682 .setSize(2653)) 683 .setPayload(Payload.newBuilder() 684 .setBody(ByteString.copyFrom(new byte[1828]))) 685 .build(), 686 StreamingOutputCallRequest.newBuilder() 687 .addResponseParameters(ResponseParameters.newBuilder() 688 .setSize(58979)) 689 .setPayload(Payload.newBuilder() 690 .setBody(ByteString.copyFrom(new byte[45904]))) 691 .build()); 692 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 693 StreamingOutputCallResponse.newBuilder() 694 .setPayload(Payload.newBuilder() 695 .setType(PayloadType.COMPRESSABLE) 696 .setBody(ByteString.copyFrom(new byte[31415]))) 697 .build(), 698 StreamingOutputCallResponse.newBuilder() 699 .setPayload(Payload.newBuilder() 700 .setType(PayloadType.COMPRESSABLE) 701 .setBody(ByteString.copyFrom(new byte[9]))) 702 .build(), 703 StreamingOutputCallResponse.newBuilder() 704 .setPayload(Payload.newBuilder() 705 .setType(PayloadType.COMPRESSABLE) 706 .setBody(ByteString.copyFrom(new byte[2653]))) 707 .build(), 708 StreamingOutputCallResponse.newBuilder() 709 .setPayload(Payload.newBuilder() 710 .setType(PayloadType.COMPRESSABLE) 711 .setBody(ByteString.copyFrom(new byte[58979]))) 712 .build()); 713 714 final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(5); 715 StreamObserver<StreamingOutputCallRequest> requestObserver 716 = asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() { 717 @Override 718 public void onNext(StreamingOutputCallResponse response) { 719 queue.add(response); 720 } 721 722 @Override 723 public void onError(Throwable t) { 724 queue.add(t); 725 } 726 727 @Override 728 public void onCompleted() { 729 queue.add("Completed"); 730 } 731 }); 732 for (int i = 0; i < requests.size(); i++) { 733 assertNull(queue.peek()); 734 requestObserver.onNext(requests.get(i)); 735 Object actualResponse = queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 736 assertNotNull("Timed out waiting for response", actualResponse); 737 if (actualResponse instanceof Throwable) { 738 throw new AssertionError((Throwable) actualResponse); 739 } 740 assertEquals(goldenResponses.get(i), actualResponse); 741 } 742 requestObserver.onCompleted(); 743 assertEquals("Completed", queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 744 } 745 746 @Test emptyStream()747 public void emptyStream() throws Exception { 748 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 749 StreamObserver<StreamingOutputCallRequest> requestObserver 750 = asyncStub.fullDuplexCall(responseObserver); 751 requestObserver.onCompleted(); 752 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 753 } 754 755 @Test cancelAfterBegin()756 public void cancelAfterBegin() throws Exception { 757 StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create(); 758 StreamObserver<StreamingInputCallRequest> requestObserver = 759 asyncStub.streamingInputCall(responseObserver); 760 requestObserver.onError(new RuntimeException()); 761 responseObserver.awaitCompletion(); 762 assertEquals(Arrays.<StreamingInputCallResponse>asList(), responseObserver.getValues()); 763 assertEquals(Status.Code.CANCELLED, 764 Status.fromThrowable(responseObserver.getError()).getCode()); 765 766 if (metricsExpected()) { 767 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 768 checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingInputCall"); 769 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 770 // recorded. The tracer stats rely on the stream being created, which is not always the case 771 // in this test. Therefore we don't check the tracer stats. 772 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 773 checkEndTags( 774 clientEndRecord, "grpc.testing.TestService/StreamingInputCall", 775 Status.CANCELLED.getCode()); 776 // Do not check server-side metrics, because the status on the server side is undetermined. 777 } 778 } 779 780 @Test cancelAfterFirstResponse()781 public void cancelAfterFirstResponse() throws Exception { 782 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 783 .addResponseParameters(ResponseParameters.newBuilder() 784 .setSize(31415)) 785 .setPayload(Payload.newBuilder() 786 .setBody(ByteString.copyFrom(new byte[27182]))) 787 .build(); 788 final StreamingOutputCallResponse goldenResponse = StreamingOutputCallResponse.newBuilder() 789 .setPayload(Payload.newBuilder() 790 .setType(PayloadType.COMPRESSABLE) 791 .setBody(ByteString.copyFrom(new byte[31415]))) 792 .build(); 793 794 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 795 StreamObserver<StreamingOutputCallRequest> requestObserver 796 = asyncStub.fullDuplexCall(responseObserver); 797 requestObserver.onNext(request); 798 assertEquals(goldenResponse, responseObserver.firstValue().get()); 799 requestObserver.onError(new RuntimeException()); 800 responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 801 assertEquals(1, responseObserver.getValues().size()); 802 assertEquals(Status.Code.CANCELLED, 803 Status.fromThrowable(responseObserver.getError()).getCode()); 804 805 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.CANCELLED); 806 } 807 808 @Test fullDuplexCallShouldSucceed()809 public void fullDuplexCallShouldSucceed() throws Exception { 810 // Build the request. 811 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 812 StreamingOutputCallRequest.Builder streamingOutputBuilder = 813 StreamingOutputCallRequest.newBuilder(); 814 streamingOutputBuilder.setResponseType(COMPRESSABLE); 815 for (Integer size : responseSizes) { 816 streamingOutputBuilder.addResponseParameters( 817 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 818 } 819 final StreamingOutputCallRequest request = streamingOutputBuilder.build(); 820 821 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 822 StreamObserver<StreamingOutputCallRequest> requestStream = 823 asyncStub.fullDuplexCall(recorder); 824 825 final int numRequests = 10; 826 List<StreamingOutputCallRequest> requests = 827 new ArrayList<>(numRequests); 828 for (int ix = numRequests; ix > 0; --ix) { 829 requests.add(request); 830 requestStream.onNext(request); 831 } 832 requestStream.onCompleted(); 833 recorder.awaitCompletion(); 834 assertSuccess(recorder); 835 assertEquals(responseSizes.size() * numRequests, recorder.getValues().size()); 836 for (int ix = 0; ix < recorder.getValues().size(); ++ix) { 837 StreamingOutputCallResponse response = recorder.getValues().get(ix); 838 assertEquals(COMPRESSABLE, response.getPayload().getType()); 839 int length = response.getPayload().getBody().size(); 840 int expectedSize = responseSizes.get(ix % responseSizes.size()); 841 assertEquals("comparison failed at index " + ix, expectedSize, length); 842 } 843 844 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, requests, 845 recorder.getValues()); 846 } 847 848 @Test halfDuplexCallShouldSucceed()849 public void halfDuplexCallShouldSucceed() throws Exception { 850 // Build the request. 851 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 852 StreamingOutputCallRequest.Builder streamingOutputBuilder = 853 StreamingOutputCallRequest.newBuilder(); 854 streamingOutputBuilder.setResponseType(COMPRESSABLE); 855 for (Integer size : responseSizes) { 856 streamingOutputBuilder.addResponseParameters( 857 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 858 } 859 final StreamingOutputCallRequest request = streamingOutputBuilder.build(); 860 861 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 862 StreamObserver<StreamingOutputCallRequest> requestStream = asyncStub.halfDuplexCall(recorder); 863 864 final int numRequests = 10; 865 List<StreamingOutputCallRequest> requests = 866 new ArrayList<>(numRequests); 867 for (int ix = numRequests; ix > 0; --ix) { 868 requests.add(request); 869 requestStream.onNext(request); 870 } 871 requestStream.onCompleted(); 872 recorder.awaitCompletion(); 873 assertSuccess(recorder); 874 assertEquals(responseSizes.size() * numRequests, recorder.getValues().size()); 875 for (int ix = 0; ix < recorder.getValues().size(); ++ix) { 876 StreamingOutputCallResponse response = recorder.getValues().get(ix); 877 assertEquals(COMPRESSABLE, response.getPayload().getType()); 878 int length = response.getPayload().getBody().size(); 879 int expectedSize = responseSizes.get(ix % responseSizes.size()); 880 assertEquals("comparison failed at index " + ix, expectedSize, length); 881 } 882 } 883 884 @Test serverStreamingShouldBeFlowControlled()885 public void serverStreamingShouldBeFlowControlled() throws Exception { 886 final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 887 .setResponseType(COMPRESSABLE) 888 .addResponseParameters(ResponseParameters.newBuilder().setSize(100000)) 889 .addResponseParameters(ResponseParameters.newBuilder().setSize(100001)) 890 .build(); 891 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 892 StreamingOutputCallResponse.newBuilder() 893 .setPayload(Payload.newBuilder() 894 .setType(PayloadType.COMPRESSABLE) 895 .setBody(ByteString.copyFrom(new byte[100000]))).build(), 896 StreamingOutputCallResponse.newBuilder() 897 .setPayload(Payload.newBuilder() 898 .setType(PayloadType.COMPRESSABLE) 899 .setBody(ByteString.copyFrom(new byte[100001]))).build()); 900 901 long start = System.nanoTime(); 902 903 final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(10); 904 ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call = 905 channel.newCall(TestServiceGrpc.getStreamingOutputCallMethod(), CallOptions.DEFAULT); 906 call.start(new ClientCall.Listener<StreamingOutputCallResponse>() { 907 @Override 908 public void onHeaders(Metadata headers) {} 909 910 @Override 911 public void onMessage(final StreamingOutputCallResponse message) { 912 queue.add(message); 913 } 914 915 @Override 916 public void onClose(Status status, Metadata trailers) { 917 queue.add(status); 918 } 919 }, new Metadata()); 920 call.sendMessage(request); 921 call.halfClose(); 922 923 // Time how long it takes to get the first response. 924 call.request(1); 925 assertEquals(goldenResponses.get(0), 926 queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 927 long firstCallDuration = System.nanoTime() - start; 928 929 // Without giving additional flow control, make sure that we don't get another response. We wait 930 // until we are comfortable the next message isn't coming. We may have very low nanoTime 931 // resolution (like on Windows) or be using a testing, in-process transport where message 932 // handling is instantaneous. In both cases, firstCallDuration may be 0, so round up sleep time 933 // to at least 1ms. 934 assertNull(queue.poll(Math.max(firstCallDuration * 4, 1 * 1000 * 1000), TimeUnit.NANOSECONDS)); 935 936 // Make sure that everything still completes. 937 call.request(1); 938 assertEquals(goldenResponses.get(1), 939 queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 940 assertEquals(Status.OK, queue.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 941 } 942 943 @Test veryLargeRequest()944 public void veryLargeRequest() throws Exception { 945 assumeEnoughMemory(); 946 final SimpleRequest request = SimpleRequest.newBuilder() 947 .setPayload(Payload.newBuilder() 948 .setType(PayloadType.COMPRESSABLE) 949 .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()]))) 950 .setResponseSize(10) 951 .setResponseType(PayloadType.COMPRESSABLE) 952 .build(); 953 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 954 .setPayload(Payload.newBuilder() 955 .setType(PayloadType.COMPRESSABLE) 956 .setBody(ByteString.copyFrom(new byte[10]))) 957 .build(); 958 959 assertEquals(goldenResponse, blockingStub.unaryCall(request)); 960 } 961 962 @Test veryLargeResponse()963 public void veryLargeResponse() throws Exception { 964 assumeEnoughMemory(); 965 final SimpleRequest request = SimpleRequest.newBuilder() 966 .setResponseSize(unaryPayloadLength()) 967 .setResponseType(PayloadType.COMPRESSABLE) 968 .build(); 969 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 970 .setPayload(Payload.newBuilder() 971 .setType(PayloadType.COMPRESSABLE) 972 .setBody(ByteString.copyFrom(new byte[unaryPayloadLength()]))) 973 .build(); 974 975 assertEquals(goldenResponse, blockingStub.unaryCall(request)); 976 } 977 978 @Test exchangeMetadataUnaryCall()979 public void exchangeMetadataUnaryCall() throws Exception { 980 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub; 981 982 // Capture the metadata exchange 983 Metadata fixedHeaders = new Metadata(); 984 // Send a context proto (as it's in the default extension registry) 985 Messages.SimpleContext contextValue = 986 Messages.SimpleContext.newBuilder().setValue("dog").build(); 987 fixedHeaders.put(Util.METADATA_KEY, contextValue); 988 stub = MetadataUtils.attachHeaders(stub, fixedHeaders); 989 // .. and expect it to be echoed back in trailers 990 AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>(); 991 AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>(); 992 stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture); 993 994 assertNotNull(stub.emptyCall(EMPTY)); 995 996 // Assert that our side channel object is echoed back in both headers and trailers 997 Assert.assertEquals(contextValue, headersCapture.get().get(Util.METADATA_KEY)); 998 Assert.assertEquals(contextValue, trailersCapture.get().get(Util.METADATA_KEY)); 999 } 1000 1001 @Test exchangeMetadataStreamingCall()1002 public void exchangeMetadataStreamingCall() throws Exception { 1003 TestServiceGrpc.TestServiceStub stub = asyncStub; 1004 1005 // Capture the metadata exchange 1006 Metadata fixedHeaders = new Metadata(); 1007 // Send a context proto (as it's in the default extension registry) 1008 Messages.SimpleContext contextValue = 1009 Messages.SimpleContext.newBuilder().setValue("dog").build(); 1010 fixedHeaders.put(Util.METADATA_KEY, contextValue); 1011 stub = MetadataUtils.attachHeaders(stub, fixedHeaders); 1012 // .. and expect it to be echoed back in trailers 1013 AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>(); 1014 AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>(); 1015 stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture); 1016 1017 List<Integer> responseSizes = Arrays.asList(50, 100, 150, 200); 1018 Messages.StreamingOutputCallRequest.Builder streamingOutputBuilder = 1019 Messages.StreamingOutputCallRequest.newBuilder(); 1020 streamingOutputBuilder.setResponseType(COMPRESSABLE); 1021 for (Integer size : responseSizes) { 1022 streamingOutputBuilder.addResponseParameters( 1023 ResponseParameters.newBuilder().setSize(size).setIntervalUs(0)); 1024 } 1025 final Messages.StreamingOutputCallRequest request = streamingOutputBuilder.build(); 1026 1027 StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1028 StreamObserver<Messages.StreamingOutputCallRequest> requestStream = 1029 stub.fullDuplexCall(recorder); 1030 1031 final int numRequests = 10; 1032 List<StreamingOutputCallRequest> requests = 1033 new ArrayList<>(numRequests); 1034 1035 for (int ix = numRequests; ix > 0; --ix) { 1036 requests.add(request); 1037 requestStream.onNext(request); 1038 } 1039 requestStream.onCompleted(); 1040 recorder.awaitCompletion(); 1041 assertSuccess(recorder); 1042 org.junit.Assert.assertEquals(responseSizes.size() * numRequests, recorder.getValues().size()); 1043 1044 // Assert that our side channel object is echoed back in both headers and trailers 1045 Assert.assertEquals(contextValue, headersCapture.get().get(Util.METADATA_KEY)); 1046 Assert.assertEquals(contextValue, trailersCapture.get().get(Util.METADATA_KEY)); 1047 } 1048 1049 @Test sendsTimeoutHeader()1050 public void sendsTimeoutHeader() { 1051 Assume.assumeTrue("can not capture request headers on server side", server != null); 1052 long configuredTimeoutMinutes = 100; 1053 TestServiceGrpc.TestServiceBlockingStub stub = 1054 blockingStub.withDeadlineAfter(configuredTimeoutMinutes, TimeUnit.MINUTES); 1055 stub.emptyCall(EMPTY); 1056 long transferredTimeoutMinutes = TimeUnit.NANOSECONDS.toMinutes( 1057 requestHeadersCapture.get().get(GrpcUtil.TIMEOUT_KEY)); 1058 Assert.assertTrue( 1059 "configuredTimeoutMinutes=" + configuredTimeoutMinutes 1060 + ", transferredTimeoutMinutes=" + transferredTimeoutMinutes, 1061 configuredTimeoutMinutes - transferredTimeoutMinutes >= 0 1062 && configuredTimeoutMinutes - transferredTimeoutMinutes <= 1); 1063 } 1064 1065 @Test deadlineNotExceeded()1066 public void deadlineNotExceeded() { 1067 // warm up the channel and JVM 1068 blockingStub.emptyCall(Empty.getDefaultInstance()); 1069 blockingStub 1070 .withDeadlineAfter(10, TimeUnit.SECONDS) 1071 .streamingOutputCall(StreamingOutputCallRequest.newBuilder() 1072 .addResponseParameters(ResponseParameters.newBuilder() 1073 .setIntervalUs(0)) 1074 .build()).next(); 1075 } 1076 1077 @Test deadlineExceeded()1078 public void deadlineExceeded() throws Exception { 1079 // warm up the channel and JVM 1080 blockingStub.emptyCall(Empty.getDefaultInstance()); 1081 TestServiceGrpc.TestServiceBlockingStub stub = 1082 blockingStub.withDeadlineAfter(10, TimeUnit.MILLISECONDS); 1083 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1084 .addResponseParameters(ResponseParameters.newBuilder() 1085 .setIntervalUs((int) TimeUnit.SECONDS.toMicros(20))) 1086 .build(); 1087 try { 1088 stub.streamingOutputCall(request).next(); 1089 fail("Expected deadline to be exceeded"); 1090 } catch (StatusRuntimeException ex) { 1091 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), ex.getStatus().getCode()); 1092 } 1093 1094 assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); 1095 if (metricsExpected()) { 1096 // Stream may not have been created before deadline is exceeded, thus we don't test the tracer 1097 // stats. 1098 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1099 checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall"); 1100 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1101 checkEndTags( 1102 clientEndRecord, 1103 "grpc.testing.TestService/StreamingOutputCall", 1104 Status.Code.DEADLINE_EXCEEDED); 1105 // Do not check server-side metrics, because the status on the server side is undetermined. 1106 } 1107 } 1108 1109 @Test deadlineExceededServerStreaming()1110 public void deadlineExceededServerStreaming() throws Exception { 1111 // warm up the channel and JVM 1112 blockingStub.emptyCall(Empty.getDefaultInstance()); 1113 ResponseParameters.Builder responseParameters = ResponseParameters.newBuilder() 1114 .setSize(1) 1115 .setIntervalUs(10000); 1116 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1117 .setResponseType(PayloadType.COMPRESSABLE) 1118 .addResponseParameters(responseParameters) 1119 .addResponseParameters(responseParameters) 1120 .addResponseParameters(responseParameters) 1121 .addResponseParameters(responseParameters) 1122 .build(); 1123 StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1124 asyncStub 1125 .withDeadlineAfter(30, TimeUnit.MILLISECONDS) 1126 .streamingOutputCall(request, recorder); 1127 recorder.awaitCompletion(); 1128 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), 1129 Status.fromThrowable(recorder.getError()).getCode()); 1130 assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); 1131 if (metricsExpected()) { 1132 // Stream may not have been created when deadline is exceeded, thus we don't check tracer 1133 // stats. 1134 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1135 checkStartTags(clientStartRecord, "grpc.testing.TestService/StreamingOutputCall"); 1136 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1137 checkEndTags( 1138 clientEndRecord, 1139 "grpc.testing.TestService/StreamingOutputCall", 1140 Status.Code.DEADLINE_EXCEEDED); 1141 // Do not check server-side metrics, because the status on the server side is undetermined. 1142 } 1143 } 1144 1145 @Test deadlineInPast()1146 public void deadlineInPast() throws Exception { 1147 // Test once with idle channel and once with active channel 1148 try { 1149 blockingStub 1150 .withDeadlineAfter(-10, TimeUnit.SECONDS) 1151 .emptyCall(Empty.getDefaultInstance()); 1152 fail("Should have thrown"); 1153 } catch (StatusRuntimeException ex) { 1154 assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); 1155 } 1156 1157 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 1158 // recorded. The tracer stats rely on the stream being created, which is not the case if 1159 // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. 1160 if (metricsExpected()) { 1161 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1162 checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall"); 1163 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1164 checkEndTags( 1165 clientEndRecord, "grpc.testing.TestService/EmptyCall", 1166 Status.DEADLINE_EXCEEDED.getCode()); 1167 } 1168 1169 // warm up the channel 1170 blockingStub.emptyCall(Empty.getDefaultInstance()); 1171 try { 1172 blockingStub 1173 .withDeadlineAfter(-10, TimeUnit.SECONDS) 1174 .emptyCall(Empty.getDefaultInstance()); 1175 fail("Should have thrown"); 1176 } catch (StatusRuntimeException ex) { 1177 assertEquals(Status.Code.DEADLINE_EXCEEDED, ex.getStatus().getCode()); 1178 } 1179 assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); 1180 if (metricsExpected()) { 1181 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1182 checkStartTags(clientStartRecord, "grpc.testing.TestService/EmptyCall"); 1183 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1184 checkEndTags( 1185 clientEndRecord, "grpc.testing.TestService/EmptyCall", 1186 Status.DEADLINE_EXCEEDED.getCode()); 1187 } 1188 } 1189 1190 @Test maxInboundSize_exact()1191 public void maxInboundSize_exact() { 1192 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1193 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1194 .build(); 1195 1196 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1197 TestServiceGrpc.getStreamingOutputCallMethod(); 1198 ByteSizeMarshaller<StreamingOutputCallResponse> mar = 1199 new ByteSizeMarshaller<StreamingOutputCallResponse>(md.getResponseMarshaller()); 1200 blockingServerStreamingCall( 1201 blockingStub.getChannel(), 1202 md.toBuilder(md.getRequestMarshaller(), mar).build(), 1203 blockingStub.getCallOptions(), 1204 request) 1205 .next(); 1206 1207 int size = mar.lastInSize; 1208 1209 TestServiceGrpc.TestServiceBlockingStub stub = 1210 blockingStub.withMaxInboundMessageSize(size); 1211 1212 stub.streamingOutputCall(request).next(); 1213 } 1214 1215 @Test maxInboundSize_tooBig()1216 public void maxInboundSize_tooBig() { 1217 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1218 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1219 .build(); 1220 1221 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1222 TestServiceGrpc.getStreamingOutputCallMethod(); 1223 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1224 new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller()); 1225 blockingServerStreamingCall( 1226 blockingStub.getChannel(), 1227 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1228 blockingStub.getCallOptions(), 1229 request) 1230 .next(); 1231 1232 int size = mar.lastOutSize; 1233 1234 TestServiceGrpc.TestServiceBlockingStub stub = 1235 blockingStub.withMaxInboundMessageSize(size - 1); 1236 1237 try { 1238 stub.streamingOutputCall(request).next(); 1239 fail(); 1240 } catch (StatusRuntimeException ex) { 1241 Status s = ex.getStatus(); 1242 assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.RESOURCE_EXHAUSTED); 1243 assertThat(Throwables.getStackTraceAsString(ex)).contains("exceeds maximum"); 1244 } 1245 } 1246 1247 @Test maxOutboundSize_exact()1248 public void maxOutboundSize_exact() { 1249 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1250 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1251 .build(); 1252 1253 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1254 TestServiceGrpc.getStreamingOutputCallMethod(); 1255 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1256 new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller()); 1257 blockingServerStreamingCall( 1258 blockingStub.getChannel(), 1259 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1260 blockingStub.getCallOptions(), 1261 request) 1262 .next(); 1263 1264 int size = mar.lastOutSize; 1265 1266 TestServiceGrpc.TestServiceBlockingStub stub = 1267 blockingStub.withMaxOutboundMessageSize(size); 1268 1269 stub.streamingOutputCall(request).next(); 1270 } 1271 1272 @Test maxOutboundSize_tooBig()1273 public void maxOutboundSize_tooBig() { 1274 // set at least one field to ensure the size is non-zero. 1275 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1276 .addResponseParameters(ResponseParameters.newBuilder().setSize(1)) 1277 .build(); 1278 1279 1280 MethodDescriptor<StreamingOutputCallRequest, StreamingOutputCallResponse> md = 1281 TestServiceGrpc.getStreamingOutputCallMethod(); 1282 ByteSizeMarshaller<StreamingOutputCallRequest> mar = 1283 new ByteSizeMarshaller<StreamingOutputCallRequest>(md.getRequestMarshaller()); 1284 blockingServerStreamingCall( 1285 blockingStub.getChannel(), 1286 md.toBuilder(mar, md.getResponseMarshaller()).build(), 1287 blockingStub.getCallOptions(), 1288 request) 1289 .next(); 1290 1291 TestServiceGrpc.TestServiceBlockingStub stub = 1292 blockingStub.withMaxOutboundMessageSize(mar.lastOutSize - 1); 1293 try { 1294 stub.streamingOutputCall(request).next(); 1295 fail(); 1296 } catch (StatusRuntimeException ex) { 1297 Status s = ex.getStatus(); 1298 assertThat(s.getCode()).named(s.toString()).isEqualTo(Status.Code.CANCELLED); 1299 assertThat(Throwables.getStackTraceAsString(ex)).contains("message too large"); 1300 } 1301 } 1302 unaryPayloadLength()1303 protected int unaryPayloadLength() { 1304 // 10MiB. 1305 return 10485760; 1306 } 1307 1308 @Test gracefulShutdown()1309 public void gracefulShutdown() throws Exception { 1310 final List<StreamingOutputCallRequest> requests = Arrays.asList( 1311 StreamingOutputCallRequest.newBuilder() 1312 .addResponseParameters(ResponseParameters.newBuilder() 1313 .setSize(3)) 1314 .setPayload(Payload.newBuilder() 1315 .setBody(ByteString.copyFrom(new byte[2]))) 1316 .build(), 1317 StreamingOutputCallRequest.newBuilder() 1318 .addResponseParameters(ResponseParameters.newBuilder() 1319 .setSize(1)) 1320 .setPayload(Payload.newBuilder() 1321 .setBody(ByteString.copyFrom(new byte[7]))) 1322 .build(), 1323 StreamingOutputCallRequest.newBuilder() 1324 .addResponseParameters(ResponseParameters.newBuilder() 1325 .setSize(4)) 1326 .setPayload(Payload.newBuilder() 1327 .setBody(ByteString.copyFrom(new byte[1]))) 1328 .build()); 1329 final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList( 1330 StreamingOutputCallResponse.newBuilder() 1331 .setPayload(Payload.newBuilder() 1332 .setType(PayloadType.COMPRESSABLE) 1333 .setBody(ByteString.copyFrom(new byte[3]))) 1334 .build(), 1335 StreamingOutputCallResponse.newBuilder() 1336 .setPayload(Payload.newBuilder() 1337 .setType(PayloadType.COMPRESSABLE) 1338 .setBody(ByteString.copyFrom(new byte[1]))) 1339 .build(), 1340 StreamingOutputCallResponse.newBuilder() 1341 .setPayload(Payload.newBuilder() 1342 .setType(PayloadType.COMPRESSABLE) 1343 .setBody(ByteString.copyFrom(new byte[4]))) 1344 .build()); 1345 1346 final ArrayBlockingQueue<StreamingOutputCallResponse> responses = 1347 new ArrayBlockingQueue<StreamingOutputCallResponse>(3); 1348 final SettableFuture<Void> completed = SettableFuture.create(); 1349 final SettableFuture<Void> errorSeen = SettableFuture.create(); 1350 StreamObserver<StreamingOutputCallResponse> responseObserver = 1351 new StreamObserver<StreamingOutputCallResponse>() { 1352 1353 @Override 1354 public void onNext(StreamingOutputCallResponse value) { 1355 responses.add(value); 1356 } 1357 1358 @Override 1359 public void onError(Throwable t) { 1360 errorSeen.set(null); 1361 } 1362 1363 @Override 1364 public void onCompleted() { 1365 completed.set(null); 1366 } 1367 }; 1368 StreamObserver<StreamingOutputCallRequest> requestObserver 1369 = asyncStub.fullDuplexCall(responseObserver); 1370 requestObserver.onNext(requests.get(0)); 1371 assertEquals( 1372 goldenResponses.get(0), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1373 // Initiate graceful shutdown. 1374 channel.shutdown(); 1375 requestObserver.onNext(requests.get(1)); 1376 assertEquals( 1377 goldenResponses.get(1), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1378 // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't. 1379 requestObserver.onNext(requests.get(2)); 1380 assertEquals( 1381 goldenResponses.get(2), responses.poll(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1382 assertFalse(completed.isDone()); 1383 requestObserver.onCompleted(); 1384 completed.get(operationTimeoutMillis(), TimeUnit.MILLISECONDS); 1385 assertFalse(errorSeen.isDone()); 1386 } 1387 1388 @Test customMetadata()1389 public void customMetadata() throws Exception { 1390 final int responseSize = 314159; 1391 final int requestSize = 271828; 1392 final SimpleRequest request = SimpleRequest.newBuilder() 1393 .setResponseSize(responseSize) 1394 .setResponseType(PayloadType.COMPRESSABLE) 1395 .setPayload(Payload.newBuilder() 1396 .setBody(ByteString.copyFrom(new byte[requestSize]))) 1397 .build(); 1398 final StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder() 1399 .addResponseParameters(ResponseParameters.newBuilder().setSize(responseSize)) 1400 .setResponseType(PayloadType.COMPRESSABLE) 1401 .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[requestSize]))) 1402 .build(); 1403 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1404 .setPayload(Payload.newBuilder() 1405 .setType(PayloadType.COMPRESSABLE) 1406 .setBody(ByteString.copyFrom(new byte[responseSize]))) 1407 .build(); 1408 final StreamingOutputCallResponse goldenStreamingResponse = 1409 StreamingOutputCallResponse.newBuilder() 1410 .setPayload(Payload.newBuilder() 1411 .setType(PayloadType.COMPRESSABLE) 1412 .setBody(ByteString.copyFrom(new byte[responseSize]))) 1413 .build(); 1414 final byte[] trailingBytes = 1415 {(byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb, (byte) 0xa, (byte) 0xb}; 1416 1417 // Test UnaryCall 1418 Metadata metadata = new Metadata(); 1419 metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value"); 1420 metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes); 1421 TestServiceGrpc.TestServiceBlockingStub blockingStub = this.blockingStub; 1422 blockingStub = MetadataUtils.attachHeaders(blockingStub, metadata); 1423 AtomicReference<Metadata> headersCapture = new AtomicReference<Metadata>(); 1424 AtomicReference<Metadata> trailersCapture = new AtomicReference<Metadata>(); 1425 blockingStub = MetadataUtils.captureMetadata(blockingStub, headersCapture, trailersCapture); 1426 SimpleResponse response = blockingStub.unaryCall(request); 1427 1428 assertEquals(goldenResponse, response); 1429 assertEquals("test_initial_metadata_value", 1430 headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY)); 1431 assertTrue( 1432 Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY))); 1433 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.OK, 1434 Collections.singleton(request), Collections.singleton(goldenResponse)); 1435 1436 // Test FullDuplexCall 1437 metadata = new Metadata(); 1438 metadata.put(Util.ECHO_INITIAL_METADATA_KEY, "test_initial_metadata_value"); 1439 metadata.put(Util.ECHO_TRAILING_METADATA_KEY, trailingBytes); 1440 TestServiceGrpc.TestServiceStub stub = asyncStub; 1441 stub = MetadataUtils.attachHeaders(stub, metadata); 1442 headersCapture = new AtomicReference<Metadata>(); 1443 trailersCapture = new AtomicReference<Metadata>(); 1444 stub = MetadataUtils.captureMetadata(stub, headersCapture, trailersCapture); 1445 1446 StreamRecorder<Messages.StreamingOutputCallResponse> recorder = StreamRecorder.create(); 1447 StreamObserver<Messages.StreamingOutputCallRequest> requestStream = 1448 stub.fullDuplexCall(recorder); 1449 requestStream.onNext(streamingRequest); 1450 requestStream.onCompleted(); 1451 recorder.awaitCompletion(); 1452 1453 assertSuccess(recorder); 1454 assertEquals(goldenStreamingResponse, recorder.firstValue().get()); 1455 assertEquals("test_initial_metadata_value", 1456 headersCapture.get().get(Util.ECHO_INITIAL_METADATA_KEY)); 1457 assertTrue( 1458 Arrays.equals(trailingBytes, trailersCapture.get().get(Util.ECHO_TRAILING_METADATA_KEY))); 1459 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.OK, 1460 Collections.singleton(streamingRequest), Collections.singleton(goldenStreamingResponse)); 1461 } 1462 1463 @Test(timeout = 10000) censusContextsPropagated()1464 public void censusContextsPropagated() { 1465 Assume.assumeTrue("Skip the test because server is not in the same process.", server != null); 1466 Span clientParentSpan = Tracing.getTracer().spanBuilder("Test.interopTest").startSpan(); 1467 // A valid ID is guaranteed to be unique, so we can verify it is actually propagated. 1468 assertTrue(clientParentSpan.getContext().getTraceId().isValid()); 1469 Context ctx = 1470 Context.ROOT.withValues( 1471 TAG_CONTEXT_KEY, 1472 tagger.emptyBuilder().put( 1473 StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), 1474 ContextUtils.CONTEXT_SPAN_KEY, 1475 clientParentSpan); 1476 Context origCtx = ctx.attach(); 1477 try { 1478 blockingStub.unaryCall(SimpleRequest.getDefaultInstance()); 1479 Context serverCtx = contextCapture.get(); 1480 assertNotNull(serverCtx); 1481 1482 FakeTagContext statsCtx = (FakeTagContext) TAG_CONTEXT_KEY.get(serverCtx); 1483 assertNotNull(statsCtx); 1484 Map<TagKey, TagValue> tags = statsCtx.getTags(); 1485 boolean tagFound = false; 1486 for (Map.Entry<TagKey, TagValue> tag : tags.entrySet()) { 1487 if (tag.getKey().equals(StatsTestUtils.EXTRA_TAG)) { 1488 assertEquals(TagValue.create("extra value"), tag.getValue()); 1489 tagFound = true; 1490 } 1491 } 1492 assertTrue("tag not found", tagFound); 1493 1494 Span span = CONTEXT_SPAN_KEY.get(serverCtx); 1495 assertNotNull(span); 1496 SpanContext spanContext = span.getContext(); 1497 assertEquals(clientParentSpan.getContext().getTraceId(), spanContext.getTraceId()); 1498 } finally { 1499 ctx.detach(origCtx); 1500 } 1501 } 1502 1503 @Test statusCodeAndMessage()1504 public void statusCodeAndMessage() throws Exception { 1505 int errorCode = 2; 1506 String errorMessage = "test status message"; 1507 EchoStatus responseStatus = EchoStatus.newBuilder() 1508 .setCode(errorCode) 1509 .setMessage(errorMessage) 1510 .build(); 1511 SimpleRequest simpleRequest = SimpleRequest.newBuilder() 1512 .setResponseStatus(responseStatus) 1513 .build(); 1514 StreamingOutputCallRequest streamingRequest = StreamingOutputCallRequest.newBuilder() 1515 .setResponseStatus(responseStatus) 1516 .build(); 1517 1518 // Test UnaryCall 1519 try { 1520 blockingStub.unaryCall(simpleRequest); 1521 fail(); 1522 } catch (StatusRuntimeException e) { 1523 assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); 1524 assertEquals(errorMessage, e.getStatus().getDescription()); 1525 } 1526 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN); 1527 1528 // Test FullDuplexCall 1529 @SuppressWarnings("unchecked") 1530 StreamObserver<StreamingOutputCallResponse> responseObserver = 1531 mock(StreamObserver.class); 1532 StreamObserver<StreamingOutputCallRequest> requestObserver 1533 = asyncStub.fullDuplexCall(responseObserver); 1534 requestObserver.onNext(streamingRequest); 1535 requestObserver.onCompleted(); 1536 1537 ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class); 1538 verify(responseObserver, timeout(operationTimeoutMillis())).onError(captor.capture()); 1539 assertEquals(Status.UNKNOWN.getCode(), Status.fromThrowable(captor.getValue()).getCode()); 1540 assertEquals(errorMessage, Status.fromThrowable(captor.getValue()).getDescription()); 1541 verifyNoMoreInteractions(responseObserver); 1542 assertStatsTrace("grpc.testing.TestService/FullDuplexCall", Status.Code.UNKNOWN); 1543 } 1544 1545 @Test specialStatusMessage()1546 public void specialStatusMessage() throws Exception { 1547 int errorCode = 2; 1548 String errorMessage = "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP \t\n"; 1549 SimpleRequest simpleRequest = SimpleRequest.newBuilder() 1550 .setResponseStatus(EchoStatus.newBuilder() 1551 .setCode(errorCode) 1552 .setMessage(errorMessage) 1553 .build()) 1554 .build(); 1555 1556 try { 1557 blockingStub.unaryCall(simpleRequest); 1558 fail(); 1559 } catch (StatusRuntimeException e) { 1560 assertEquals(Status.UNKNOWN.getCode(), e.getStatus().getCode()); 1561 assertEquals(errorMessage, e.getStatus().getDescription()); 1562 } 1563 assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.UNKNOWN); 1564 } 1565 1566 /** Sends an rpc to an unimplemented method within TestService. */ 1567 @Test unimplementedMethod()1568 public void unimplementedMethod() { 1569 try { 1570 blockingStub.unimplementedCall(Empty.getDefaultInstance()); 1571 fail(); 1572 } catch (StatusRuntimeException e) { 1573 assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); 1574 } 1575 1576 assertClientStatsTrace("grpc.testing.TestService/UnimplementedCall", 1577 Status.Code.UNIMPLEMENTED); 1578 } 1579 1580 /** Sends an rpc to an unimplemented service on the server. */ 1581 @Test unimplementedService()1582 public void unimplementedService() { 1583 UnimplementedServiceGrpc.UnimplementedServiceBlockingStub stub = 1584 UnimplementedServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); 1585 try { 1586 stub.unimplementedCall(Empty.getDefaultInstance()); 1587 fail(); 1588 } catch (StatusRuntimeException e) { 1589 assertEquals(Status.UNIMPLEMENTED.getCode(), e.getStatus().getCode()); 1590 } 1591 1592 assertStatsTrace("grpc.testing.UnimplementedService/UnimplementedCall", 1593 Status.Code.UNIMPLEMENTED); 1594 } 1595 1596 /** Start a fullDuplexCall which the server will not respond, and verify the deadline expires. */ 1597 @SuppressWarnings("MissingFail") 1598 @Test timeoutOnSleepingServer()1599 public void timeoutOnSleepingServer() throws Exception { 1600 TestServiceGrpc.TestServiceStub stub = 1601 asyncStub.withDeadlineAfter(1, TimeUnit.MILLISECONDS); 1602 1603 StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create(); 1604 StreamObserver<StreamingOutputCallRequest> requestObserver 1605 = stub.fullDuplexCall(responseObserver); 1606 1607 StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder() 1608 .setPayload(Payload.newBuilder() 1609 .setBody(ByteString.copyFrom(new byte[27182]))) 1610 .build(); 1611 try { 1612 requestObserver.onNext(request); 1613 } catch (IllegalStateException expected) { 1614 // This can happen if the stream has already been terminated due to deadline exceeded. 1615 } 1616 1617 assertTrue(responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS)); 1618 assertEquals(0, responseObserver.getValues().size()); 1619 assertEquals(Status.DEADLINE_EXCEEDED.getCode(), 1620 Status.fromThrowable(responseObserver.getError()).getCode()); 1621 1622 if (metricsExpected()) { 1623 // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be 1624 // recorded. The tracer stats rely on the stream being created, which is not always the case 1625 // in this test, thus we will not check that. 1626 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1627 checkStartTags(clientStartRecord, "grpc.testing.TestService/FullDuplexCall"); 1628 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1629 checkEndTags( 1630 clientEndRecord, 1631 "grpc.testing.TestService/FullDuplexCall", 1632 Status.DEADLINE_EXCEEDED.getCode()); 1633 } 1634 } 1635 1636 /** Sends a large unary rpc with service account credentials. */ serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope)1637 public void serviceAccountCreds(String jsonKey, InputStream credentialsStream, String authScope) 1638 throws Exception { 1639 // cast to ServiceAccountCredentials to double-check the right type of object was created. 1640 GoogleCredentials credentials = 1641 ServiceAccountCredentials.class.cast(GoogleCredentials.fromStream(credentialsStream)); 1642 credentials = credentials.createScoped(Arrays.<String>asList(authScope)); 1643 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1644 .withCallCredentials(MoreCallCredentials.from(credentials)); 1645 final SimpleRequest request = SimpleRequest.newBuilder() 1646 .setFillUsername(true) 1647 .setFillOauthScope(true) 1648 .setResponseSize(314159) 1649 .setResponseType(PayloadType.COMPRESSABLE) 1650 .setPayload(Payload.newBuilder() 1651 .setBody(ByteString.copyFrom(new byte[271828]))) 1652 .build(); 1653 1654 final SimpleResponse response = stub.unaryCall(request); 1655 assertFalse(response.getUsername().isEmpty()); 1656 assertTrue("Received username: " + response.getUsername(), 1657 jsonKey.contains(response.getUsername())); 1658 assertFalse(response.getOauthScope().isEmpty()); 1659 assertTrue("Received oauth scope: " + response.getOauthScope(), 1660 authScope.contains(response.getOauthScope())); 1661 1662 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1663 .setOauthScope(response.getOauthScope()) 1664 .setUsername(response.getUsername()) 1665 .setPayload(Payload.newBuilder() 1666 .setType(PayloadType.COMPRESSABLE) 1667 .setBody(ByteString.copyFrom(new byte[314159]))) 1668 .build(); 1669 assertEquals(goldenResponse, response); 1670 } 1671 1672 /** Sends a large unary rpc with compute engine credentials. */ computeEngineCreds(String serviceAccount, String oauthScope)1673 public void computeEngineCreds(String serviceAccount, String oauthScope) throws Exception { 1674 ComputeEngineCredentials credentials = ComputeEngineCredentials.create(); 1675 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1676 .withCallCredentials(MoreCallCredentials.from(credentials)); 1677 final SimpleRequest request = SimpleRequest.newBuilder() 1678 .setFillUsername(true) 1679 .setFillOauthScope(true) 1680 .setResponseSize(314159) 1681 .setResponseType(PayloadType.COMPRESSABLE) 1682 .setPayload(Payload.newBuilder() 1683 .setBody(ByteString.copyFrom(new byte[271828]))) 1684 .build(); 1685 1686 final SimpleResponse response = stub.unaryCall(request); 1687 assertEquals(serviceAccount, response.getUsername()); 1688 assertFalse(response.getOauthScope().isEmpty()); 1689 assertTrue("Received oauth scope: " + response.getOauthScope(), 1690 oauthScope.contains(response.getOauthScope())); 1691 1692 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 1693 .setOauthScope(response.getOauthScope()) 1694 .setUsername(response.getUsername()) 1695 .setPayload(Payload.newBuilder() 1696 .setType(PayloadType.COMPRESSABLE) 1697 .setBody(ByteString.copyFrom(new byte[314159]))) 1698 .build(); 1699 assertEquals(goldenResponse, response); 1700 } 1701 1702 /** Test JWT-based auth. */ jwtTokenCreds(InputStream serviceAccountJson)1703 public void jwtTokenCreds(InputStream serviceAccountJson) throws Exception { 1704 final SimpleRequest request = SimpleRequest.newBuilder() 1705 .setResponseType(PayloadType.COMPRESSABLE) 1706 .setResponseSize(314159) 1707 .setPayload(Payload.newBuilder() 1708 .setBody(ByteString.copyFrom(new byte[271828]))) 1709 .setFillUsername(true) 1710 .build(); 1711 1712 ServiceAccountCredentials credentials = (ServiceAccountCredentials) 1713 GoogleCredentials.fromStream(serviceAccountJson); 1714 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1715 .withCallCredentials(MoreCallCredentials.from(credentials)); 1716 SimpleResponse response = stub.unaryCall(request); 1717 assertEquals(credentials.getClientEmail(), response.getUsername()); 1718 assertEquals(314159, response.getPayload().getBody().size()); 1719 } 1720 1721 /** Sends a unary rpc with raw oauth2 access token credentials. */ oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope)1722 public void oauth2AuthToken(String jsonKey, InputStream credentialsStream, String authScope) 1723 throws Exception { 1724 GoogleCredentials utilCredentials = 1725 GoogleCredentials.fromStream(credentialsStream); 1726 utilCredentials = utilCredentials.createScoped(Arrays.<String>asList(authScope)); 1727 AccessToken accessToken = utilCredentials.refreshAccessToken(); 1728 1729 OAuth2Credentials credentials = OAuth2Credentials.create(accessToken); 1730 1731 TestServiceGrpc.TestServiceBlockingStub stub = blockingStub 1732 .withCallCredentials(MoreCallCredentials.from(credentials)); 1733 final SimpleRequest request = SimpleRequest.newBuilder() 1734 .setFillUsername(true) 1735 .setFillOauthScope(true) 1736 .build(); 1737 1738 final SimpleResponse response = stub.unaryCall(request); 1739 assertFalse(response.getUsername().isEmpty()); 1740 assertTrue("Received username: " + response.getUsername(), 1741 jsonKey.contains(response.getUsername())); 1742 assertFalse(response.getOauthScope().isEmpty()); 1743 assertTrue("Received oauth scope: " + response.getOauthScope(), 1744 authScope.contains(response.getOauthScope())); 1745 } 1746 1747 /** Sends a unary rpc with "per rpc" raw oauth2 access token credentials. */ perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope)1748 public void perRpcCreds(String jsonKey, InputStream credentialsStream, String oauthScope) 1749 throws Exception { 1750 // In gRpc Java, we don't have per Rpc credentials, user can use an intercepted stub only once 1751 // for that purpose. 1752 // So, this test is identical to oauth2_auth_token test. 1753 oauth2AuthToken(jsonKey, credentialsStream, oauthScope); 1754 } 1755 assertSuccess(StreamRecorder<?> recorder)1756 protected static void assertSuccess(StreamRecorder<?> recorder) { 1757 if (recorder.getError() != null) { 1758 throw new AssertionError(recorder.getError()); 1759 } 1760 } 1761 1762 /** Helper for getting remote address {@link io.grpc.ServerCall#getAttributes()} */ obtainRemoteClientAddr()1763 protected SocketAddress obtainRemoteClientAddr() { 1764 TestServiceGrpc.TestServiceBlockingStub stub = 1765 blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS); 1766 1767 stub.unaryCall(SimpleRequest.getDefaultInstance()); 1768 1769 return serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR); 1770 } 1771 1772 /** Helper for asserting TLS info in SSLSession {@link io.grpc.ServerCall#getAttributes()} */ assertX500SubjectDn(String tlsInfo)1773 protected void assertX500SubjectDn(String tlsInfo) { 1774 TestServiceGrpc.TestServiceBlockingStub stub = 1775 blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS); 1776 1777 stub.unaryCall(SimpleRequest.getDefaultInstance()); 1778 1779 List<Certificate> certificates = Lists.newArrayList(); 1780 SSLSession sslSession = 1781 serverCallCapture.get().getAttributes().get(Grpc.TRANSPORT_ATTR_SSL_SESSION); 1782 try { 1783 certificates = Arrays.asList(sslSession.getPeerCertificates()); 1784 } catch (SSLPeerUnverifiedException e) { 1785 // Should never happen 1786 throw new AssertionError(e); 1787 } 1788 1789 X509Certificate x509cert = (X509Certificate) certificates.get(0); 1790 1791 assertEquals(1, certificates.size()); 1792 assertEquals(tlsInfo, x509cert.getSubjectDN().toString()); 1793 } 1794 operationTimeoutMillis()1795 protected int operationTimeoutMillis() { 1796 return 5000; 1797 } 1798 1799 /** 1800 * Some tests run on memory constrained environments. Rather than OOM, just give up. 64 is 1801 * chosen as a maximum amount of memory a large test would need. 1802 */ assumeEnoughMemory()1803 private static void assumeEnoughMemory() { 1804 Runtime r = Runtime.getRuntime(); 1805 long usedMem = r.totalMemory() - r.freeMemory(); 1806 long actuallyFreeMemory = r.maxMemory() - usedMem; 1807 Assume.assumeTrue( 1808 actuallyFreeMemory + " is not sufficient to run this test", 1809 actuallyFreeMemory >= 64 * 1024 * 1024); 1810 } 1811 1812 /** 1813 * Wrapper around {@link Mockito#verify}, to keep log spam down on failure. 1814 */ verify(T mock, VerificationMode mode)1815 private static <T> T verify(T mock, VerificationMode mode) { 1816 try { 1817 return Mockito.verify(mock, mode); 1818 } catch (final AssertionError e) { 1819 String msg = e.getMessage(); 1820 if (msg.length() >= 256) { 1821 // AssertionError(String, Throwable) only present in Android API 19+ 1822 throw new AssertionError(msg.substring(0, 256)) { 1823 @Override 1824 public synchronized Throwable getCause() { 1825 return e; 1826 } 1827 }; 1828 } 1829 throw e; 1830 } 1831 } 1832 1833 /** 1834 * Wrapper around {@link Mockito#verify}, to keep log spam down on failure. 1835 */ verifyNoMoreInteractions(Object... mocks)1836 private static void verifyNoMoreInteractions(Object... mocks) { 1837 try { 1838 Mockito.verifyNoMoreInteractions(mocks); 1839 } catch (final AssertionError e) { 1840 String msg = e.getMessage(); 1841 if (msg.length() >= 256) { 1842 // AssertionError(String, Throwable) only present in Android API 19+ 1843 throw new AssertionError(msg.substring(0, 256)) { 1844 @Override 1845 public synchronized Throwable getCause() { 1846 return e; 1847 } 1848 }; 1849 } 1850 throw e; 1851 } 1852 } 1853 1854 /** 1855 * Poll the next metrics record and check it against the provided information, including the 1856 * message sizes. 1857 */ assertStatsTrace(String method, Status.Code status, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1858 private void assertStatsTrace(String method, Status.Code status, 1859 Collection<? extends MessageLite> requests, 1860 Collection<? extends MessageLite> responses) { 1861 assertClientStatsTrace(method, status, requests, responses); 1862 assertServerStatsTrace(method, status, requests, responses); 1863 } 1864 1865 /** 1866 * Poll the next metrics record and check it against the provided information, without checking 1867 * the message sizes. 1868 */ assertStatsTrace(String method, Status.Code status)1869 private void assertStatsTrace(String method, Status.Code status) { 1870 assertStatsTrace(method, status, null, null); 1871 } 1872 assertClientStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1873 private void assertClientStatsTrace(String method, Status.Code code, 1874 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 1875 // Tracer-based stats 1876 TestClientStreamTracer tracer = clientStreamTracers.poll(); 1877 assertNotNull(tracer); 1878 assertTrue(tracer.getOutboundHeaders()); 1879 // assertClientStatsTrace() is called right after application receives status, 1880 // but streamClosed() may be called slightly later than that. So we need a timeout. 1881 try { 1882 assertTrue(tracer.await(5, TimeUnit.SECONDS)); 1883 } catch (InterruptedException e) { 1884 throw new AssertionError(e); 1885 } 1886 assertEquals(code, tracer.getStatus().getCode()); 1887 1888 if (requests != null && responses != null) { 1889 checkTracers(tracer, requests, responses); 1890 } 1891 if (metricsExpected()) { 1892 // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be 1893 // done before application receives status. 1894 MetricsRecord clientStartRecord = clientStatsRecorder.pollRecord(); 1895 checkStartTags(clientStartRecord, method); 1896 MetricsRecord clientEndRecord = clientStatsRecorder.pollRecord(); 1897 checkEndTags(clientEndRecord, method, code); 1898 1899 if (requests != null && responses != null) { 1900 checkCensus(clientEndRecord, false, requests, responses); 1901 } 1902 } 1903 } 1904 assertClientStatsTrace(String method, Status.Code status)1905 private void assertClientStatsTrace(String method, Status.Code status) { 1906 assertClientStatsTrace(method, status, null, null); 1907 } 1908 1909 @SuppressWarnings("AssertionFailureIgnored") // Failure is checked in the end by the passed flag. assertServerStatsTrace(String method, Status.Code code, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)1910 private void assertServerStatsTrace(String method, Status.Code code, 1911 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 1912 if (server == null) { 1913 // Server is not in the same process. We can't check server-side stats. 1914 return; 1915 } 1916 1917 if (metricsExpected()) { 1918 MetricsRecord serverStartRecord; 1919 MetricsRecord serverEndRecord; 1920 try { 1921 // On the server, the stats is finalized in ServerStreamListener.closed(), which can be 1922 // run after the client receives the final status. So we use a timeout. 1923 serverStartRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1924 serverEndRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); 1925 } catch (InterruptedException e) { 1926 throw new RuntimeException(e); 1927 } 1928 assertNotNull(serverStartRecord); 1929 assertNotNull(serverEndRecord); 1930 checkStartTags(serverStartRecord, method); 1931 checkEndTags(serverEndRecord, method, code); 1932 if (requests != null && responses != null) { 1933 checkCensus(serverEndRecord, true, requests, responses); 1934 } 1935 } 1936 1937 ServerStreamTracerInfo tracerInfo; 1938 tracerInfo = serverStreamTracers.poll(); 1939 assertNotNull(tracerInfo); 1940 assertEquals(method, tracerInfo.fullMethodName); 1941 assertNotNull(tracerInfo.tracer.contextCapture); 1942 // On the server, streamClosed() may be called after the client receives the final status. 1943 // So we use a timeout. 1944 try { 1945 assertTrue(tracerInfo.tracer.await(1, TimeUnit.SECONDS)); 1946 } catch (InterruptedException e) { 1947 throw new AssertionError(e); 1948 } 1949 assertEquals(code, tracerInfo.tracer.getStatus().getCode()); 1950 if (requests != null && responses != null) { 1951 checkTracers(tracerInfo.tracer, responses, requests); 1952 } 1953 } 1954 checkStartTags(MetricsRecord record, String methodName)1955 private static void checkStartTags(MetricsRecord record, String methodName) { 1956 assertNotNull("record is not null", record); 1957 TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); 1958 assertNotNull("method name tagged", methodNameTag); 1959 assertEquals("method names match", methodName, methodNameTag.asString()); 1960 } 1961 checkEndTags( MetricsRecord record, String methodName, Status.Code status)1962 private static void checkEndTags( 1963 MetricsRecord record, String methodName, Status.Code status) { 1964 assertNotNull("record is not null", record); 1965 TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); 1966 assertNotNull("method name tagged", methodNameTag); 1967 assertEquals("method names match", methodName, methodNameTag.asString()); 1968 TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); 1969 assertNotNull("status tagged", statusTag); 1970 assertEquals(status.toString(), statusTag.asString()); 1971 } 1972 1973 /** 1974 * Check information recorded by tracers. 1975 */ checkTracers( TestStreamTracer tracer, Collection<? extends MessageLite> sentMessages, Collection<? extends MessageLite> receivedMessages)1976 private void checkTracers( 1977 TestStreamTracer tracer, 1978 Collection<? extends MessageLite> sentMessages, 1979 Collection<? extends MessageLite> receivedMessages) { 1980 long uncompressedSentSize = 0; 1981 int seqNo = 0; 1982 for (MessageLite msg : sentMessages) { 1983 assertThat(tracer.nextOutboundEvent()).isEqualTo(String.format("outboundMessage(%d)", seqNo)); 1984 assertThat(tracer.nextOutboundEvent()).matches( 1985 String.format("outboundMessageSent\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); 1986 seqNo++; 1987 uncompressedSentSize += msg.getSerializedSize(); 1988 } 1989 assertNull(tracer.nextOutboundEvent()); 1990 long uncompressedReceivedSize = 0; 1991 seqNo = 0; 1992 for (MessageLite msg : receivedMessages) { 1993 assertThat(tracer.nextInboundEvent()).isEqualTo(String.format("inboundMessage(%d)", seqNo)); 1994 assertThat(tracer.nextInboundEvent()).matches( 1995 String.format("inboundMessageRead\\(%d, -?[0-9]+, -?[0-9]+\\)", seqNo)); 1996 uncompressedReceivedSize += msg.getSerializedSize(); 1997 seqNo++; 1998 } 1999 assertNull(tracer.nextInboundEvent()); 2000 if (metricsExpected()) { 2001 assertEquals(uncompressedSentSize, tracer.getOutboundUncompressedSize()); 2002 assertEquals(uncompressedReceivedSize, tracer.getInboundUncompressedSize()); 2003 } 2004 } 2005 2006 /** 2007 * Check information recorded by Census. 2008 */ checkCensus(MetricsRecord record, boolean isServer, Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses)2009 private void checkCensus(MetricsRecord record, boolean isServer, 2010 Collection<? extends MessageLite> requests, Collection<? extends MessageLite> responses) { 2011 int uncompressedRequestsSize = 0; 2012 for (MessageLite request : requests) { 2013 uncompressedRequestsSize += request.getSerializedSize(); 2014 } 2015 int uncompressedResponsesSize = 0; 2016 for (MessageLite response : responses) { 2017 uncompressedResponsesSize += response.getSerializedSize(); 2018 } 2019 if (isServer) { 2020 assertEquals( 2021 requests.size(), 2022 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); 2023 assertEquals( 2024 responses.size(), 2025 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); 2026 assertEquals( 2027 uncompressedRequestsSize, 2028 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); 2029 assertEquals( 2030 uncompressedResponsesSize, 2031 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); 2032 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); 2033 // It's impossible to get the expected wire sizes because it may be compressed, so we just 2034 // check if they are recorded. 2035 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); 2036 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); 2037 } else { 2038 assertEquals( 2039 requests.size(), 2040 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); 2041 assertEquals( 2042 responses.size(), 2043 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); 2044 assertEquals( 2045 uncompressedRequestsSize, 2046 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); 2047 assertEquals( 2048 uncompressedResponsesSize, 2049 record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); 2050 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); 2051 // It's impossible to get the expected wire sizes because it may be compressed, so we just 2052 // check if they are recorded. 2053 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); 2054 assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); 2055 } 2056 } 2057 2058 /** 2059 * Captures the request attributes. Useful for testing ServerCalls. 2060 * {@link ServerCall#getAttributes()} 2061 */ recordServerCallInterceptor( final AtomicReference<ServerCall<?, ?>> serverCallCapture)2062 private static ServerInterceptor recordServerCallInterceptor( 2063 final AtomicReference<ServerCall<?, ?>> serverCallCapture) { 2064 return new ServerInterceptor() { 2065 @Override 2066 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 2067 ServerCall<ReqT, RespT> call, 2068 Metadata requestHeaders, 2069 ServerCallHandler<ReqT, RespT> next) { 2070 serverCallCapture.set(call); 2071 return next.startCall(call, requestHeaders); 2072 } 2073 }; 2074 } 2075 2076 private static ServerInterceptor recordContextInterceptor( 2077 final AtomicReference<Context> contextCapture) { 2078 return new ServerInterceptor() { 2079 @Override 2080 public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall( 2081 ServerCall<ReqT, RespT> call, 2082 Metadata requestHeaders, 2083 ServerCallHandler<ReqT, RespT> next) { 2084 contextCapture.set(Context.current()); 2085 return next.startCall(call, requestHeaders); 2086 } 2087 }; 2088 } 2089 2090 /** 2091 * A marshaller that record input and output sizes. 2092 */ 2093 private static final class ByteSizeMarshaller<T> implements MethodDescriptor.Marshaller<T> { 2094 2095 private final MethodDescriptor.Marshaller<T> delegate; 2096 volatile int lastOutSize; 2097 volatile int lastInSize; 2098 2099 ByteSizeMarshaller(MethodDescriptor.Marshaller<T> delegate) { 2100 this.delegate = delegate; 2101 } 2102 2103 @Override 2104 public InputStream stream(T value) { 2105 InputStream is = delegate.stream(value); 2106 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2107 try { 2108 lastOutSize = (int) ByteStreams.copy(is, baos); 2109 } catch (IOException e) { 2110 throw new RuntimeException(e); 2111 } 2112 return new ByteArrayInputStream(baos.toByteArray()); 2113 } 2114 2115 @Override 2116 public T parse(InputStream stream) { 2117 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2118 try { 2119 lastInSize = (int) ByteStreams.copy(stream, baos); 2120 } catch (IOException e) { 2121 throw new RuntimeException(e); 2122 } 2123 return delegate.parse(new ByteArrayInputStream(baos.toByteArray())); 2124 } 2125 } 2126 } 2127