/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.testing.integration;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE;
import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY;
import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.MessageLite;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.CensusStatsModule;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.testing.StatsTestUtils;
import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder;
import io.grpc.internal.testing.StatsTestUtils.FakeTagContext;
import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer;
import io.grpc.internal.testing.StatsTestUtils.FakeTagger;
import io.grpc.internal.testing.StatsTestUtils.MetricsRecord;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.internal.testing.TestClientStreamTracer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.internal.testing.TestStreamTracer;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.TestUtils;
import io.grpc.testing.integration.EmptyProtos.Empty;
import io.grpc.testing.integration.Messages.EchoStatus;
import io.grpc.testing.integration.Messages.Payload;
import io.grpc.testing.integration.Messages.PayloadType;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.TagValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.SpanContext;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.unsafe.ContextUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketAddress;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
/**
 * Abstract base class for all gRPC transport tests.
 *
 * <p>New tests should avoid using Mockito to support running on AppEngine.
 */
public abstract class AbstractInteropTest {
private static Logger logger = Logger.getLogger(AbstractInteropTest.class.getName());
@Rule public final Timeout globalTimeout = Timeout.seconds(30);
/** Must be at least {@link #unaryPayloadLength()}, plus some to account for encoding overhead. */
public static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
private static final FakeTagger tagger = new FakeTagger();
private static final FakeTagContextBinarySerializer tagContextBinarySerializer =
new FakeTagContextBinarySerializer();
private final AtomicReference> serverCallCapture =
new AtomicReference>();
private final AtomicReference requestHeadersCapture =
new AtomicReference();
private final AtomicReference contextCapture =
new AtomicReference();
private final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder();
private final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder();
private ScheduledExecutorService testServiceExecutor;
private Server server;
private final LinkedBlockingQueue serverStreamTracers =
new LinkedBlockingQueue();
/** Associates a server stream tracer with the full method name it was created for. */
private static final class ServerStreamTracerInfo {
  final String fullMethodName;
  final InteropServerStreamTracer tracer;

  ServerStreamTracerInfo(String methodName, InteropServerStreamTracer streamTracer) {
    fullMethodName = methodName;
    tracer = streamTracer;
  }

  /** Test tracer that additionally remembers the context passed to {@code filterContext}. */
  private static final class InteropServerStreamTracer extends TestServerStreamTracer {
    private volatile Context contextCapture;

    @Override
    public Context filterContext(Context context) {
      // Record the incoming context first, then defer to the base tracer for the result.
      this.contextCapture = context;
      Context filtered = super.filterContext(context);
      return filtered;
    }
  }
}
/**
 * Factory installed on the server that creates a fresh tracer for every stream and records it,
 * together with the method name, in {@code serverStreamTracers}.
 */
private final ServerStreamTracer.Factory serverStreamTracerFactory =
    new ServerStreamTracer.Factory() {
      @Override
      public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
        final ServerStreamTracerInfo.InteropServerStreamTracer created =
            new ServerStreamTracerInfo.InteropServerStreamTracer();
        final ServerStreamTracerInfo info = new ServerStreamTracerInfo(fullMethodName, created);
        serverStreamTracers.add(info);
        return created;
      }
    };
/** Default {@link Empty} instance, shared by tests as both request and expected response. */
protected static final Empty EMPTY = Empty.getDefaultInstance();
/**
 * Builds and starts the in-process test server, if the subclass supplies a builder.
 *
 * <p>When {@link #getServerBuilder()} returns {@code null}, {@code server} is set to
 * {@code null} and nothing is started. Otherwise a {@link TestServiceImpl} is registered behind
 * the recording interceptors plus the service's own interceptors, the stream tracer factory and
 * a fake-backed {@link CensusStatsModule} are installed, and the server is started.
 *
 * @throws RuntimeException wrapping the {@link IOException} if the server fails to start
 */
