1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static com.google.common.truth.Truth.assertThat; 20 import static io.grpc.internal.GrpcUtil.MESSAGE_ACCEPT_ENCODING_KEY; 21 import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY; 22 import static org.junit.Assert.assertEquals; 23 import static org.junit.Assert.assertFalse; 24 import static org.junit.Assert.assertNull; 25 import static org.junit.Assert.assertTrue; 26 27 import com.google.protobuf.ByteString; 28 import io.grpc.CallOptions; 29 import io.grpc.Channel; 30 import io.grpc.ClientCall; 31 import io.grpc.ClientCall.Listener; 32 import io.grpc.ClientInterceptor; 33 import io.grpc.Codec; 34 import io.grpc.CompressorRegistry; 35 import io.grpc.DecompressorRegistry; 36 import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; 37 import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; 38 import io.grpc.Grpc; 39 import io.grpc.InsecureChannelCredentials; 40 import io.grpc.ManagedChannel; 41 import io.grpc.Metadata; 42 import io.grpc.MethodDescriptor; 43 import io.grpc.Server; 44 import io.grpc.ServerBuilder; 45 import io.grpc.ServerCall; 46 import io.grpc.ServerCallHandler; 47 import io.grpc.ServerInterceptor; 48 import io.grpc.ServerInterceptors; 49 import io.grpc.stub.StreamObserver; 50 import io.grpc.testing.integration.Messages.Payload; 51 import io.grpc.testing.integration.Messages.SimpleRequest; 52 import io.grpc.testing.integration.Messages.SimpleResponse; 53 import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub; 54 import io.grpc.testing.integration.TransportCompressionTest.Fzip; 55 import java.nio.charset.Charset; 56 import java.util.ArrayList; 57 import java.util.Collection; 58 import java.util.List; 59 import java.util.concurrent.Executors; 60 import java.util.concurrent.ScheduledExecutorService; 61 import org.junit.After; 62 import org.junit.Before; 63 import org.junit.Test; 64 import org.junit.runner.RunWith; 65 import org.junit.runners.Parameterized; 66 import org.junit.runners.Parameterized.Parameters; 67 68 /** 69 * Tests for compression configurations. 70 * 71 * <p>Because of the asymmetry of clients and servers, clients will not know what decompression 72 * methods the server supports. In cases where the client is willing to encode, and the server 73 * is willing to decode, a second RPC is sent to show that the client has learned and will use 74 * the encoding. 75 * 76 * <p>In cases where compression is negotiated, but either the client or the server doesn't 77 * actually want to encode, a dummy codec is used to record usage. If compression is not enabled, 78 * the codec will see no data pass through. This is checked on each test to ensure the code is 79 * doing the right thing. 80 */ 81 @RunWith(Parameterized.class) 82 public class CompressionTest { 83 private static final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 84 // Ensures that both the request and response messages are more than 0 bytes. The framer/deframer 85 // may not use the compressor if the message is empty. 86 private static final SimpleRequest REQUEST = SimpleRequest.newBuilder() 87 .setResponseSize(1) 88 .build(); 89 90 private Fzip clientCodec = new Fzip("fzip", Codec.Identity.NONE); 91 private Fzip serverCodec = new Fzip("fzip", Codec.Identity.NONE); 92 private DecompressorRegistry clientDecompressors = DecompressorRegistry.emptyInstance(); 93 private DecompressorRegistry serverDecompressors = DecompressorRegistry.emptyInstance(); 94 private CompressorRegistry clientCompressors = CompressorRegistry.newEmptyInstance(); 95 private CompressorRegistry serverCompressors = CompressorRegistry.newEmptyInstance(); 96 97 /** The headers received by the server from the client. */ 98 private volatile Metadata serverResponseHeaders; 99 /** The headers received by the client from the server. */ 100 private volatile Metadata clientResponseHeaders; 101 102 // Params 103 private final boolean enableClientMessageCompression; 104 private final boolean enableServerMessageCompression; 105 private final boolean clientAcceptEncoding; 106 private final boolean clientEncoding; 107 private final boolean serverAcceptEncoding; 108 private final boolean serverEncoding; 109 110 private Server server; 111 private ManagedChannel channel; 112 private TestServiceBlockingStub stub; 113 114 /** 115 * Auto called by test. 116 */ CompressionTest( boolean enableClientMessageCompression, boolean clientAcceptEncoding, boolean clientEncoding, boolean enableServerMessageCompression, boolean serverAcceptEncoding, boolean serverEncoding)117 public CompressionTest( 118 boolean enableClientMessageCompression, 119 boolean clientAcceptEncoding, 120 boolean clientEncoding, 121 boolean enableServerMessageCompression, 122 boolean serverAcceptEncoding, 123 boolean serverEncoding) { 124 this.enableClientMessageCompression = enableClientMessageCompression; 125 this.clientAcceptEncoding = clientAcceptEncoding; 126 this.clientEncoding = clientEncoding; 127 this.enableServerMessageCompression = enableServerMessageCompression; 128 this.serverAcceptEncoding = serverAcceptEncoding; 129 this.serverEncoding = serverEncoding; 130 } 131 132 @Before setUp()133 public void setUp() throws Exception { 134 clientDecompressors = clientDecompressors.with(Codec.Identity.NONE, false); 135 serverDecompressors = serverDecompressors.with(Codec.Identity.NONE, false); 136 } 137 138 @After tearDown()139 public void tearDown() { 140 channel.shutdownNow(); 141 server.shutdownNow(); 142 executor.shutdownNow(); 143 } 144 145 /** 146 * Parameters for test. 147 */ 148 @Parameters params()149 public static Collection<Object[]> params() { 150 boolean[] bools = new boolean[]{false, true}; 151 List<Object[]> combos = new ArrayList<>(64); 152 for (boolean enableClientMessageCompression : bools) { 153 for (boolean clientAcceptEncoding : bools) { 154 for (boolean clientEncoding : bools) { 155 for (boolean enableServerMessageCompression : bools) { 156 for (boolean serverAcceptEncoding : bools) { 157 for (boolean serverEncoding : bools) { 158 combos.add(new Object[] { 159 enableClientMessageCompression, clientAcceptEncoding, clientEncoding, 160 enableServerMessageCompression, serverAcceptEncoding, serverEncoding}); 161 } 162 } 163 } 164 } 165 } 166 } 167 return combos; 168 } 169 170 @Test compression()171 public void compression() throws Exception { 172 if (clientAcceptEncoding) { 173 clientDecompressors = clientDecompressors.with(clientCodec, true); 174 } 175 if (clientEncoding) { 176 clientCompressors.register(clientCodec); 177 } 178 if (serverAcceptEncoding) { 179 serverDecompressors = serverDecompressors.with(serverCodec, true); 180 } 181 if (serverEncoding) { 182 serverCompressors.register(serverCodec); 183 } 184 185 server = ServerBuilder.forPort(0) 186 .addService( 187 ServerInterceptors.intercept(new LocalServer(), new ServerCompressorInterceptor())) 188 .compressorRegistry(serverCompressors) 189 .decompressorRegistry(serverDecompressors) 190 .build() 191 .start(); 192 193 channel = Grpc.newChannelBuilder( 194 "localhost:" + server.getPort(), InsecureChannelCredentials.create()) 195 .decompressorRegistry(clientDecompressors) 196 .compressorRegistry(clientCompressors) 197 .intercept(new ClientCompressorInterceptor()) 198 .build(); 199 stub = TestServiceGrpc.newBlockingStub(channel); 200 201 stub.unaryCall(REQUEST); 202 203 if (clientAcceptEncoding && serverEncoding) { 204 assertEquals("fzip", clientResponseHeaders.get(MESSAGE_ENCODING_KEY)); 205 if (enableServerMessageCompression) { 206 assertTrue(clientCodec.anyRead); 207 assertTrue(serverCodec.anyWritten); 208 } else { 209 assertFalse(clientCodec.anyRead); 210 assertFalse(serverCodec.anyWritten); 211 } 212 } else { 213 // Either identity or null is accepted. 214 assertThat(clientResponseHeaders.get(MESSAGE_ENCODING_KEY)) 215 .isAnyOf(Codec.Identity.NONE.getMessageEncoding(), null); 216 assertFalse(clientCodec.anyRead); 217 assertFalse(serverCodec.anyWritten); 218 } 219 220 if (serverAcceptEncoding) { 221 assertEqualsString("fzip", clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 222 } else { 223 assertNull(clientResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 224 } 225 226 if (clientAcceptEncoding) { 227 assertEqualsString("fzip", serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 228 } else { 229 assertNull(serverResponseHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY)); 230 } 231 232 // Second call, once the client knows what the server supports. 233 if (clientEncoding && serverAcceptEncoding) { 234 assertEquals("fzip", serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); 235 if (enableClientMessageCompression) { 236 assertTrue(clientCodec.anyWritten); 237 assertTrue(serverCodec.anyRead); 238 } else { 239 assertFalse(clientCodec.anyWritten); 240 assertFalse(serverCodec.anyRead); 241 } 242 } else { 243 assertNull(serverResponseHeaders.get(MESSAGE_ENCODING_KEY)); 244 assertFalse(clientCodec.anyWritten); 245 assertFalse(serverCodec.anyRead); 246 } 247 } 248 249 private static final class LocalServer extends TestServiceGrpc.TestServiceImplBase { 250 @Override unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver)251 public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) { 252 responseObserver.onNext(SimpleResponse.newBuilder() 253 .setPayload(Payload.newBuilder() 254 .setBody(ByteString.copyFrom(new byte[]{127}))) 255 .build()); 256 responseObserver.onCompleted(); 257 } 258 } 259 260 private class ServerCompressorInterceptor implements ServerInterceptor { 261 @Override interceptCall( ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next)262 public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall( 263 ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) { 264 if (serverEncoding) { 265 call.setCompression("fzip"); 266 } 267 call.setMessageCompression(enableServerMessageCompression); 268 Metadata headersCopy = new Metadata(); 269 headersCopy.merge(headers); 270 serverResponseHeaders = headersCopy; 271 return next.startCall(call, headers); 272 } 273 } 274 275 private class ClientCompressorInterceptor implements ClientInterceptor { 276 @Override interceptCall( MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next)277 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 278 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 279 if (clientEncoding && serverAcceptEncoding) { 280 callOptions = callOptions.withCompression("fzip"); 281 } 282 ClientCall<ReqT, RespT> call = next.newCall(method, callOptions); 283 284 return new ClientCompressor<>(call); 285 } 286 } 287 288 private class ClientCompressor<ReqT, RespT> extends SimpleForwardingClientCall<ReqT, RespT> { ClientCompressor(ClientCall<ReqT, RespT> delegate)289 protected ClientCompressor(ClientCall<ReqT, RespT> delegate) { 290 super(delegate); 291 } 292 293 @Override start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers)294 public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) { 295 super.start(new ClientHeadersCapture<>(responseListener), headers); 296 setMessageCompression(enableClientMessageCompression); 297 } 298 } 299 300 private class ClientHeadersCapture<RespT> extends SimpleForwardingClientCallListener<RespT> { ClientHeadersCapture(Listener<RespT> delegate)301 private ClientHeadersCapture(Listener<RespT> delegate) { 302 super(delegate); 303 } 304 305 @Override onHeaders(Metadata headers)306 public void onHeaders(Metadata headers) { 307 super.onHeaders(headers); 308 Metadata headersCopy = new Metadata(); 309 headersCopy.merge(headers); 310 clientResponseHeaders = headersCopy; 311 } 312 } 313 assertEqualsString(String expected, byte[] actual)314 private static void assertEqualsString(String expected, byte[] actual) { 315 assertEquals(expected, new String(actual, Charset.forName("US-ASCII"))); 316 } 317 } 318