• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.testing.integration;
18 
19 import static org.junit.Assert.assertTrue;
20 
21 import com.google.common.collect.ImmutableList;
22 import io.grpc.ManagedChannel;
23 import io.grpc.Server;
24 import io.grpc.ServerBuilder;
25 import io.grpc.ServerInterceptor;
26 import io.grpc.ServerInterceptors;
27 import io.grpc.netty.GrpcHttp2ConnectionHandler;
28 import io.grpc.netty.InternalNettyChannelBuilder;
29 import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory;
30 import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
31 import io.grpc.netty.InternalProtocolNegotiators;
32 import io.grpc.netty.NegotiationType;
33 import io.grpc.netty.NettyChannelBuilder;
34 import io.grpc.netty.NettyServerBuilder;
35 import io.grpc.stub.StreamObserver;
36 import io.grpc.testing.integration.Messages.ResponseParameters;
37 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
38 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
39 import io.netty.channel.ChannelHandler;
40 import io.netty.handler.codec.http2.Http2Stream;
41 import io.netty.util.AsciiString;
42 import java.io.IOException;
43 import java.net.InetSocketAddress;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.Executors;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicReference;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Ignore;
51 import org.junit.Test;
52 import org.junit.runner.RunWith;
53 import org.junit.runners.JUnit4;
54 
55 @RunWith(JUnit4.class)
56 public class NettyFlowControlTest {
57 
58   // in bytes
59   private static final int LOW_BAND = 2 * 1024 * 1024;
60   private static final int HIGH_BAND = 30 * 1024 * 1024;
61 
62   // in milliseconds
63   private static final int MED_LAT = 10;
64 
65   // in bytes
66   private static final int TINY_WINDOW = 1;
67   private static final int REGULAR_WINDOW = 64 * 1024;
68   private static final int MAX_WINDOW = 8 * 1024 * 1024;
69 
70   private final CapturingProtocolNegotiationFactory capturingPnFactory
71       = new CapturingProtocolNegotiationFactory();
72   private ManagedChannel channel;
73   private Server server;
74   private TrafficControlProxy proxy;
75 
76   private int proxyPort;
77   private int serverPort;
78 
79   @Before
initTest()80   public void initTest() {
81     startServer(REGULAR_WINDOW);
82     serverPort = server.getPort();
83   }
84 
85   @After
endTest()86   public void endTest() throws IOException {
87     if (proxy != null) {
88       proxy.shutDown();
89     }
90     server.shutdownNow();
91     if (channel != null) {
92       channel.shutdownNow();
93     }
94   }
95 
96   @Test
largeBdp()97   public void largeBdp() throws InterruptedException, IOException {
98     proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
99     proxy.start();
100     proxyPort = proxy.getPort();
101     createAndStartChannel(REGULAR_WINDOW);
102     doTest(HIGH_BAND, MED_LAT);
103   }
104 
105   @Test
smallBdp()106   public void smallBdp() throws InterruptedException, IOException {
107     proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS);
108     proxy.start();
109     proxyPort = proxy.getPort();
110     createAndStartChannel(REGULAR_WINDOW);
111     doTest(LOW_BAND, MED_LAT);
112   }
113 
114   @Test
115   @Ignore("enable once 2 pings between data is no longer necessary")
verySmallWindowMakesProgress()116   public void verySmallWindowMakesProgress() throws InterruptedException, IOException {
117     proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS);
118     proxy.start();
119     proxyPort = proxy.getPort();
120     createAndStartChannel(TINY_WINDOW);
121     doTest(HIGH_BAND, MED_LAT);
122   }
123 
124   /**
125    * Main testing method. Streams 2 MB of data from a server and records the final window and
126    * average bandwidth usage.
127    */
doTest(int bandwidth, int latency)128   private void doTest(int bandwidth, int latency) throws InterruptedException {
129 
130     int streamSize = 1 * 1024 * 1024;
131     long expectedWindow = latency * (bandwidth / TimeUnit.SECONDS.toMillis(1));
132 
133     TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel);
134     StreamingOutputCallRequest.Builder builder = StreamingOutputCallRequest.newBuilder()
135         .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 16))
136         .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 16))
137         .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 8))
138         .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 4))
139         .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2));
140     StreamingOutputCallRequest request = builder.build();
141 
142     TestStreamObserver observer =
143         new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow);
144     stub.streamingOutputCall(request, observer);
145     int lastWindow = observer.waitFor(5, TimeUnit.SECONDS);
146 
147     // deal with cases that either don't cause a window update or hit max window
148     expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW));
149 
150     // Range looks large, but this allows for only one extra/missed window update plus
151     // bdpPing variations.
152     // (one extra update causes a 2x difference and one missed update causes a .5x difference)
153     assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
154         lastWindow < 2.2 * expectedWindow);
155     assertTrue("Window was " + lastWindow + " expecting " + expectedWindow,
156         expectedWindow < 2 * lastWindow);
157   }
158 
159   /**
160    * Resets client/server and their flow control windows.
161    */
createAndStartChannel(int clientFlowControlWindow)162   private void createAndStartChannel(int clientFlowControlWindow) {
163     NettyChannelBuilder channelBuilder =
164         NettyChannelBuilder
165             .forAddress(new InetSocketAddress("localhost", proxyPort))
166             .initialFlowControlWindow(clientFlowControlWindow)
167             .negotiationType(NegotiationType.PLAINTEXT);
168     InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory);
169     channel = channelBuilder.build();
170   }
171 
startServer(int serverFlowControlWindow)172   private void startServer(int serverFlowControlWindow) {
173     ServerBuilder<?> builder =
174         NettyServerBuilder
175             .forAddress(new InetSocketAddress("localhost", 0))
176             .initialFlowControlWindow(serverFlowControlWindow);
177     builder.addService(ServerInterceptors.intercept(
178         new TestServiceImpl(Executors.newScheduledThreadPool(2)),
179         ImmutableList.<ServerInterceptor>of()));
180     try {
181       server = builder.build().start();
182     } catch (IOException e) {
183       throw new RuntimeException(e);
184     }
185   }
186 
187   /**
188    * Simple stream observer to measure elapsed time of the call.
189    */
190   private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> {
191 
192     final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef;
193     final CountDownLatch latch = new CountDownLatch(1);
194     final long expectedWindow;
195     int lastWindow;
196     boolean wasCompleted;
197 
TestStreamObserver( AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window)198     public TestStreamObserver(
199         AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) {
200       this.grpcHandlerRef = grpcHandlerRef;
201       expectedWindow = window;
202     }
203 
204     @Override
onNext(StreamingOutputCallResponse value)205     public void onNext(StreamingOutputCallResponse value) {
206       GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get();
207       Http2Stream connectionStream = grpcHandler.connection().connectionStream();
208       int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream);
209       synchronized (this) {
210         if (curWindow >= expectedWindow) {
211           if (wasCompleted) {
212             return;
213           }
214           wasCompleted = true;
215           lastWindow = curWindow;
216           onCompleted();
217         } else if (!wasCompleted) {
218           lastWindow = curWindow;
219         }
220       }
221     }
222 
223     @Override
onError(Throwable t)224     public void onError(Throwable t) {
225       latch.countDown();
226       throw new RuntimeException(t);
227     }
228 
229     @Override
onCompleted()230     public void onCompleted() {
231       latch.countDown();
232     }
233 
waitFor(long duration, TimeUnit unit)234     public int waitFor(long duration, TimeUnit unit) throws InterruptedException {
235       latch.await(duration, unit);
236       return lastWindow;
237     }
238   }
239 
240   private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory {
241 
242     AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>();
243 
244     @Override
buildProtocolNegotiator()245     public ProtocolNegotiator buildProtocolNegotiator() {
246       return new CapturingProtocolNegotiator();
247     }
248 
249     private class CapturingProtocolNegotiator implements ProtocolNegotiator {
250 
251       final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext();
252 
253       @Override
scheme()254       public AsciiString scheme() {
255         return delegate.scheme();
256       }
257 
258       @Override
newHandler(GrpcHttp2ConnectionHandler grpcHandler)259       public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
260         CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler);
261         return delegate.newHandler(grpcHandler);
262       }
263 
264       @Override
close()265       public void close() {
266         delegate.close();
267       }
268     }
269   }
270 }
271