private void startServer() {
  AbstractServerImplBuilder<?> builder = getServerBuilder();
  if (builder == null) {
    server = null;
    return;
  }
  testServiceExecutor = Executors.newScheduledThreadPool(2);

  // The recording interceptors are listed ahead of the service's own interceptors.
  List<ServerInterceptor> allInterceptors = ImmutableList.<ServerInterceptor>builder()
      .add(recordServerCallInterceptor(serverCallCapture))
      .add(TestUtils.recordRequestHeadersInterceptor(requestHeadersCapture))
      .add(recordContextInterceptor(contextCapture))
      .addAll(TestServiceImpl.interceptors())
      .build();

  builder
      .addService(
          ServerInterceptors.intercept(
              new TestServiceImpl(testServiceExecutor),
              allInterceptors))
      .addStreamTracerFactory(serverStreamTracerFactory);
  // Route server-side stats into the fake recorder so tests can inspect them.
  io.grpc.internal.TestingAccessor.setStatsImplementation(
      builder,
      new CensusStatsModule(
          tagger,
          tagContextBinarySerializer,
          serverStatsRecorder,
          GrpcUtil.STOPWATCH_SUPPLIER,
          true));
  try {
    server = builder.build().start();
  } catch (IOException ex) {
    throw new RuntimeException(ex);
  }
}
/**
 * Stops whatever {@code startServer()} created: forcibly shuts down the server and then shuts
 * down the service executor. Either step is skipped when the corresponding field is
 * {@code null}.
 */
private void stopServer() {
  final Server activeServer = server;
  if (activeServer != null) {
    activeServer.shutdownNow();
  }

  final ScheduledExecutorService serviceExecutor = testServiceExecutor;
  if (serviceExecutor != null) {
    serviceExecutor.shutdown();
  }
}
/**
 * Returns the port reported by the test server.
 *
 * @throws NullPointerException if no server has been started
 */
@VisibleForTesting
final int getPort() {
  final Server startedServer = server;
  return startedServer.getPort();
}
/** Client channel; assigned in {@link #setUp()} from {@link #createChannel()}. */
protected ManagedChannel channel;
/** Blocking stub over {@link #channel}; assigned in {@link #setUp()}. */
protected TestServiceGrpc.TestServiceBlockingStub blockingStub;
/** Async stub over {@link #channel}; assigned in {@link #setUp()}. */
protected TestServiceGrpc.TestServiceStub asyncStub;

// Receives every client stream tracer created by clientStreamTracerFactory.
private final LinkedBlockingQueue<TestClientStreamTracer> clientStreamTracers =
    new LinkedBlockingQueue<TestClientStreamTracer>();
/**
 * Factory that creates a fresh {@link TestClientStreamTracer} per stream and records it in
 * {@code clientStreamTracers}.
 */
private final ClientStreamTracer.Factory clientStreamTracerFactory =
    new ClientStreamTracer.Factory() {
      @Override
      public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
        final TestClientStreamTracer created = new TestClientStreamTracer();
        clientStreamTracers.add(created);
        return created;
      }
    };
/**
 * Client interceptor that attaches {@code clientStreamTracerFactory} to the call options of
 * every outgoing call before delegating to the next channel.
 */
private final ClientInterceptor tracerSetupInterceptor = new ClientInterceptor() {
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return next.newCall(
        method, callOptions.withStreamTracerFactory(clientStreamTracerFactory));
  }
};
/**
 * Prepares the fixture for a test: starts the server, creates the channel, and builds the
 * blocking and async stubs with the tracer interceptor (plus any additional interceptors the
 * subclass provides).
 *
 * <p>Must be called by the subclass setup method if overridden.
 */
@Before
public void setUp() {
  startServer();
  channel = createChannel();

  // Both stubs always carry the tracer-setup interceptor.
  blockingStub =
      TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor);
  asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor);

  // Subclass-provided interceptors, if any, are layered on afterwards.
  final ClientInterceptor[] extraInterceptors = getAdditionalInterceptors();
  final boolean hasExtras = extraInterceptors != null;
  if (hasExtras) {
    blockingStub = blockingStub.withInterceptors(extraInterceptors);
    asyncStub = asyncStub.withInterceptors(extraInterceptors);
  }

  // Start every test with no captured request headers.
  requestHeadersCapture.set(null);
}
/**
 * Cleans up after a test: shuts the client channel down, waits up to one second for it to
 * terminate, and then stops the server and its executor.
 *
 * <p>The server is stopped in a {@code finally} block so that an unexpected failure while
 * shutting down the channel cannot leave the server or its executor running.
 */
