1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static org.junit.Assert.assertEquals; 20 import static org.junit.Assert.assertTrue; 21 22 import com.google.protobuf.ByteString; 23 import io.grpc.CallOptions; 24 import io.grpc.Channel; 25 import io.grpc.ClientCall; 26 import io.grpc.ClientInterceptor; 27 import io.grpc.Codec; 28 import io.grpc.CompressorRegistry; 29 import io.grpc.DecompressorRegistry; 30 import io.grpc.ForwardingClientCall; 31 import io.grpc.ForwardingClientCallListener; 32 import io.grpc.InsecureServerCredentials; 33 import io.grpc.Metadata; 34 import io.grpc.MethodDescriptor; 35 import io.grpc.ServerBuilder; 36 import io.grpc.ServerCall; 37 import io.grpc.ServerCall.Listener; 38 import io.grpc.ServerCallHandler; 39 import io.grpc.ServerInterceptor; 40 import io.grpc.internal.GrpcUtil; 41 import io.grpc.netty.InternalNettyChannelBuilder; 42 import io.grpc.netty.InternalNettyServerBuilder; 43 import io.grpc.netty.NettyChannelBuilder; 44 import io.grpc.netty.NettyServerBuilder; 45 import io.grpc.testing.integration.Messages.BoolValue; 46 import io.grpc.testing.integration.Messages.Payload; 47 import io.grpc.testing.integration.Messages.SimpleRequest; 48 import io.grpc.testing.integration.Messages.SimpleResponse; 49 import java.io.FilterInputStream; 50 import java.io.FilterOutputStream; 51 import java.io.IOException; 52 import java.io.InputStream; 53 import java.io.OutputStream; 54 import org.junit.Before; 55 import org.junit.BeforeClass; 56 import org.junit.Test; 57 import org.junit.runner.RunWith; 58 import org.junit.runners.JUnit4; 59 60 /** 61 * Tests that compression is turned on. 62 */ 63 @RunWith(JUnit4.class) 64 public class TransportCompressionTest extends AbstractInteropTest { 65 66 // Masquerade as identity. 67 private static final Fzip FZIPPER = new Fzip("gzip", new Codec.Gzip()); 68 private volatile boolean expectFzip; 69 70 private static final DecompressorRegistry decompressors = DecompressorRegistry.emptyInstance() 71 .with(Codec.Identity.NONE, false) 72 .with(FZIPPER, true); 73 private static final CompressorRegistry compressors = CompressorRegistry.newEmptyInstance(); 74 75 @Before beforeTests()76 public void beforeTests() { 77 FZIPPER.anyRead = false; 78 FZIPPER.anyWritten = false; 79 } 80 81 @BeforeClass registerCompressors()82 public static void registerCompressors() { 83 compressors.register(FZIPPER); 84 compressors.register(Codec.Identity.NONE); 85 } 86 87 @Override getServerBuilder()88 protected ServerBuilder<?> getServerBuilder() { 89 NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) 90 .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) 91 .compressorRegistry(compressors) 92 .decompressorRegistry(decompressors) 93 .intercept(new ServerInterceptor() { 94 @Override 95 public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, 96 Metadata headers, ServerCallHandler<ReqT, RespT> next) { 97 Listener<ReqT> listener = next.startCall(call, headers); 98 // TODO(carl-mastrangelo): check that encoding was set. 99 call.setMessageCompression(true); 100 return listener; 101 } 102 }); 103 // Disable the default census stats tracer, use testing tracer instead. 104 InternalNettyServerBuilder.setStatsEnabled(builder, false); 105 return builder.addStreamTracerFactory(createCustomCensusTracerFactory()); 106 } 107 108 @Test compresses()109 public void compresses() { 110 expectFzip = true; 111 final SimpleRequest request = SimpleRequest.newBuilder() 112 .setResponseSize(314159) 113 .setResponseCompressed(BoolValue.newBuilder().setValue(true)) 114 .setPayload(Payload.newBuilder() 115 .setBody(ByteString.copyFrom(new byte[271828]))) 116 .build(); 117 final SimpleResponse goldenResponse = SimpleResponse.newBuilder() 118 .setPayload(Payload.newBuilder() 119 .setBody(ByteString.copyFrom(new byte[314159]))) 120 .build(); 121 122 123 assertEquals(goldenResponse, blockingStub.unaryCall(request)); 124 // Assert that compression took place 125 assertTrue(FZIPPER.anyRead); 126 assertTrue(FZIPPER.anyWritten); 127 } 128 129 @Override createChannelBuilder()130 protected NettyChannelBuilder createChannelBuilder() { 131 NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) 132 .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) 133 .decompressorRegistry(decompressors) 134 .compressorRegistry(compressors) 135 .intercept(new ClientInterceptor() { 136 @Override 137 public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall( 138 MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) { 139 final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions); 140 return new ForwardingClientCall<ReqT, RespT>() { 141 142 @Override 143 protected ClientCall<ReqT, RespT> delegate() { 144 return call; 145 } 146 147 @Override 148 public void start( 149 final ClientCall.Listener<RespT> responseListener, Metadata headers) { 150 ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() { 151 152 @Override 153 protected io.grpc.ClientCall.Listener<RespT> delegate() { 154 return responseListener; 155 } 156 157 @Override 158 public void onHeaders(Metadata headers) { 159 super.onHeaders(headers); 160 if (expectFzip) { 161 String encoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY); 162 assertEquals(encoding, FZIPPER.getMessageEncoding()); 163 } 164 } 165 }; 166 super.start(listener, headers); 167 setMessageCompression(true); 168 } 169 }; 170 } 171 }) 172 .usePlaintext(); 173 // Disable the default census stats interceptor, use testing interceptor instead. 174 InternalNettyChannelBuilder.setStatsEnabled(builder, false); 175 return builder.intercept(createCensusStatsClientInterceptor()); 176 } 177 178 /** 179 * Fzip is a custom compressor. 180 */ 181 static class Fzip implements Codec { 182 volatile boolean anyRead; 183 volatile boolean anyWritten; 184 volatile Codec delegate; 185 186 private final String actualName; 187 Fzip(String actualName, Codec delegate)188 public Fzip(String actualName, Codec delegate) { 189 this.actualName = actualName; 190 this.delegate = delegate; 191 } 192 193 @Override getMessageEncoding()194 public String getMessageEncoding() { 195 return actualName; 196 } 197 198 @Override compress(OutputStream os)199 public OutputStream compress(OutputStream os) throws IOException { 200 return new FilterOutputStream(delegate.compress(os)) { 201 @Override 202 public void write(int b) throws IOException { 203 super.write(b); 204 anyWritten = true; 205 } 206 }; 207 } 208 209 @Override decompress(InputStream is)210 public InputStream decompress(InputStream is) throws IOException { 211 return new FilterInputStream(delegate.decompress(is)) { 212 @Override 213 public int read() throws IOException { 214 int val = super.read(); 215 anyRead = true; 216 return val; 217 } 218 219 @Override 220 public int read(byte[] b, int off, int len) throws IOException { 221 int total = super.read(b, off, len); 222 anyRead = true; 223 return total; 224 } 225 }; 226 } 227 } 228 } 229