• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.handler.codec.http2.Http2ConnectionDecoder;
25 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
26 import io.netty.handler.codec.http2.Http2Exception;
27 import io.netty.handler.codec.http2.Http2LocalFlowController;
28 import io.netty.handler.codec.http2.Http2Settings;
29 import io.netty.handler.codec.http2.Http2Stream;
30 import java.util.concurrent.TimeUnit;
31 
32 /**
33  * Base class for all Netty gRPC handlers. This class standardizes exception handling (always
34  * shutdown the connection) as well as sending the initial connection window at startup.
35  */
36 abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
37   private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
38   private boolean autoTuneFlowControlOn = false;
39   private int initialConnectionWindow;
40   private ChannelHandlerContext ctx;
41   private final FlowControlPinger flowControlPing = new FlowControlPinger();
42 
43   private static final long BDP_MEASUREMENT_PING = 1234;
44 
AbstractNettyHandler( ChannelPromise channelUnused, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings)45   AbstractNettyHandler(
46       ChannelPromise channelUnused,
47       Http2ConnectionDecoder decoder,
48       Http2ConnectionEncoder encoder,
49       Http2Settings initialSettings) {
50     super(channelUnused, decoder, encoder, initialSettings);
51 
52     // During a graceful shutdown, wait until all streams are closed.
53     gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);
54 
55     // Extract the connection window from the settings if it was set.
56     this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
57             initialSettings.initialWindowSize();
58   }
59 
60   @Override
handlerAdded(ChannelHandlerContext ctx)61   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
62     this.ctx = ctx;
63     // Sends the connection preface if we haven't already.
64     super.handlerAdded(ctx);
65     sendInitialConnectionWindow();
66   }
67 
68   @Override
channelActive(ChannelHandlerContext ctx)69   public void channelActive(ChannelHandlerContext ctx) throws Exception {
70     // Sends connection preface if we haven't already.
71     super.channelActive(ctx);
72     sendInitialConnectionWindow();
73   }
74 
75   @Override
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)76   public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
77     Http2Exception embedded = getEmbeddedHttp2Exception(cause);
78     if (embedded == null) {
79       // There was no embedded Http2Exception, assume it's a connection error. Subclasses are
80       // responsible for storing the appropriate status and shutting down the connection.
81       onError(ctx, /* outbound= */ false, cause);
82     } else {
83       super.exceptionCaught(ctx, cause);
84     }
85   }
86 
ctx()87   protected final ChannelHandlerContext ctx() {
88     return ctx;
89   }
90 
91   /**
92    * Sends initial connection window to the remote endpoint if necessary.
93    */
sendInitialConnectionWindow()94   private void sendInitialConnectionWindow() throws Http2Exception {
95     if (ctx.channel().isActive() && initialConnectionWindow > 0) {
96       Http2Stream connectionStream = connection().connectionStream();
97       int currentSize = connection().local().flowController().windowSize(connectionStream);
98       int delta = initialConnectionWindow - currentSize;
99       decoder().flowController().incrementWindowSize(connectionStream, delta);
100       initialConnectionWindow = -1;
101       ctx.flush();
102     }
103   }
104 
105   @VisibleForTesting
flowControlPing()106   FlowControlPinger flowControlPing() {
107     return flowControlPing;
108   }
109 
110   @VisibleForTesting
setAutoTuneFlowControl(boolean isOn)111   void setAutoTuneFlowControl(boolean isOn) {
112     autoTuneFlowControlOn = isOn;
113   }
114 
115   /**
116    * Class for handling flow control pinging and flow control window updates as necessary.
117    */
118   final class FlowControlPinger {
119 
120     private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
121     private int pingCount;
122     private int pingReturn;
123     private boolean pinging;
124     private int dataSizeSincePing;
125     private float lastBandwidth; // bytes per second
126     private long lastPingTime;
127 
payload()128     public long payload() {
129       return BDP_MEASUREMENT_PING;
130     }
131 
maxWindow()132     public int maxWindow() {
133       return MAX_WINDOW_SIZE;
134     }
135 
onDataRead(int dataLength, int paddingLength)136     public void onDataRead(int dataLength, int paddingLength) {
137       if (!autoTuneFlowControlOn) {
138         return;
139       }
140       if (!isPinging()) {
141         setPinging(true);
142         sendPing(ctx());
143       }
144       incrementDataSincePing(dataLength + paddingLength);
145     }
146 
updateWindow()147     public void updateWindow() throws Http2Exception {
148       if (!autoTuneFlowControlOn) {
149         return;
150       }
151       pingReturn++;
152       long elapsedTime = (System.nanoTime() - lastPingTime);
153       if (elapsedTime == 0) {
154         elapsedTime = 1;
155       }
156       long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
157       Http2LocalFlowController fc = decoder().flowController();
158       // Calculate new window size by doubling the observed BDP, but cap at max window
159       int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
160       setPinging(false);
161       int currentWindow = fc.initialWindowSize(connection().connectionStream());
162       if (targetWindow > currentWindow && bandwidth > lastBandwidth) {
163         lastBandwidth = bandwidth;
164         int increase = targetWindow - currentWindow;
165         fc.incrementWindowSize(connection().connectionStream(), increase);
166         fc.initialWindowSize(targetWindow);
167         Http2Settings settings = new Http2Settings();
168         settings.initialWindowSize(targetWindow);
169         frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
170       }
171 
172     }
173 
isPinging()174     private boolean isPinging() {
175       return pinging;
176     }
177 
setPinging(boolean pingOut)178     private void setPinging(boolean pingOut) {
179       pinging = pingOut;
180     }
181 
sendPing(ChannelHandlerContext ctx)182     private void sendPing(ChannelHandlerContext ctx) {
183       setDataSizeSincePing(0);
184       lastPingTime = System.nanoTime();
185       encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
186       pingCount++;
187     }
188 
incrementDataSincePing(int increase)189     private void incrementDataSincePing(int increase) {
190       int currentSize = getDataSincePing();
191       setDataSizeSincePing(currentSize + increase);
192     }
193 
194     @VisibleForTesting
getPingCount()195     int getPingCount() {
196       return pingCount;
197     }
198 
199     @VisibleForTesting
getPingReturn()200     int getPingReturn() {
201       return pingReturn;
202     }
203 
204     @VisibleForTesting
getDataSincePing()205     int getDataSincePing() {
206       return dataSizeSincePing;
207     }
208 
209     @VisibleForTesting
setDataSizeSincePing(int dataSize)210     void setDataSizeSincePing(int dataSize) {
211       dataSizeSincePing = dataSize;
212     }
213   }
214 }
215