@After
public void tearDown() {
  try {
    if (channel != null) {
      channel.shutdownNow();
      try {
        channel.awaitTermination(1, TimeUnit.SECONDS);
      } catch (InterruptedException ie) {
        logger.log(Level.FINE, "Interrupted while waiting for channel termination", ie);
        // Best effort. If there is an interruption, we want to continue cleaning up, but quickly
        Thread.currentThread().interrupt();
      }
    }
  } finally {
    stopServer();
  }
}
/** Creates the client channel that {@link #setUp()} stores in {@link #channel}. */
protected abstract ManagedChannel createChannel();
/**
 * Returns extra client interceptors that {@link #setUp()} applies to both stubs, or
 * {@code null} to apply none. This default implementation returns {@code null}.
 */
@Nullable
protected ClientInterceptor[] getAdditionalInterceptors() {
return null;
}
/**
* Returns the server builder used to create server for each test run. Return {@code null} if
* it shouldn't start a server in the same process.
*/
@Nullable
protected AbstractServerImplBuilder> getServerBuilder() {
return null;
}
/**
 * Creates a {@link CensusStatsModule} for the client side, wired to the shared fake tagger and
 * serializer and to the client stats recorder.
 */
protected final CensusStatsModule createClientCensusStatsModule() {
  final CensusStatsModule clientModule =
      new CensusStatsModule(
          tagger,
          tagContextBinarySerializer,
          clientStatsRecorder,
          GrpcUtil.STOPWATCH_SUPPLIER,
          true);
  return clientModule;
}
/**
 * Returns {@code true} if exact metric values should be checked. This default implementation
 * always returns {@code true}.
 */
protected boolean metricsExpected() {
return true;
}
/** Sends an empty unary request and expects an empty response back. */
@Test
public void emptyUnary() throws Exception {
  final Empty reply = blockingStub.emptyCall(EMPTY);
  assertEquals(EMPTY, reply);
}
/**
 * Sends a cacheable unary rpc using GET. Requires that the server is behind a caching proxy.
 *
 * <p>The same request is sent twice and must yield equal responses; a third request with a
 * different payload must yield a different response.
 */
public void cacheableUnary() {
  // Set safe to true.
  MethodDescriptor<SimpleRequest, SimpleResponse> safeCacheableUnaryCallMethod =
      TestServiceGrpc.getCacheableUnaryCallMethod().toBuilder().setSafe(true).build();
  // Set fake user IP since some proxies (GFE) won't cache requests from localhost.
  Metadata.Key<String> userIpKey =
      Metadata.Key.of("x-user-ip", Metadata.ASCII_STRING_MARSHALLER);
  Metadata metadata = new Metadata();
  metadata.put(userIpKey, "1.2.3.4");
  Channel channelWithUserIpKey =
      ClientInterceptors.intercept(channel, MetadataUtils.newAttachHeadersInterceptor(metadata));
  // The payload is the current nano-time rendered as a string, so each built request differs.
  SimpleRequest requests1And2 =
      SimpleRequest.newBuilder()
          .setPayload(
              Payload.newBuilder()
                  .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
          .build();
  SimpleRequest request3 =
      SimpleRequest.newBuilder()
          .setPayload(
              Payload.newBuilder()
                  .setBody(ByteString.copyFromUtf8(String.valueOf(System.nanoTime()))))
          .build();

  SimpleResponse response1 =
      ClientCalls.blockingUnaryCall(
          channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
  SimpleResponse response2 =
      ClientCalls.blockingUnaryCall(
          channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, requests1And2);
  SimpleResponse response3 =
      ClientCalls.blockingUnaryCall(
          channelWithUserIpKey, safeCacheableUnaryCallMethod, CallOptions.DEFAULT, request3);

  assertEquals(response1, response2);
  assertNotEquals(response1, response3);
}
/**
 * Sends a unary request with a 271828-byte body asking for a 314159-byte response, then checks
 * the response and the recorded stats/traces.
 */
@Test
public void largeUnary() throws Exception {
  assumeEnoughMemory();

  final Payload requestPayload =
      Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])).build();
  final SimpleRequest request =
      SimpleRequest.newBuilder()
          .setResponseSize(314159)
          .setResponseType(PayloadType.COMPRESSABLE)
          .setPayload(requestPayload)
          .build();

  final Payload expectedPayload =
      Payload.newBuilder()
          .setType(PayloadType.COMPRESSABLE)
          .setBody(ByteString.copyFrom(new byte[314159]))
          .build();
  final SimpleResponse goldenResponse =
      SimpleResponse.newBuilder().setPayload(expectedPayload).build();

  final SimpleResponse actualResponse = blockingStub.unaryCall(request);
  assertEquals(goldenResponse, actualResponse);

  assertStatsTrace(
      "grpc.testing.TestService/UnaryCall",
      Status.Code.OK,
      Collections.singleton(request),
      Collections.singleton(goldenResponse));
}
/**
 * Exercises client per-message compression on unary calls. Because the Java API offers no way
 * to inspect a message's compression level, this test is mainly meant to be run against a gRPC
 * C++ server.
 *
 * @param probe if {@code true}, first sends an uncompressed request that claims to be compressed
 *     and requires the server to reject it with {@code INVALID_ARGUMENT}
 */
