• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static com.google.common.truth.Truth.assertThat;
20 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY;
21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
22 import static org.junit.Assert.assertEquals;
23 import static org.junit.Assert.assertFalse;
24 import static org.junit.Assert.assertNull;
25 import static org.junit.Assert.assertTrue;
26 
27 import com.google.protobuf.ByteString;
28 import io.grpc.CallOptions;
29 import io.grpc.Channel;
30 import io.grpc.ClientCall;
31 import io.grpc.ClientCall.Listener;
32 import io.grpc.ClientInterceptor;
33 import io.grpc.Codec;
34 import io.grpc.CompressorRegistry;
35 import io.grpc.DecompressorRegistry;
36 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
37 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
38 import io.grpc.Grpc;
39 import io.grpc.InsecureChannelCredentials;
40 import io.grpc.ManagedChannel;
41 import io.grpc.Metadata;
42 import io.grpc.MethodDescriptor;
43 import io.grpc.Server;
44 import io.grpc.ServerBuilder;
45 import io.grpc.ServerCall;
46 import io.grpc.ServerCallHandler;
47 import io.grpc.ServerInterceptor;
48 import io.grpc.ServerInterceptors;
49 import io.grpc.stub.StreamObserver;
50 import io.grpc.testing.integration.Messages.Payload;
51 import io.grpc.testing.integration.Messages.SimpleRequest;
52 import io.grpc.testing.integration.Messages.SimpleResponse;
53 import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub;
54 import io.grpc.testing.integration.TransportCompressionTest.Fzip;
55 import java.nio.charset.Charset;
56 import java.util.ArrayList;
57 import java.util.Collection;
58 import java.util.List;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.ScheduledExecutorService;
61 import org.junit.After;
62 import org.junit.Before;
63 import org.junit.Test;
64 import org.junit.runner.RunWith;
65 import org.junit.runners.Parameterized;
66 import org.junit.runners.Parameterized.Parameters;
67 
68 /**
69  * Tests for compression configurations.
70  *
71  * <p>Because of the asymmetry of clients and servers, clients will not know what decompression
72  * methods the server supports.  In cases where the client is willing to encode, and the server
73  * is willing to decode, a second RPC is sent to show that the client has learned and will use
74  * the encoding.
75  *
76  * <p>In cases where compression is negotiated, but either the client or the server doesn't
77  * actually want to encode, a dummy codec is used to record usage.  If compression is not enabled,
78  * the codec will see no data pass through.  This is checked on each test to ensure the code is
79  * doing the right thing.
80  */
81 @RunWith(Parameterized.class)
82 public class CompressionTest {
83   private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
84   // Ensures that both the request and response messages are more than 0 bytes.  The framer/deframer
85   // may not use the compressor if the message is empty.
86   private static final SimpleRequest REQUEST = SimpleRequest.newBuilder()
87       .setResponseSize(1)
88       .build();
89 
90   private Fzip clientCodec = new Fzip("fzip", Codec.Identity.NONE);
91   private Fzip serverCodec = new Fzip("fzip", Codec.Identity.NONE);
92   private DecompressorRegistry clientDecompressors = DecompressorRegistry.emptyInstance();
93   private DecompressorRegistry serverDecompressors = DecompressorRegistry.emptyInstance();
94   private CompressorRegistry clientCompressors = CompressorRegistry.newEmptyInstance();
95   private CompressorRegistry serverCompressors = CompressorRegistry.newEmptyInstance();
96 
97   /** The headers received by the server from the client. */
98   private volatile Metadata serverResponseHeaders;
99   /** The headers received by the client from the server. */
100   private volatile Metadata clientResponseHeaders;
101 
102   // Params
103   private final boolean enableClientMessageCompression;
104   private final boolean enableServerMessageCompression;
105   private final boolean clientAcceptEncoding;
106   private final boolean clientEncoding;
107   private final boolean serverAcceptEncoding;
108   private final boolean serverEncoding;
109 
110   private Server server;
111   private ManagedChannel channel;
112   private TestServiceBlockingStub stub;
113 
114   /**
115    * Auto called by test.
116    */
CompressionTest( boolean enableClientMessageCompression, boolean clientAcceptEncoding, boolean clientEncoding, boolean enableServerMessageCompression, boolean serverAcceptEncoding, boolean serverEncoding)117   public CompressionTest(
118       boolean enableClientMessageCompression,
119       boolean clientAcceptEncoding,
120       boolean clientEncoding,
121       boolean enableServerMessageCompression,
122       boolean serverAcceptEncoding,
123       boolean serverEncoding) {
124     this.enableClientMessageCompression = enableClientMessageCompression;
125     this.clientAcceptEncoding = clientAcceptEncoding;
126     this.clientEncoding = clientEncoding;
127     this.enableServerMessageCompression = enableServerMessageCompression;
128     this.serverAcceptEncoding = serverAcceptEncoding;
129     this.serverEncoding = serverEncoding;
130   }
131 
132   @Before
setUp()133   public void setUp() throws Exception {
134     clientDecompressors = clientDecompressors.with(Codec.Identity.NONE, false);
135     serverDecompressors = serverDecompressors.with(Codec.Identity.NONE, false);
136   }
137 
138   @After
tearDown()139   public void tearDown() {
140     channel.shutdownNow();
141     server.shutdownNow();
142     executor.shutdownNow();
143   }
144 
145   /**
146    * Parameters for test.
147    */
148   @Parameters
params()149   public static Collection<Object[]> params() {
150     boolean[] bools = new boolean[]{false, true};
151     List<Object[]> combos = new ArrayList<>(64);
152     for (boolean enableClientMessageCompression : bools) {
153       for (boolean clientAcceptEncoding : bools) {
154         for (boolean clientEncoding : bools) {
155           for (boolean enableServerMessageCompression : bools) {
156             for (boolean serverAcceptEncoding : bools) {
157               for (boolean serverEncoding : bools) {
158                 combos.add(new Object[] {
159                     enableClientMessageCompression, clientAcceptEncoding, clientEncoding,
160                     enableServerMessageCompression, serverAcceptEncoding, serverEncoding});
161               }
162             }
163           }
164         }
165       }
166     }
167     return combos;
168   }
169 
170   @Test
compression()171   public void compression() throws Exception {
172     if (clientAcceptEncoding) {
173       clientDecompressors = clientDecompressors.with(clientCodec, true);
174     }
175     if (clientEncoding) {
176       clientCompressors.register(clientCodec);
177     }
178     if (serverAcceptEncoding) {
179       serverDecompressors = serverDecompressors.with(serverCodec, true);
180     }
181     if (serverEncoding) {
182       serverCompressors.register(serverCodec);
183     }
184 
185     server = ServerBuilder.forPort(0)
186         .addService(
187             ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor()))
188         .compressorRegistry(serverCompressors)
189         .decompressorRegistry(serverDecompressors)
190         .build()
191         .start();
192 
193     channel = Grpc.newChannelBuilder(
194             "localhost:" + server.getPort(), InsecureChannelCredentials.create())
195         .decompressorRegistry(clientDecompressors)
196         .compressorRegistry(clientCompressors)
197         .intercept(new ClientCompressorInterceptor())
198         .build();
199     stub = TestServiceGrpc.newBlockingStub(channel);
200 
201     stub.unaryCall(REQUEST);
202 
203     if (clientAcceptEncoding && serverEncoding) {
204       assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY));
205       if (enableServerMessageCompression) {
206         assertTrue(clientCodec.anyRead);
207         assertTrue(serverCodec.anyWritten);
208       } else {
209         assertFalse(clientCodec.anyRead);
210         assertFalse(serverCodec.anyWritten);
211       }
212     } else {
213       // Either identity or null is accepted.
214       assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY))
215           .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null);
216       assertFalse(clientCodec.anyRead);
217       assertFalse(serverCodec.anyWritten);
218     }
219 
220     if (serverAcceptEncoding) {
221       assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
222     } else {
223       assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
224     }
225 
226     if (clientAcceptEncoding) {
227       assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
228     } else {
229       assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY));
230     }
231 
232     // Second call, once the client knows what the server supports.
233     if (clientEncoding && serverAcceptEncoding) {
234       assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY));
235       if (enableClientMessageCompression) {
236         assertTrue(clientCodec.anyWritten);
237         assertTrue(serverCodec.anyRead);
238       } else {
239         assertFalse(clientCodec.anyWritten);
240         assertFalse(serverCodec.anyRead);
241       }
242     } else {
243       assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY));
244       assertFalse(clientCodec.anyWritten);
245       assertFalse(serverCodec.anyRead);
246     }
247   }
248 
249   private static final class LocalServer extends TestServiceGrpc.TestServiceImplBase {
250     @Override
unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver)251     public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
252       responseObserver.onNext(SimpleResponse.newBuilder()
253           .setPayload(Payload.newBuilder()
254               .setBody(ByteString.copyFrom(new byte[]{127})))
255           .build());
256       responseObserver.onCompleted();
257     }
258   }
259 
260   private class ServerCompressorInterceptor implements ServerInterceptor {
261     @Override
interceptCall( ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next)262     public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
263         ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
264       if (serverEncoding) {
265         call.setCompression("fzip");
266       }
267       call.setMessageCompression(enableServerMessageCompression);
268       Metadata headersCopy = new Metadata();
269       headersCopy.merge(headers);
270       serverResponseHeaders = headersCopy;
271       return next.startCall(call, headers);
272     }
273   }
274 
275   private class ClientCompressorInterceptor implements ClientInterceptor {
276     @Override
interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)277     public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
278         MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
279       if (clientEncoding && serverAcceptEncoding) {
280         callOptions = callOptions.withCompression("fzip");
281       }
282       ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
283 
284       return new ClientCompressor<>(call);
285     }
286   }
287 
288   private class ClientCompressor<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> {
ClientCompressor(ClientCall<ReqT, RespT> delegate)289     protected ClientCompressor(ClientCall<ReqT, RespT> delegate) {
290       super(delegate);
291     }
292 
293     @Override
start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers)294     public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
295       super.start(new ClientHeadersCapture<>(responseListener), headers);
296       setMessageCompression(enableClientMessageCompression);
297     }
298   }
299 
300   private class ClientHeadersCapture<RespT> extends SimpleForwardingClientCallListener<RespT> {
ClientHeadersCapture(Listener<RespT> delegate)301     private ClientHeadersCapture(Listener<RespT> delegate) {
302       super(delegate);
303     }
304 
305     @Override
onHeaders(Metadata headers)306     public void onHeaders(Metadata headers) {
307       super.onHeaders(headers);
308       Metadata headersCopy = new Metadata();
309       headersCopy.merge(headers);
310       clientResponseHeaders = headersCopy;
311     }
312   }
313 
assertEqualsString(String expected, byte[] actual)314   private static void assertEqualsString(String expected, byte[] actual) {
315     assertEquals(expected, new String(actual, Charset.forName("US-ASCII")));
316   }
317 }
318