1 /* 2 * Copyright 2016 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.testing.integration; 18 19 import static org.junit.Assert.assertTrue; 20 21 import com.google.common.collect.ImmutableList; 22 import io.grpc.ManagedChannel; 23 import io.grpc.Server; 24 import io.grpc.ServerBuilder; 25 import io.grpc.ServerInterceptor; 26 import io.grpc.ServerInterceptors; 27 import io.grpc.netty.GrpcHttp2ConnectionHandler; 28 import io.grpc.netty.InternalNettyChannelBuilder; 29 import io.grpc.netty.InternalNettyChannelBuilder.ProtocolNegotiatorFactory; 30 import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator; 31 import io.grpc.netty.InternalProtocolNegotiators; 32 import io.grpc.netty.NegotiationType; 33 import io.grpc.netty.NettyChannelBuilder; 34 import io.grpc.netty.NettyServerBuilder; 35 import io.grpc.stub.StreamObserver; 36 import io.grpc.testing.integration.Messages.ResponseParameters; 37 import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; 38 import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; 39 import io.netty.channel.ChannelHandler; 40 import io.netty.handler.codec.http2.Http2Stream; 41 import io.netty.util.AsciiString; 42 import java.io.IOException; 43 import java.net.InetSocketAddress; 44 import java.util.concurrent.CountDownLatch; 45 import java.util.concurrent.Executors; 46 import java.util.concurrent.TimeUnit; 47 import java.util.concurrent.atomic.AtomicReference; 48 import org.junit.After; 49 import org.junit.Before; 50 import org.junit.Ignore; 51 import org.junit.Test; 52 import org.junit.runner.RunWith; 53 import org.junit.runners.JUnit4; 54 55 @RunWith(JUnit4.class) 56 public class NettyFlowControlTest { 57 58 // in bytes 59 private static final int LOW_BAND = 2 * 1024 * 1024; 60 private static final int HIGH_BAND = 30 * 1024 * 1024; 61 62 // in milliseconds 63 private static final int MED_LAT = 10; 64 65 // in bytes 66 private static final int TINY_WINDOW = 1; 67 private static final int REGULAR_WINDOW = 64 * 1024; 68 private static final int MAX_WINDOW = 8 * 1024 * 1024; 69 70 private final CapturingProtocolNegotiationFactory capturingPnFactory 71 = new CapturingProtocolNegotiationFactory(); 72 private ManagedChannel channel; 73 private Server server; 74 private TrafficControlProxy proxy; 75 76 private int proxyPort; 77 private int serverPort; 78 79 @Before initTest()80 public void initTest() { 81 startServer(REGULAR_WINDOW); 82 serverPort = server.getPort(); 83 } 84 85 @After endTest()86 public void endTest() throws IOException { 87 if (proxy != null) { 88 proxy.shutDown(); 89 } 90 server.shutdownNow(); 91 if (channel != null) { 92 channel.shutdownNow(); 93 } 94 } 95 96 @Test largeBdp()97 public void largeBdp() throws InterruptedException, IOException { 98 proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); 99 proxy.start(); 100 proxyPort = proxy.getPort(); 101 createAndStartChannel(REGULAR_WINDOW); 102 doTest(HIGH_BAND, MED_LAT); 103 } 104 105 @Test smallBdp()106 public void smallBdp() throws InterruptedException, IOException { 107 proxy = new TrafficControlProxy(serverPort, LOW_BAND, MED_LAT, TimeUnit.MILLISECONDS); 108 proxy.start(); 109 proxyPort = proxy.getPort(); 110 createAndStartChannel(REGULAR_WINDOW); 111 doTest(LOW_BAND, MED_LAT); 112 } 113 114 @Test 115 @Ignore("enable once 2 pings between data is no longer necessary") verySmallWindowMakesProgress()116 public void verySmallWindowMakesProgress() throws InterruptedException, IOException { 117 proxy = new TrafficControlProxy(serverPort, HIGH_BAND, MED_LAT, TimeUnit.MILLISECONDS); 118 proxy.start(); 119 proxyPort = proxy.getPort(); 120 createAndStartChannel(TINY_WINDOW); 121 doTest(HIGH_BAND, MED_LAT); 122 } 123 124 /** 125 * Main testing method. Streams 2 MB of data from a server and records the final window and 126 * average bandwidth usage. 127 */ doTest(int bandwidth, int latency)128 private void doTest(int bandwidth, int latency) throws InterruptedException { 129 130 int streamSize = 1 * 1024 * 1024; 131 long expectedWindow = latency * (bandwidth / TimeUnit.SECONDS.toMillis(1)); 132 133 TestServiceGrpc.TestServiceStub stub = TestServiceGrpc.newStub(channel); 134 StreamingOutputCallRequest.Builder builder = StreamingOutputCallRequest.newBuilder() 135 .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 16)) 136 .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 16)) 137 .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 8)) 138 .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 4)) 139 .addResponseParameters(ResponseParameters.newBuilder().setSize(streamSize / 2)); 140 StreamingOutputCallRequest request = builder.build(); 141 142 TestStreamObserver observer = 143 new TestStreamObserver(capturingPnFactory.grpcHandlerRef, expectedWindow); 144 stub.streamingOutputCall(request, observer); 145 int lastWindow = observer.waitFor(5, TimeUnit.SECONDS); 146 147 // deal with cases that either don't cause a window update or hit max window 148 expectedWindow = Math.min(MAX_WINDOW, Math.max(expectedWindow, REGULAR_WINDOW)); 149 150 // Range looks large, but this allows for only one extra/missed window update plus 151 // bdpPing variations. 152 // (one extra update causes a 2x difference and one missed update causes a .5x difference) 153 assertTrue("Window was " + lastWindow + " expecting " + expectedWindow, 154 lastWindow < 2.2 * expectedWindow); 155 assertTrue("Window was " + lastWindow + " expecting " + expectedWindow, 156 expectedWindow < 2 * lastWindow); 157 } 158 159 /** 160 * Resets client/server and their flow control windows. 161 */ createAndStartChannel(int clientFlowControlWindow)162 private void createAndStartChannel(int clientFlowControlWindow) { 163 NettyChannelBuilder channelBuilder = 164 NettyChannelBuilder 165 .forAddress(new InetSocketAddress("localhost", proxyPort)) 166 .initialFlowControlWindow(clientFlowControlWindow) 167 .negotiationType(NegotiationType.PLAINTEXT); 168 InternalNettyChannelBuilder.setProtocolNegotiatorFactory(channelBuilder, capturingPnFactory); 169 channel = channelBuilder.build(); 170 } 171 startServer(int serverFlowControlWindow)172 private void startServer(int serverFlowControlWindow) { 173 ServerBuilder<?> builder = 174 NettyServerBuilder 175 .forAddress(new InetSocketAddress("localhost", 0)) 176 .initialFlowControlWindow(serverFlowControlWindow); 177 builder.addService(ServerInterceptors.intercept( 178 new TestServiceImpl(Executors.newScheduledThreadPool(2)), 179 ImmutableList.<ServerInterceptor>of())); 180 try { 181 server = builder.build().start(); 182 } catch (IOException e) { 183 throw new RuntimeException(e); 184 } 185 } 186 187 /** 188 * Simple stream observer to measure elapsed time of the call. 189 */ 190 private static class TestStreamObserver implements StreamObserver<StreamingOutputCallResponse> { 191 192 final AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef; 193 final CountDownLatch latch = new CountDownLatch(1); 194 final long expectedWindow; 195 int lastWindow; 196 boolean wasCompleted; 197 TestStreamObserver( AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window)198 public TestStreamObserver( 199 AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef, long window) { 200 this.grpcHandlerRef = grpcHandlerRef; 201 expectedWindow = window; 202 } 203 204 @Override onNext(StreamingOutputCallResponse value)205 public void onNext(StreamingOutputCallResponse value) { 206 GrpcHttp2ConnectionHandler grpcHandler = grpcHandlerRef.get(); 207 Http2Stream connectionStream = grpcHandler.connection().connectionStream(); 208 int curWindow = grpcHandler.decoder().flowController().initialWindowSize(connectionStream); 209 synchronized (this) { 210 if (curWindow >= expectedWindow) { 211 if (wasCompleted) { 212 return; 213 } 214 wasCompleted = true; 215 lastWindow = curWindow; 216 onCompleted(); 217 } else if (!wasCompleted) { 218 lastWindow = curWindow; 219 } 220 } 221 } 222 223 @Override onError(Throwable t)224 public void onError(Throwable t) { 225 latch.countDown(); 226 throw new RuntimeException(t); 227 } 228 229 @Override onCompleted()230 public void onCompleted() { 231 latch.countDown(); 232 } 233 waitFor(long duration, TimeUnit unit)234 public int waitFor(long duration, TimeUnit unit) throws InterruptedException { 235 latch.await(duration, unit); 236 return lastWindow; 237 } 238 } 239 240 private static class CapturingProtocolNegotiationFactory implements ProtocolNegotiatorFactory { 241 242 AtomicReference<GrpcHttp2ConnectionHandler> grpcHandlerRef = new AtomicReference<>(); 243 244 @Override buildProtocolNegotiator()245 public ProtocolNegotiator buildProtocolNegotiator() { 246 return new CapturingProtocolNegotiator(); 247 } 248 249 private class CapturingProtocolNegotiator implements ProtocolNegotiator { 250 251 final ProtocolNegotiator delegate = InternalProtocolNegotiators.plaintext(); 252 253 @Override scheme()254 public AsciiString scheme() { 255 return delegate.scheme(); 256 } 257 258 @Override newHandler(GrpcHttp2ConnectionHandler grpcHandler)259 public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) { 260 CapturingProtocolNegotiationFactory.this.grpcHandlerRef.set(grpcHandler); 261 return delegate.newHandler(grpcHandler); 262 } 263 264 @Override close()265 public void close() { 266 delegate.close(); 267 } 268 } 269 } 270 } 271