public void clientCompressedUnary(boolean probe) throws Exception {
  assumeEnoughMemory();

  final Payload requestPayload =
      Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])).build();
  final SimpleRequest expectCompressedRequest =
      SimpleRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setResponseSize(314159)
          .setPayload(requestPayload)
          .build();
  final SimpleRequest expectUncompressedRequest =
      SimpleRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setResponseSize(314159)
          .setPayload(requestPayload)
          .build();
  final Payload expectedPayload =
      Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])).build();
  final SimpleResponse goldenResponse =
      SimpleResponse.newBuilder().setPayload(expectedPayload).build();

  if (probe) {
    // Send a non-compressed message with expectCompress=true. Servers supporting this test case
    // should return INVALID_ARGUMENT.
    StatusRuntimeException rejection = null;
    try {
      blockingStub.unaryCall(expectCompressedRequest);
    } catch (StatusRuntimeException e) {
      rejection = e;
    }
    if (rejection == null) {
      fail("expected INVALID_ARGUMENT");
    }
    assertEquals(Status.INVALID_ARGUMENT.getCode(), rejection.getStatus().getCode());
    assertStatsTrace("grpc.testing.TestService/UnaryCall", Status.Code.INVALID_ARGUMENT);
  }

  // Compressed request: the call is made through a gzip-enabled stub.
  final SimpleResponse compressedCallResponse =
      blockingStub.withCompression("gzip").unaryCall(expectCompressedRequest);
  assertEquals(goldenResponse, compressedCallResponse);
  assertStatsTrace(
      "grpc.testing.TestService/UnaryCall",
      Status.Code.OK,
      Collections.singleton(expectCompressedRequest),
      Collections.singleton(goldenResponse));

  // Uncompressed request: the call is made through the plain stub.
  final SimpleResponse uncompressedCallResponse =
      blockingStub.unaryCall(expectUncompressedRequest);
  assertEquals(goldenResponse, uncompressedCallResponse);
  assertStatsTrace(
      "grpc.testing.TestService/UnaryCall",
      Status.Code.OK,
      Collections.singleton(expectUncompressedRequest),
      Collections.singleton(goldenResponse));
}
/**
 * Checks that the server can answer a unary call with a compressed response. The API gives no
 * way to assert that a response really was compressed, so against a compliant server this test
 * only exercises the code path that receives a compressed response; it cannot verify the
 * compression itself.
 */
@Test
public void serverCompressedUnary() throws Exception {
  assumeEnoughMemory();

  final Payload requestPayload =
      Payload.newBuilder().setBody(ByteString.copyFrom(new byte[271828])).build();
  final SimpleRequest responseShouldBeCompressed =
      SimpleRequest.newBuilder()
          .setResponseCompressed(BoolValue.newBuilder().setValue(true))
          .setResponseSize(314159)
          .setPayload(requestPayload)
          .build();
  final SimpleRequest responseShouldBeUncompressed =
      SimpleRequest.newBuilder()
          .setResponseCompressed(BoolValue.newBuilder().setValue(false))
          .setResponseSize(314159)
          .setPayload(requestPayload)
          .build();
  final Payload expectedPayload =
      Payload.newBuilder().setBody(ByteString.copyFrom(new byte[314159])).build();
  final SimpleResponse goldenResponse =
      SimpleResponse.newBuilder().setPayload(expectedPayload).build();

  // First call: ask for a compressed response.
  final SimpleResponse compressedReply = blockingStub.unaryCall(responseShouldBeCompressed);
  assertEquals(goldenResponse, compressedReply);
  assertStatsTrace(
      "grpc.testing.TestService/UnaryCall",
      Status.Code.OK,
      Collections.singleton(responseShouldBeCompressed),
      Collections.singleton(goldenResponse));

  // Second call: ask for an uncompressed response.
  final SimpleResponse uncompressedReply = blockingStub.unaryCall(responseShouldBeUncompressed);
  assertEquals(goldenResponse, uncompressedReply);
  assertStatsTrace(
      "grpc.testing.TestService/UnaryCall",
      Status.Code.OK,
      Collections.singleton(responseShouldBeUncompressed),
      Collections.singleton(goldenResponse));
}
/**
 * Requests four streamed responses of sizes 31415, 9, 2653 and 58979 bytes and checks that the
 * stream completes successfully with exactly those responses, in order.
 */
