1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; 21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 22 import static org.junit.Assert.assertEquals; 23 import static org.junit.Assert.assertFalse; 24 import static org.junit.Assert.assertNull; 25 import static org.junit.Assert.assertTrue; 26 27 import com.google.protobuf.ByteString; 28 import io.grpc.CallOptions; 29 import io.grpc.Channel; 30 import io.grpc.ClientCall; 31 import io.grpc.ClientCall.Listener; 32 import io.grpc.ClientInterceptor; 33 import io.grpc.Codec; 34 import io.grpc.CompressorRegistry; 35 import io.grpc.DecompressorRegistry; 36 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; 37 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; 38 import io.grpc.ManagedChannel; 39 import io.grpc.ManagedChannelBuilder; 40 import io.grpc.Metadata; 41 import io.grpc.MethodDescriptor; 42 import io.grpc.Server; 43 import io.grpc.ServerBuilder; 44 import io.grpc.ServerCall; 45 import io.grpc.ServerCallHandler; 46 import io.grpc.ServerInterceptor; 47 import io.grpc.ServerInterceptors; 48 import io.grpc.stub.StreamObserver; 49 import io.grpc.testing.integration.Messages.Payload; 50 import io.grpc.testing.integration.Messages.SimpleRequest; 51 import io.grpc.testing.integration.Messages.SimpleResponse; 52 import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub; 53 import io.grpc.testing.integration.TransportCompressionTest.Fzip; 54 import java.nio.charset.Charset; 55 import java.util.ArrayList; 56 import java.util.Collection; 57 import java.util.List; 58 import java.util.concurrent.Executors; 59 import java.util.concurrent.ScheduledExecutorService; 60 import org.junit.After; 61 import org.junit.Before; 62 import org.junit.Test; 63 import org.junit.runner.RunWith; 64 import org.junit.runners.Parameterized; 65 import org.junit.runners.Parameterized.Parameters; 66 67 /** 68 * Tests for compression configurations. 69 * 70 * <p>Because of the asymmetry of clients and servers, clients will not know what decompression 71 * methods the server supports. In cases where the client is willing to encode, and the server 72 * is willing to decode, a second RPC is sent to show that the client has learned and will use 73 * the encoding. 74 * 75 * <p>In cases where compression is negotiated, but either the client or the server doesn't 76 * actually want to encode, a dummy codec is used to record usage. If compression is not enabled, 77 * the codec will see no data pass through. This is checked on each test to ensure the code is 78 * doing the right thing. 79 */ 80 @RunWith(Parameterized.class) 81 public class CompressionTest { 82 private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 83 // Ensures that both the request and response messages are more than 0 bytes. The framer/deframer 84 // may not use the compressor if the message is empty. 85 private static final SimpleRequest REQUEST = SimpleRequest.newBuilder() 86 .setResponseSize(1) 87 .build(); 88 89 private Fzip clientCodec = new Fzip("fzip", Codec.Identity.NONE); 90 private Fzip serverCodec = new Fzip("fzip", Codec.Identity.NONE); 91 private DecompressorRegistry clientDecompressors = DecompressorRegistry.emptyInstance(); 92 private DecompressorRegistry serverDecompressors = DecompressorRegistry.emptyInstance(); 93 private CompressorRegistry clientCompressors = CompressorRegistry.newEmptyInstance(); 94 private CompressorRegistry serverCompressors = CompressorRegistry.newEmptyInstance(); 95 96 /** The headers received by the server from the client. */ 97 private volatile Metadata serverResponseHeaders; 98 /** The headers received by the client from the server. */ 99 private volatile Metadata clientResponseHeaders; 100 101 // Params 102 private final boolean enableClientMessageCompression; 103 private final boolean enableServerMessageCompression; 104 private final boolean clientAcceptEncoding; 105 private final boolean clientEncoding; 106 private final boolean serverAcceptEncoding; 107 private final boolean serverEncoding; 108 109 private Server server; 110 private ManagedChannel channel; 111 private TestServiceBlockingStub stub; 112 113 /** 114 * Auto called by test. 115 */ CompressionTest( boolean enableClientMessageCompression, boolean clientAcceptEncoding, boolean clientEncoding, boolean enableServerMessageCompression, boolean serverAcceptEncoding, boolean serverEncoding)116 public CompressionTest( 117 boolean enableClientMessageCompression, 118 boolean clientAcceptEncoding, 119 boolean clientEncoding, 120 boolean enableServerMessageCompression, 121 boolean serverAcceptEncoding, 122 boolean serverEncoding) { 123 this.enableClientMessageCompression = enableClientMessageCompression; 124 this.clientAcceptEncoding = clientAcceptEncoding; 125 this.clientEncoding = clientEncoding; 126 this.enableServerMessageCompression = enableServerMessageCompression; 127 this.serverAcceptEncoding = serverAcceptEncoding; 128 this.serverEncoding = serverEncoding; 129 } 130 131 @Before setUp()132 public void setUp() throws Exception { 133 clientDecompressors = clientDecompressors.with(Codec.Identity.NONE, false); 134 serverDecompressors = serverDecompressors.with(Codec.Identity.NONE, false); 135 } 136 137 @After tearDown()138 public void tearDown() { 139 channel.shutdownNow(); 140 server.shutdownNow(); 141 executor.shutdownNow(); 142 } 143 144 /** 145 * Parameters for test. 146 */ 147 @Parameters params()148 public static Collection<Object[]> params() { 149 boolean[] bools = new boolean[]{false, true}; 150 List<Object[]> combos = new ArrayList<Object[]>(64); 151 for (boolean enableClientMessageCompression : bools) { 152 for (boolean clientAcceptEncoding : bools) { 153 for (boolean clientEncoding : bools) { 154 for (boolean enableServerMessageCompression : bools) { 155 for (boolean serverAcceptEncoding : bools) { 156 for (boolean serverEncoding : bools) { 157 combos.add(new Object[] { 158 enableClientMessageCompression, clientAcceptEncoding, clientEncoding, 159 enableServerMessageCompression, serverAcceptEncoding, serverEncoding}); 160 } 161 } 162 } 163 } 164 } 165 } 166 return combos; 167 } 168 169 @Test compression()170 public void compression() throws Exception { 171 if (clientAcceptEncoding) { 172 clientDecompressors = clientDecompressors.with(clientCodec, true); 173 } 174 if (clientEncoding) { 175 clientCompressors.register(clientCodec); 176 } 177 if (serverAcceptEncoding) { 178 serverDecompressors = serverDecompressors.with(serverCodec, true); 179 } 180 if (serverEncoding) { 181 serverCompressors.register(serverCodec); 182 } 183 184 server = ServerBuilder.forPort(0) 185 .addService( 186 ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor())) 187 .compressorRegistry(serverCompressors) 188 .decompressorRegistry(serverDecompressors) 189 .build() 190 .start(); 191 192 channel = ManagedChannelBuilder.forAddress("localhost", server.getPort()) 193 .decompressorRegistry(clientDecompressors) 194 .compressorRegistry(clientCompressors) 195 .intercept(new ClientCompressorInterceptor()) 196 .usePlaintext() 197 .build(); 198 stub = TestServiceGrpc.newBlockingStub(channel); 199 200 stub.unaryCall(REQUEST); 201 202 if (clientAcceptEncoding && serverEncoding) { 203 assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY)); 204 if (enableServerMessageCompression) { 205 assertTrue(clientCodec.anyRead); 206 assertTrue(serverCodec.anyWritten); 207 } else { 208 assertFalse(clientCodec.anyRead); 209 assertFalse(serverCodec.anyWritten); 210 } 211 } else { 212 // Either identity or null is accepted. 213 assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY)) 214 .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null); 215 assertFalse(clientCodec.anyRead); 216 assertFalse(serverCodec.anyWritten); 217 } 218 219 if (serverAcceptEncoding) { 220 assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 221 } else { 222 assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 223 } 224 225 if (clientAcceptEncoding) { 226 assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 227 } else { 228 assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 229 } 230 231 // Second call, once the client knows what the server supports. 232 if (clientEncoding && serverAcceptEncoding) { 233 assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); 234 if (enableClientMessageCompression) { 235 assertTrue(clientCodec.anyWritten); 236 assertTrue(serverCodec.anyRead); 237 } else { 238 assertFalse(clientCodec.anyWritten); 239 assertFalse(serverCodec.anyRead); 240 } 241 } else { 242 assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); 243 assertFalse(clientCodec.anyWritten); 244 assertFalse(serverCodec.anyRead); 245 } 246 } 247 248 private static final class LocalServer extends TestServiceGrpc.TestServiceImplBase { 249 @Override unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver)250 public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) { 251 responseObserver.onNext(SimpleResponse.newBuilder() 252 .setPayload(Payload.newBuilder() 253 .setBody(ByteString.copyFrom(new byte[]{127}))) 254 .build()); 255 responseObserver.onCompleted(); 256 } 257 } 258 259 private class ServerCompressorInterceptor implements ServerInterceptor { 260 @Override interceptCall( ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next)261 public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall( 262 ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { 263 if (serverEncoding) { 264 call.setCompression("fzip"); 265 } 266 call.setMessageCompression(enableServerMessageCompression); 267 Metadata headersCopy = new Metadata(); 268 headersCopy.merge(headers); 269 serverResponseHeaders = headersCopy; 270 return next.startCall(call, headers); 271 } 272 } 273 274 private class ClientCompressorInterceptor implements ClientInterceptor { 275 @Override interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)276 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 277 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 278 if (clientEncoding && serverAcceptEncoding) { 279 callOptions = callOptions.withCompression("fzip"); 280 } 281 ClientCall<ReqT, RespT> call = next.newCall(method, callOptions); 282 283 return new ClientCompressor<ReqT, RespT>(call); 284 } 285 } 286 287 private class ClientCompressor<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> { ClientCompressor(ClientCall<ReqT, RespT> delegate)288 protected ClientCompressor(ClientCall<ReqT, RespT> delegate) { 289 super(delegate); 290 } 291 292 @Override start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers)293 public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) { 294 super.start(new ClientHeadersCapture<RespT>(responseListener), headers); 295 setMessageCompression(enableClientMessageCompression); 296 } 297 } 298 299 private class ClientHeadersCapture<RespT> extends SimpleForwardingClientCallListener<RespT> { ClientHeadersCapture(Listener<RespT> delegate)300 private ClientHeadersCapture(Listener<RespT> delegate) { 301 super(delegate); 302 } 303 304 @Override onHeaders(Metadata headers)305 public void onHeaders(Metadata headers) { 306 super.onHeaders(headers); 307 Metadata headersCopy = new Metadata(); 308 headersCopy.merge(headers); 309 clientResponseHeaders = headersCopy; 310 } 311 } 312 assertEqualsString(String expected, byte[] actual)313 private static void assertEqualsString(String expected, byte[] actual) { 314 assertEquals(expected, new String(actual, Charset.forName("US-ASCII"))); 315 } 316 } 317