/*
 * Copyright 2015 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import io.grpc.ChannelLogger;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import java.util.concurrent.TimeUnit;

/**
 * Base class for all Netty gRPC handlers. This class standardizes exception handling (always
 * shutdown the connection) as well as sending the initial connection window at startup.
 */
abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
  private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;

  /** Payload carried by the PING frames that {@link FlowControlPinger} sends. */
  private static final long BDP_MEASUREMENT_PING = 1234;

  /** Sentinel stored when the initial settings do not carry an initial window size. */
  private static final int INITIAL_WINDOW_UNSET = -1;

  private final int initialConnectionWindow;
  private final FlowControlPinger flowControlPing;
  private final Ticker ticker;

  private boolean autoTuneFlowControlOn;
  private ChannelHandlerContext ctx;
  private boolean initialWindowSent = false;

  /**
   * Creates the handler and configures it to wait indefinitely for streams to close during a
   * graceful shutdown.
   *
   * @param channelUnused forwarded unchanged to the superclass constructor
   * @param decoder forwarded unchanged to the superclass constructor
   * @param encoder forwarded unchanged to the superclass constructor
   * @param initialSettings forwarded to the superclass; its initial window size, when present,
   *     is also used as the target size of the connection-level window
   * @param negotiationLogger forwarded unchanged to the superclass constructor
   * @param autoFlowControl whether {@link FlowControlPinger} should measure traffic and resize
   *     the flow-control window
   * @param pingLimiter decides whether a measurement PING may be sent; when {@code null} every
   *     PING is allowed
   * @param ticker time source used to time measurement PINGs; must not be {@code null}
   */
  AbstractNettyHandler(
      ChannelPromise channelUnused,
      Http2ConnectionDecoder decoder,
      Http2ConnectionEncoder encoder,
      Http2Settings initialSettings,
      ChannelLogger negotiationLogger,
      boolean autoFlowControl,
      PingLimiter pingLimiter,
      Ticker ticker) {
    super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);

    // During a graceful shutdown, wait until all streams are closed.
    gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);

    // Extract the connection window from the settings if it was set.
    Integer configuredWindow = initialSettings.initialWindowSize();
    this.initialConnectionWindow =
        configuredWindow == null ? INITIAL_WINDOW_UNSET : configuredWindow;
    this.autoTuneFlowControlOn = autoFlowControl;
    if (pingLimiter == null) {
      pingLimiter = new AllowPingLimiter();
    }
    this.flowControlPing = new FlowControlPinger(pingLimiter);
    this.ticker = checkNotNull(ticker, "ticker");
  }

  /**
   * Remembers the context, delegates to the superclass and then tries to send the initial
   * connection window.
   */
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    this.ctx = ctx;
    // Sends the connection preface if we haven't already.
    super.handlerAdded(ctx);
    sendInitialConnectionWindow();
  }

  /** Delegates to the superclass and then tries to send the initial connection window. */
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Sends connection preface if we haven't already.
    super.channelActive(ctx);
    sendInitialConnectionWindow();
  }

  /**
   * Routes exceptions that carry no embedded {@link Http2Exception} to
   * {@code onError(ctx, false, cause)}; all others go to the superclass implementation.
   */
  @Override
  public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Http2Exception embedded = getEmbeddedHttp2Exception(cause);
    if (embedded == null) {
      // There was no embedded Http2Exception, assume it's a connection error. Subclasses are
      // responsible for storing the appropriate status and shutting down the connection.
      onError(ctx, /* outbound= */ false, cause);
    } else {
      super.exceptionCaught(ctx, cause);
    }
  }

  /** Returns the context captured in {@link #handlerAdded}, or {@code null} before that call. */
  protected final ChannelHandlerContext ctx() {
    return ctx;
  }

  /**
   * Sends initial connection window to the remote endpoint if necessary.
   *
   * <p>Does nothing until the channel is active, and does the work at most once. When the
   * initial settings did not specify a window size the connection window is left untouched.
   *
   * @throws Http2Exception if the flow controller rejects the window adjustment
   */
  private void sendInitialConnectionWindow() throws Http2Exception {
    if (initialWindowSent || !ctx.channel().isActive()) {
      return;
    }
    if (initialConnectionWindow == INITIAL_WINDOW_UNSET) {
      // No size was configured. Computing a delta from the sentinel would yield a negative
      // value and shrink the connection window, so there is nothing to send.
      initialWindowSent = true;
      return;
    }
    Http2Stream connectionStream = connection().connectionStream();
    int currentSize = connection().local().flowController().windowSize(connectionStream);
    int delta = initialConnectionWindow - currentSize;
    decoder().flowController().incrementWindowSize(connectionStream, delta);
    // Only marked as sent once the adjustment succeeded, so a failure can be retried.
    initialWindowSent = true;
    ctx.flush();
  }

  @VisibleForTesting
  FlowControlPinger flowControlPing() {
    return flowControlPing;
  }

  @VisibleForTesting
  void setAutoTuneFlowControl(boolean isOn) {
    autoTuneFlowControlOn = isOn;
  }

  /**
   * Class for handling flow control pinging and flow control window updates as necessary.
   *
   * <p>All methods are no-ops for window tuning while auto-tuning is switched off.
   */
  final class FlowControlPinger {

    private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
    public static final int MAX_BACKOFF = 10;

    private final PingLimiter pingLimiter;
    private int pingCount;
    private int pingReturn;
    private boolean pinging;
    private int dataSizeSincePing;
    private long lastBandwidth; // bytes per second
    private long lastPingTime;
    private int lastTargetWindow;
    private int pingFrequencyMultiplier;

    /**
     * Creates a pinger.
     *
     * @param pingLimiter consulted before every measurement PING; must not be {@code null}
     */
    public FlowControlPinger(PingLimiter pingLimiter) {
      Preconditions.checkNotNull(pingLimiter, "pingLimiter");
      this.pingLimiter = pingLimiter;
    }

    /** Returns the payload used for measurement PINGs. */
    public long payload() {
      return BDP_MEASUREMENT_PING;
    }

    /** Returns the upper bound applied to the auto-tuned window size. */
    public int maxWindow() {
      return MAX_WINDOW_SIZE;
    }

    /**
     * Accounts for received data and starts a measurement PING when enough data has arrived
     * since the previous one.
     *
     * @param dataLength number of data bytes read
     * @param paddingLength number of padding bytes read
     */
    public void onDataRead(int dataLength, int paddingLength) {
      if (!autoTuneFlowControlOn) {
        return;
      }

      // Note that we are double counting around the ping initiation as the current data will be
      // added at the end of this method, so will be available in the next check. This at worst
      // causes us to send a ping one data packet earlier, but makes startup faster if there are
      // small packets before big ones.
      // Computed in long so that a large byte count cannot overflow when doubled below.
      long dataForCheck = (long) getDataSincePing() + dataLength + paddingLength;
      // Need to double the data here to account for targetWindow being set to twice the data below
      if (!isPinging() && pingLimiter.isPingAllowed()
          && dataForCheck * 2 >= (long) lastTargetWindow * pingFrequencyMultiplier) {
        setPinging(true);
        sendPing(ctx());
      }

      if (lastTargetWindow == 0) {
        lastTargetWindow =
            decoder().flowController().initialWindowSize(connection().connectionStream());
      }

      incrementDataSincePing(dataLength + paddingLength);
    }

    /**
     * Ends the current measurement and grows the flow-control window when both the observed
     * bandwidth and the target window increased; otherwise backs off the ping frequency.
     *
     * <p>NOTE(review): callers are outside this file; presumably this is invoked when the ack
     * for a measurement PING arrives.
     *
     * @throws Http2Exception if the flow controller rejects the new window size
     */
    public void updateWindow() throws Http2Exception {
      if (!autoTuneFlowControlOn) {
        return;
      }
      pingReturn++;
      setPinging(false);

      long elapsedTime = (ticker.read() - lastPingTime);
      if (elapsedTime == 0) {
        // Avoid dividing by zero when the ticker did not advance.
        elapsedTime = 1;
      }

      int dataSincePing = getDataSincePing();
      long bandwidth = (dataSincePing * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
      // Calculate new window size by doubling the observed BDP, but cap at max window.
      // The doubling is done in long so that it cannot overflow before the cap is applied.
      int targetWindow = (int) Math.min((long) dataSincePing * 2, MAX_WINDOW_SIZE);
      Http2LocalFlowController fc = decoder().flowController();
      int currentWindow = fc.initialWindowSize(connection().connectionStream());
      if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
        pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
        return;
      }

      pingFrequencyMultiplier = 0; // react quickly when size is changing
      lastBandwidth = bandwidth;
      lastTargetWindow = targetWindow;
      int increase = targetWindow - currentWindow;
      fc.incrementWindowSize(connection().connectionStream(), increase);
      fc.initialWindowSize(targetWindow);
      Http2Settings settings = new Http2Settings();
      settings.initialWindowSize(targetWindow);
      frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
    }

    private boolean isPinging() {
      return pinging;
    }

    private void setPinging(boolean pingOut) {
      pinging = pingOut;
    }

    /** Resets the byte counter, records the send time and writes a measurement PING. */
    private void sendPing(ChannelHandlerContext ctx) {
      setDataSizeSincePing(0);
      lastPingTime = ticker.read();
      encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
      pingCount++;
    }

    /** Adds to the byte counter, saturating at {@link Integer#MAX_VALUE} instead of wrapping. */
    private void incrementDataSincePing(int increase) {
      long total = (long) getDataSincePing() + increase;
      setDataSizeSincePing((int) Math.min(total, Integer.MAX_VALUE));
    }

    @VisibleForTesting
    int getPingCount() {
      return pingCount;
    }

    @VisibleForTesting
    int getPingReturn() {
      return pingReturn;
    }

    @VisibleForTesting
    int getDataSincePing() {
      return dataSizeSincePing;
    }

    private void setDataSizeSincePing(int dataSize) {
      dataSizeSincePing = dataSize;
    }

    // Only used in testing
    @VisibleForTesting
    void setDataSizeAndSincePing(int dataSize) {
      setDataSizeSincePing(dataSize);
      pingFrequencyMultiplier = 1;
      lastPingTime = ticker.read();
    }
  }

  /** Controls whether PINGs like those for BDP are permitted to be sent at the current time. */
  public interface PingLimiter {
    boolean isPingAllowed();
  }

  /** {@link PingLimiter} that permits every PING. */
  private static final class AllowPingLimiter implements PingLimiter {
    @Override
    public boolean isPingAllowed() {
      return true;
    }
  }
}