@Test
public void serverStreaming() throws Exception {
  final StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
      .setResponseType(PayloadType.COMPRESSABLE)
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(31415))
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(9))
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(2653))
      .addResponseParameters(ResponseParameters.newBuilder()
          .setSize(58979))
      .build();
  final List<StreamingOutputCallResponse> goldenResponses = Arrays.asList(
      StreamingOutputCallResponse.newBuilder()
          .setPayload(Payload.newBuilder()
              .setType(PayloadType.COMPRESSABLE)
              .setBody(ByteString.copyFrom(new byte[31415])))
          .build(),
      StreamingOutputCallResponse.newBuilder()
          .setPayload(Payload.newBuilder()
              .setType(PayloadType.COMPRESSABLE)
              .setBody(ByteString.copyFrom(new byte[9])))
          .build(),
      StreamingOutputCallResponse.newBuilder()
          .setPayload(Payload.newBuilder()
              .setType(PayloadType.COMPRESSABLE)
              .setBody(ByteString.copyFrom(new byte[2653])))
          .build(),
      StreamingOutputCallResponse.newBuilder()
          .setPayload(Payload.newBuilder()
              .setType(PayloadType.COMPRESSABLE)
              .setBody(ByteString.copyFrom(new byte[58979])))
          .build());

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  asyncStub.streamingOutputCall(request, recorder);
  recorder.awaitCompletion();
  assertSuccess(recorder);
  assertEquals(goldenResponses, recorder.getValues());
}
/**
 * Streams four requests with bodies of 27182, 8, 1828 and 45904 bytes and expects a single
 * response reporting the aggregated payload size, with no error on the response stream.
 */
@Test
public void clientStreaming() throws Exception {
  final List<StreamingInputCallRequest> requests = Arrays.asList(
      StreamingInputCallRequest.newBuilder()
          .setPayload(Payload.newBuilder()
              .setBody(ByteString.copyFrom(new byte[27182])))
          .build(),
      StreamingInputCallRequest.newBuilder()
          .setPayload(Payload.newBuilder()
              .setBody(ByteString.copyFrom(new byte[8])))
          .build(),
      StreamingInputCallRequest.newBuilder()
          .setPayload(Payload.newBuilder()
              .setBody(ByteString.copyFrom(new byte[1828])))
          .build(),
      StreamingInputCallRequest.newBuilder()
          .setPayload(Payload.newBuilder()
              .setBody(ByteString.copyFrom(new byte[45904])))
          .build());
  // 74922 = 27182 + 8 + 1828 + 45904, the sum of the request body sizes above.
  final StreamingInputCallResponse goldenResponse = StreamingInputCallResponse.newBuilder()
      .setAggregatedPayloadSize(74922)
      .build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);
  for (StreamingInputCallRequest request : requests) {
    requestObserver.onNext(request);
  }
  requestObserver.onCompleted();

  assertEquals(goldenResponse, responseObserver.firstValue().get());
  responseObserver.awaitCompletion();
  assertThat(responseObserver.getValues()).hasSize(1);
  // Surface any error recorded on the response stream as a test failure.
  Throwable t = responseObserver.getError();
  if (t != null) {
    throw new AssertionError(t);
  }
}
/**
 * Tests client per-message compression for streaming calls. The Java API does not support
 * inspecting a message's compression level, so this is primarily intended to run against a gRPC
 * C++ server.
 *
 * @param probe if {@code true}, first sends an uncompressed message that claims to be compressed
 *     and requires the stream to fail with {@code INVALID_ARGUMENT}
 */
