1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import io.netty.channel.ChannelHandlerContext; 23 import io.netty.channel.ChannelPromise; 24 import io.netty.handler.codec.http2.Http2ConnectionDecoder; 25 import io.netty.handler.codec.http2.Http2ConnectionEncoder; 26 import io.netty.handler.codec.http2.Http2Exception; 27 import io.netty.handler.codec.http2.Http2LocalFlowController; 28 import io.netty.handler.codec.http2.Http2Settings; 29 import io.netty.handler.codec.http2.Http2Stream; 30 import java.util.concurrent.TimeUnit; 31 32 /** 33 * Base class for all Netty gRPC handlers. This class standardizes exception handling (always 34 * shutdown the connection) as well as sending the initial connection window at startup. 35 */ 36 abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler { 37 private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1; 38 private boolean autoTuneFlowControlOn = false; 39 private int initialConnectionWindow; 40 private ChannelHandlerContext ctx; 41 private final FlowControlPinger flowControlPing = new FlowControlPinger(); 42 43 private static final long BDP_MEASUREMENT_PING = 1234; 44 AbstractNettyHandler( ChannelPromise channelUnused, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings)45 AbstractNettyHandler( 46 ChannelPromise channelUnused, 47 Http2ConnectionDecoder decoder, 48 Http2ConnectionEncoder encoder, 49 Http2Settings initialSettings) { 50 super(channelUnused, decoder, encoder, initialSettings); 51 52 // During a graceful shutdown, wait until all streams are closed. 53 gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT); 54 55 // Extract the connection window from the settings if it was set. 56 this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 : 57 initialSettings.initialWindowSize(); 58 } 59 60 @Override handlerAdded(ChannelHandlerContext ctx)61 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 62 this.ctx = ctx; 63 // Sends the connection preface if we haven't already. 64 super.handlerAdded(ctx); 65 sendInitialConnectionWindow(); 66 } 67 68 @Override channelActive(ChannelHandlerContext ctx)69 public void channelActive(ChannelHandlerContext ctx) throws Exception { 70 // Sends connection preface if we haven't already. 71 super.channelActive(ctx); 72 sendInitialConnectionWindow(); 73 } 74 75 @Override exceptionCaught(ChannelHandlerContext ctx, Throwable cause)76 public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 77 Http2Exception embedded = getEmbeddedHttp2Exception(cause); 78 if (embedded == null) { 79 // There was no embedded Http2Exception, assume it's a connection error. Subclasses are 80 // responsible for storing the appropriate status and shutting down the connection. 81 onError(ctx, /* outbound= */ false, cause); 82 } else { 83 super.exceptionCaught(ctx, cause); 84 } 85 } 86 ctx()87 protected final ChannelHandlerContext ctx() { 88 return ctx; 89 } 90 91 /** 92 * Sends initial connection window to the remote endpoint if necessary. 93 */ sendInitialConnectionWindow()94 private void sendInitialConnectionWindow() throws Http2Exception { 95 if (ctx.channel().isActive() && initialConnectionWindow > 0) { 96 Http2Stream connectionStream = connection().connectionStream(); 97 int currentSize = connection().local().flowController().windowSize(connectionStream); 98 int delta = initialConnectionWindow - currentSize; 99 decoder().flowController().incrementWindowSize(connectionStream, delta); 100 initialConnectionWindow = -1; 101 ctx.flush(); 102 } 103 } 104 105 @VisibleForTesting flowControlPing()106 FlowControlPinger flowControlPing() { 107 return flowControlPing; 108 } 109 110 @VisibleForTesting setAutoTuneFlowControl(boolean isOn)111 void setAutoTuneFlowControl(boolean isOn) { 112 autoTuneFlowControlOn = isOn; 113 } 114 115 /** 116 * Class for handling flow control pinging and flow control window updates as necessary. 117 */ 118 final class FlowControlPinger { 119 120 private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024; 121 private int pingCount; 122 private int pingReturn; 123 private boolean pinging; 124 private int dataSizeSincePing; 125 private float lastBandwidth; // bytes per second 126 private long lastPingTime; 127 payload()128 public long payload() { 129 return BDP_MEASUREMENT_PING; 130 } 131 maxWindow()132 public int maxWindow() { 133 return MAX_WINDOW_SIZE; 134 } 135 onDataRead(int dataLength, int paddingLength)136 public void onDataRead(int dataLength, int paddingLength) { 137 if (!autoTuneFlowControlOn) { 138 return; 139 } 140 if (!isPinging()) { 141 setPinging(true); 142 sendPing(ctx()); 143 } 144 incrementDataSincePing(dataLength + paddingLength); 145 } 146 updateWindow()147 public void updateWindow() throws Http2Exception { 148 if (!autoTuneFlowControlOn) { 149 return; 150 } 151 pingReturn++; 152 long elapsedTime = (System.nanoTime() - lastPingTime); 153 if (elapsedTime == 0) { 154 elapsedTime = 1; 155 } 156 long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime; 157 Http2LocalFlowController fc = decoder().flowController(); 158 // Calculate new window size by doubling the observed BDP, but cap at max window 159 int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE); 160 setPinging(false); 161 int currentWindow = fc.initialWindowSize(connection().connectionStream()); 162 if (targetWindow > currentWindow && bandwidth > lastBandwidth) { 163 lastBandwidth = bandwidth; 164 int increase = targetWindow - currentWindow; 165 fc.incrementWindowSize(connection().connectionStream(), increase); 166 fc.initialWindowSize(targetWindow); 167 Http2Settings settings = new Http2Settings(); 168 settings.initialWindowSize(targetWindow); 169 frameWriter().writeSettings(ctx(), settings, ctx().newPromise()); 170 } 171 172 } 173 isPinging()174 private boolean isPinging() { 175 return pinging; 176 } 177 setPinging(boolean pingOut)178 private void setPinging(boolean pingOut) { 179 pinging = pingOut; 180 } 181 sendPing(ChannelHandlerContext ctx)182 private void sendPing(ChannelHandlerContext ctx) { 183 setDataSizeSincePing(0); 184 lastPingTime = System.nanoTime(); 185 encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise()); 186 pingCount++; 187 } 188 incrementDataSincePing(int increase)189 private void incrementDataSincePing(int increase) { 190 int currentSize = getDataSincePing(); 191 setDataSizeSincePing(currentSize + increase); 192 } 193 194 @VisibleForTesting getPingCount()195 int getPingCount() { 196 return pingCount; 197 } 198 199 @VisibleForTesting getPingReturn()200 int getPingReturn() { 201 return pingReturn; 202 } 203 204 @VisibleForTesting getDataSincePing()205 int getDataSincePing() { 206 return dataSizeSincePing; 207 } 208 209 @VisibleForTesting setDataSizeSincePing(int dataSize)210 void setDataSizeSincePing(int dataSize) { 211 dataSizeSincePing = dataSize; 212 } 213 } 214 } 215