• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static org.junit.Assert.assertEquals;
20 import static org.junit.Assert.assertTrue;
21 
22 import com.google.protobuf.ByteString;
23 import io.grpc.CallOptions;
24 import io.grpc.Channel;
25 import io.grpc.ClientCall;
26 import io.grpc.ClientInterceptor;
27 import io.grpc.Codec;
28 import io.grpc.CompressorRegistry;
29 import io.grpc.DecompressorRegistry;
30 import io.grpc.ForwardingClientCall;
31 import io.grpc.ForwardingClientCallListener;
32 import io.grpc.InsecureServerCredentials;
33 import io.grpc.Metadata;
34 import io.grpc.MethodDescriptor;
35 import io.grpc.ServerBuilder;
36 import io.grpc.ServerCall;
37 import io.grpc.ServerCall.Listener;
38 import io.grpc.ServerCallHandler;
39 import io.grpc.ServerInterceptor;
40 import io.grpc.internal.GrpcUtil;
41 import io.grpc.netty.InternalNettyChannelBuilder;
42 import io.grpc.netty.InternalNettyServerBuilder;
43 import io.grpc.netty.NettyChannelBuilder;
44 import io.grpc.netty.NettyServerBuilder;
45 import io.grpc.testing.integration.Messages.BoolValue;
46 import io.grpc.testing.integration.Messages.Payload;
47 import io.grpc.testing.integration.Messages.SimpleRequest;
48 import io.grpc.testing.integration.Messages.SimpleResponse;
49 import java.io.FilterInputStream;
50 import java.io.FilterOutputStream;
51 import java.io.IOException;
52 import java.io.InputStream;
53 import java.io.OutputStream;
54 import org.junit.Before;
55 import org.junit.BeforeClass;
56 import org.junit.Test;
57 import org.junit.runner.RunWith;
58 import org.junit.runners.JUnit4;
59 
60 /**
61  * Tests that compression is turned on.
62  */
63 @RunWith(JUnit4.class)
64 public class TransportCompressionTest extends AbstractInteropTest {
65 
66   // Masquerade as identity.
67   private static final Fzip FZIPPER = new Fzip("gzip", new Codec.Gzip());
68   private volatile boolean expectFzip;
69 
70   private static final DecompressorRegistry decompressors = DecompressorRegistry.emptyInstance()
71       .with(Codec.Identity.NONE, false)
72       .with(FZIPPER, true);
73   private static final CompressorRegistry compressors = CompressorRegistry.newEmptyInstance();
74 
75   @Before
beforeTests()76   public void beforeTests() {
77     FZIPPER.anyRead = false;
78     FZIPPER.anyWritten = false;
79   }
80 
81   @BeforeClass
registerCompressors()82   public static void registerCompressors() {
83     compressors.register(FZIPPER);
84     compressors.register(Codec.Identity.NONE);
85   }
86 
87   @Override
getServerBuilder()88   protected ServerBuilder<?> getServerBuilder() {
89     NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
90         .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
91         .compressorRegistry(compressors)
92         .decompressorRegistry(decompressors)
93         .intercept(new ServerInterceptor() {
94             @Override
95             public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
96                 Metadata headers, ServerCallHandler<ReqT, RespT> next) {
97               Listener<ReqT> listener = next.startCall(call, headers);
98               // TODO(carl-mastrangelo): check that encoding was set.
99               call.setMessageCompression(true);
100               return listener;
101             }
102           });
103     // Disable the default census stats tracer, use testing tracer instead.
104     InternalNettyServerBuilder.setStatsEnabled(builder, false);
105     return builder.addStreamTracerFactory(createCustomCensusTracerFactory());
106   }
107 
108   @Test
compresses()109   public void compresses() {
110     expectFzip = true;
111     final SimpleRequest request = SimpleRequest.newBuilder()
112         .setResponseSize(314159)
113         .setResponseCompressed(BoolValue.newBuilder().setValue(true))
114         .setPayload(Payload.newBuilder()
115             .setBody(ByteString.copyFrom(new byte[271828])))
116         .build();
117     final SimpleResponse goldenResponse = SimpleResponse.newBuilder()
118         .setPayload(Payload.newBuilder()
119             .setBody(ByteString.copyFrom(new byte[314159])))
120         .build();
121 
122 
123     assertEquals(goldenResponse, blockingStub.unaryCall(request));
124     // Assert that compression took place
125     assertTrue(FZIPPER.anyRead);
126     assertTrue(FZIPPER.anyWritten);
127   }
128 
129   @Override
createChannelBuilder()130   protected NettyChannelBuilder createChannelBuilder() {
131     NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
132         .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
133         .decompressorRegistry(decompressors)
134         .compressorRegistry(compressors)
135         .intercept(new ClientInterceptor() {
136           @Override
137           public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
138               MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
139             final ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
140             return new ForwardingClientCall<ReqT, RespT>() {
141 
142               @Override
143               protected ClientCall<ReqT, RespT> delegate() {
144                 return call;
145               }
146 
147               @Override
148               public void start(
149                   final ClientCall.Listener<RespT> responseListener, Metadata headers) {
150                 ClientCall.Listener<RespT> listener = new ForwardingClientCallListener<RespT>() {
151 
152                   @Override
153                   protected io.grpc.ClientCall.Listener<RespT> delegate() {
154                     return responseListener;
155                   }
156 
157                   @Override
158                   public void onHeaders(Metadata headers) {
159                     super.onHeaders(headers);
160                     if (expectFzip) {
161                       String encoding = headers.get(GrpcUtil.MESSAGE_ENCODING_KEY);
162                       assertEquals(encoding, FZIPPER.getMessageEncoding());
163                     }
164                   }
165                 };
166                 super.start(listener, headers);
167                 setMessageCompression(true);
168               }
169             };
170           }
171         })
172         .usePlaintext();
173     // Disable the default census stats interceptor, use testing interceptor instead.
174     InternalNettyChannelBuilder.setStatsEnabled(builder, false);
175     return builder.intercept(createCensusStatsClientInterceptor());
176   }
177 
178   /**
179    * Fzip is a custom compressor.
180    */
181   static class Fzip implements Codec {
182     volatile boolean anyRead;
183     volatile boolean anyWritten;
184     volatile Codec delegate;
185 
186     private final String actualName;
187 
Fzip(String actualName, Codec delegate)188     public Fzip(String actualName, Codec delegate) {
189       this.actualName = actualName;
190       this.delegate = delegate;
191     }
192 
193     @Override
getMessageEncoding()194     public String getMessageEncoding() {
195       return actualName;
196     }
197 
198     @Override
compress(OutputStream os)199     public OutputStream compress(OutputStream os) throws IOException {
200       return new FilterOutputStream(delegate.compress(os)) {
201         @Override
202         public void write(int b) throws IOException {
203           super.write(b);
204           anyWritten = true;
205         }
206       };
207     }
208 
209     @Override
decompress(InputStream is)210     public InputStream decompress(InputStream is) throws IOException {
211       return new FilterInputStream(delegate.decompress(is)) {
212         @Override
213         public int read() throws IOException {
214           int val = super.read();
215           anyRead = true;
216           return val;
217         }
218 
219         @Override
220         public int read(byte[] b, int off, int len) throws IOException {
221           int total = super.read(b, off, len);
222           anyRead = true;
223           return total;
224         }
225       };
226     }
227   }
228 }
229