public void clientCompressedStreaming(boolean probe) throws Exception {
  final StreamingInputCallRequest expectCompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(true))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[27182])))
          .build();
  final StreamingInputCallRequest expectUncompressedRequest =
      StreamingInputCallRequest.newBuilder()
          .setExpectCompressed(BoolValue.newBuilder().setValue(false))
          .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[45904])))
          .build();
  // 73086 = 27182 + 45904, the sum of the two request body sizes above.
  final StreamingInputCallResponse goldenResponse =
      StreamingInputCallResponse.newBuilder().setAggregatedPayloadSize(73086).build();

  StreamRecorder<StreamingInputCallResponse> responseObserver = StreamRecorder.create();
  StreamObserver<StreamingInputCallRequest> requestObserver =
      asyncStub.streamingInputCall(responseObserver);

  if (probe) {
    // Send a non-compressed message with expectCompress=true. Servers supporting this test case
    // should return INVALID_ARGUMENT.
    requestObserver.onNext(expectCompressedRequest);
    responseObserver.awaitCompletion(operationTimeoutMillis(), TimeUnit.MILLISECONDS);
    Throwable e = responseObserver.getError();
    assertNotNull("expected INVALID_ARGUMENT", e);
    assertEquals(Status.INVALID_ARGUMENT.getCode(), Status.fromThrowable(e).getCode());
  }

  // Start a new stream
  responseObserver = StreamRecorder.create();
  // A fully typed downcast, so no unchecked-warning suppression is needed.
  ClientCallStreamObserver<StreamingInputCallRequest> clientCallStreamObserver =
      (ClientCallStreamObserver<StreamingInputCallRequest>)
          asyncStub.withCompression("gzip").streamingInputCall(responseObserver);
  // Toggle per-message compression to match what each request says it expects.
  clientCallStreamObserver.setMessageCompression(true);
  clientCallStreamObserver.onNext(expectCompressedRequest);
  clientCallStreamObserver.setMessageCompression(false);
  clientCallStreamObserver.onNext(expectUncompressedRequest);
  clientCallStreamObserver.onCompleted();
  responseObserver.awaitCompletion();
  assertSuccess(responseObserver);
  assertEquals(goldenResponse, responseObserver.firstValue().get());
}
/**
 * Tests server per-message compression in a streaming response. Ideally we would assert that the
 * responses have the requested compression, but this is not supported by the API. Given a
 * compliant server, this test will exercise the code path for receiving a compressed response but
 * cannot itself verify that the response was compressed.
 *
 * <p>Requests one response flagged as compressed (31415 bytes) and one flagged as uncompressed
 * (92653 bytes).
 */
public void serverCompressedStreaming() throws Exception {
  final StreamingOutputCallRequest request =
      StreamingOutputCallRequest.newBuilder()
          .addResponseParameters(
              ResponseParameters.newBuilder()
                  .setCompressed(BoolValue.newBuilder().setValue(true))
                  .setSize(31415))
          .addResponseParameters(
              ResponseParameters.newBuilder()
                  .setCompressed(BoolValue.newBuilder().setValue(false))
                  .setSize(92653))
          .build();
  final List<StreamingOutputCallResponse> goldenResponses =
      Arrays.asList(
          StreamingOutputCallResponse.newBuilder()
              .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[31415])))
              .build(),
          StreamingOutputCallResponse.newBuilder()
              .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[92653])))
              .build());

  StreamRecorder<StreamingOutputCallResponse> recorder = StreamRecorder.create();
  asyncStub.streamingOutputCall(request, recorder);
  recorder.awaitCompletion();
  assertSuccess(recorder);
  assertEquals(goldenResponses, recorder.getValues());
}
@Test
public void pingPong() throws Exception {
final List requests = Arrays.asList(
StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setSize(31415))
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[27182])))
.build(),
StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setSize(9))
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[8])))
.build(),
StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setSize(2653))
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[1828])))
.build(),
StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setSize(58979))
.setPayload(Payload.newBuilder()
.setBody(ByteString.copyFrom(new byte[45904])))
.build());
final List goldenResponses = Arrays.asList(
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(PayloadType.COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[31415])))
.build(),
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(PayloadType.COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[9])))
.build(),
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(PayloadType.COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[2653])))
.build(),
StreamingOutputCallResponse.newBuilder()
.setPayload(Payload.newBuilder()
.setType(PayloadType.COMPRESSABLE)
.setBody(ByteString.copyFrom(new byte[58979])))
.build());
final ArrayBlockingQueue