• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.netty.handler.codec.http2.Http2CodecUtil.getEmbeddedHttp2Exception;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Ticker;
25 import io.grpc.ChannelLogger;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.handler.codec.http2.Http2ConnectionDecoder;
29 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
30 import io.netty.handler.codec.http2.Http2Exception;
31 import io.netty.handler.codec.http2.Http2LocalFlowController;
32 import io.netty.handler.codec.http2.Http2Settings;
33 import io.netty.handler.codec.http2.Http2Stream;
34 import java.util.concurrent.TimeUnit;
35 
36 /**
37  * Base class for all Netty gRPC handlers. This class standardizes exception handling (always
38  * shutdown the connection) as well as sending the initial connection window at startup.
39  */
40 abstract class AbstractNettyHandler extends GrpcHttp2ConnectionHandler {
41   private static final long GRACEFUL_SHUTDOWN_NO_TIMEOUT = -1;
42 
43   private final int initialConnectionWindow;
44   private final FlowControlPinger flowControlPing;
45 
46   private boolean autoTuneFlowControlOn;
47   private ChannelHandlerContext ctx;
48   private boolean initialWindowSent = false;
49   private final Ticker ticker;
50 
51   private static final long BDP_MEASUREMENT_PING = 1234;
52 
AbstractNettyHandler( ChannelPromise channelUnused, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings, ChannelLogger negotiationLogger, boolean autoFlowControl, PingLimiter pingLimiter, Ticker ticker)53   AbstractNettyHandler(
54       ChannelPromise channelUnused,
55       Http2ConnectionDecoder decoder,
56       Http2ConnectionEncoder encoder,
57       Http2Settings initialSettings,
58       ChannelLogger negotiationLogger,
59       boolean autoFlowControl,
60       PingLimiter pingLimiter,
61       Ticker ticker) {
62     super(channelUnused, decoder, encoder, initialSettings, negotiationLogger);
63 
64     // During a graceful shutdown, wait until all streams are closed.
65     gracefulShutdownTimeoutMillis(GRACEFUL_SHUTDOWN_NO_TIMEOUT);
66 
67     // Extract the connection window from the settings if it was set.
68     this.initialConnectionWindow = initialSettings.initialWindowSize() == null ? -1 :
69         initialSettings.initialWindowSize();
70     this.autoTuneFlowControlOn = autoFlowControl;
71     if (pingLimiter == null) {
72       pingLimiter = new AllowPingLimiter();
73     }
74     this.flowControlPing = new FlowControlPinger(pingLimiter);
75     this.ticker = checkNotNull(ticker, "ticker");
76   }
77 
78   @Override
handlerAdded(ChannelHandlerContext ctx)79   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
80     this.ctx = ctx;
81     // Sends the connection preface if we haven't already.
82     super.handlerAdded(ctx);
83     sendInitialConnectionWindow();
84   }
85 
86   @Override
channelActive(ChannelHandlerContext ctx)87   public void channelActive(ChannelHandlerContext ctx) throws Exception {
88     // Sends connection preface if we haven't already.
89     super.channelActive(ctx);
90     sendInitialConnectionWindow();
91   }
92 
93   @Override
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)94   public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
95     Http2Exception embedded = getEmbeddedHttp2Exception(cause);
96     if (embedded == null) {
97       // There was no embedded Http2Exception, assume it's a connection error. Subclasses are
98       // responsible for storing the appropriate status and shutting down the connection.
99       onError(ctx, /* outbound= */ false, cause);
100     } else {
101       super.exceptionCaught(ctx, cause);
102     }
103   }
104 
ctx()105   protected final ChannelHandlerContext ctx() {
106     return ctx;
107   }
108 
109   /**
110    * Sends initial connection window to the remote endpoint if necessary.
111    */
sendInitialConnectionWindow()112   private void sendInitialConnectionWindow() throws Http2Exception {
113     if (!initialWindowSent && ctx.channel().isActive()) {
114       Http2Stream connectionStream = connection().connectionStream();
115       int currentSize = connection().local().flowController().windowSize(connectionStream);
116       int delta = initialConnectionWindow - currentSize;
117       decoder().flowController().incrementWindowSize(connectionStream, delta);
118       initialWindowSent = true;
119       ctx.flush();
120     }
121   }
122 
123   @VisibleForTesting
flowControlPing()124   FlowControlPinger flowControlPing() {
125     return flowControlPing;
126   }
127 
128   @VisibleForTesting
setAutoTuneFlowControl(boolean isOn)129   void setAutoTuneFlowControl(boolean isOn) {
130     autoTuneFlowControlOn = isOn;
131   }
132 
133   /**
134    * Class for handling flow control pinging and flow control window updates as necessary.
135    */
136   final class FlowControlPinger {
137 
138     private static final int MAX_WINDOW_SIZE = 8 * 1024 * 1024;
139     public static final int MAX_BACKOFF = 10;
140 
141     private final PingLimiter pingLimiter;
142     private int pingCount;
143     private int pingReturn;
144     private boolean pinging;
145     private int dataSizeSincePing;
146     private long lastBandwidth; // bytes per nanosecond
147     private long lastPingTime;
148     private int lastTargetWindow;
149     private int pingFrequencyMultiplier;
150 
FlowControlPinger(PingLimiter pingLimiter)151     public FlowControlPinger(PingLimiter pingLimiter) {
152       Preconditions.checkNotNull(pingLimiter, "pingLimiter");
153       this.pingLimiter = pingLimiter;
154     }
155 
payload()156     public long payload() {
157       return BDP_MEASUREMENT_PING;
158     }
159 
maxWindow()160     public int maxWindow() {
161       return MAX_WINDOW_SIZE;
162     }
163 
onDataRead(int dataLength, int paddingLength)164     public void onDataRead(int dataLength, int paddingLength) {
165       if (!autoTuneFlowControlOn) {
166         return;
167       }
168 
169       // Note that we are double counting around the ping initiation as the current data will be
170       // added at the end of this method, so will be available in the next check.  This at worst
171       // causes us to send a ping one data packet earlier, but makes startup faster if there are
172       // small packets before big ones.
173       int dataForCheck = getDataSincePing() + dataLength + paddingLength;
174       // Need to double the data here to account for targetWindow being set to twice the data below
175       if (!isPinging() && pingLimiter.isPingAllowed()
176           && dataForCheck * 2 >= lastTargetWindow * pingFrequencyMultiplier) {
177         setPinging(true);
178         sendPing(ctx());
179       }
180 
181       if (lastTargetWindow == 0) {
182         lastTargetWindow =
183             decoder().flowController().initialWindowSize(connection().connectionStream());
184       }
185 
186       incrementDataSincePing(dataLength + paddingLength);
187     }
188 
updateWindow()189     public void updateWindow() throws Http2Exception {
190       if (!autoTuneFlowControlOn) {
191         return;
192       }
193       pingReturn++;
194       setPinging(false);
195 
196       long elapsedTime = (ticker.read() - lastPingTime);
197       if (elapsedTime == 0) {
198         elapsedTime = 1;
199       }
200 
201       long bandwidth = (getDataSincePing() * TimeUnit.SECONDS.toNanos(1)) / elapsedTime;
202       // Calculate new window size by doubling the observed BDP, but cap at max window
203       int targetWindow = Math.min(getDataSincePing() * 2, MAX_WINDOW_SIZE);
204       Http2LocalFlowController fc = decoder().flowController();
205       int currentWindow = fc.initialWindowSize(connection().connectionStream());
206       if (bandwidth <= lastBandwidth || targetWindow <= currentWindow) {
207         pingFrequencyMultiplier = Math.min(pingFrequencyMultiplier + 1, MAX_BACKOFF);
208         return;
209       }
210 
211       pingFrequencyMultiplier = 0; // react quickly when size is changing
212       lastBandwidth = bandwidth;
213       lastTargetWindow = targetWindow;
214       int increase = targetWindow - currentWindow;
215       fc.incrementWindowSize(connection().connectionStream(), increase);
216       fc.initialWindowSize(targetWindow);
217       Http2Settings settings = new Http2Settings();
218       settings.initialWindowSize(targetWindow);
219       frameWriter().writeSettings(ctx(), settings, ctx().newPromise());
220     }
221 
isPinging()222     private boolean isPinging() {
223       return pinging;
224     }
225 
setPinging(boolean pingOut)226     private void setPinging(boolean pingOut) {
227       pinging = pingOut;
228     }
229 
sendPing(ChannelHandlerContext ctx)230     private void sendPing(ChannelHandlerContext ctx) {
231       setDataSizeSincePing(0);
232       lastPingTime = ticker.read();
233       encoder().writePing(ctx, false, BDP_MEASUREMENT_PING, ctx.newPromise());
234       pingCount++;
235     }
236 
incrementDataSincePing(int increase)237     private void incrementDataSincePing(int increase) {
238       int currentSize = getDataSincePing();
239       setDataSizeSincePing(currentSize + increase);
240     }
241 
242     @VisibleForTesting
getPingCount()243     int getPingCount() {
244       return pingCount;
245     }
246 
247     @VisibleForTesting
getPingReturn()248     int getPingReturn() {
249       return pingReturn;
250     }
251 
252     @VisibleForTesting
getDataSincePing()253     int getDataSincePing() {
254       return dataSizeSincePing;
255     }
256 
setDataSizeSincePing(int dataSize)257     private void setDataSizeSincePing(int dataSize) {
258       dataSizeSincePing = dataSize;
259     }
260 
261     // Only used in testing
262     @VisibleForTesting
setDataSizeAndSincePing(int dataSize)263     void setDataSizeAndSincePing(int dataSize) {
264       setDataSizeSincePing(dataSize);
265       pingFrequencyMultiplier = 1;
266       lastPingTime = ticker.read() ;
267     }
268   }
269 
270   /** Controls whether PINGs like those for BDP are permitted to be sent at the current time. */
271   public interface PingLimiter {
isPingAllowed()272     boolean isPingAllowed();
273   }
274 
275   private static final class AllowPingLimiter implements PingLimiter {
isPingAllowed()276     @Override public boolean isPingAllowed() {
277       return true;
278     }
279   }
280 }
281