• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
23 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
24 import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
25 import static io.grpc.netty.Utils.HTTP_METHOD;
26 import static io.grpc.netty.Utils.TE_HEADER;
27 import static io.grpc.netty.Utils.TE_TRAILERS;
28 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
29 
30 import com.google.common.annotations.VisibleForTesting;
31 import com.google.common.base.Preconditions;
32 import com.google.common.base.Strings;
33 import io.grpc.Attributes;
34 import io.grpc.InternalChannelz;
35 import io.grpc.InternalMetadata;
36 import io.grpc.InternalStatus;
37 import io.grpc.Metadata;
38 import io.grpc.ServerStreamTracer;
39 import io.grpc.Status;
40 import io.grpc.internal.GrpcUtil;
41 import io.grpc.internal.KeepAliveManager;
42 import io.grpc.internal.LogExceptionRunnable;
43 import io.grpc.internal.ServerTransportListener;
44 import io.grpc.internal.StatsTraceContext;
45 import io.grpc.internal.TransportTracer;
46 import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
47 import io.netty.buffer.ByteBuf;
48 import io.netty.buffer.ByteBufUtil;
49 import io.netty.channel.ChannelFuture;
50 import io.netty.channel.ChannelFutureListener;
51 import io.netty.channel.ChannelHandlerContext;
52 import io.netty.channel.ChannelPromise;
53 import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
54 import io.netty.handler.codec.http2.DefaultHttp2Connection;
55 import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
56 import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
57 import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
58 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
59 import io.netty.handler.codec.http2.DefaultHttp2Headers;
60 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
61 import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
62 import io.netty.handler.codec.http2.Http2Connection;
63 import io.netty.handler.codec.http2.Http2ConnectionAdapter;
64 import io.netty.handler.codec.http2.Http2ConnectionDecoder;
65 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
66 import io.netty.handler.codec.http2.Http2Error;
67 import io.netty.handler.codec.http2.Http2Exception;
68 import io.netty.handler.codec.http2.Http2Exception.StreamException;
69 import io.netty.handler.codec.http2.Http2FlowController;
70 import io.netty.handler.codec.http2.Http2FrameAdapter;
71 import io.netty.handler.codec.http2.Http2FrameLogger;
72 import io.netty.handler.codec.http2.Http2FrameReader;
73 import io.netty.handler.codec.http2.Http2FrameWriter;
74 import io.netty.handler.codec.http2.Http2Headers;
75 import io.netty.handler.codec.http2.Http2HeadersDecoder;
76 import io.netty.handler.codec.http2.Http2InboundFrameLogger;
77 import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
78 import io.netty.handler.codec.http2.Http2Settings;
79 import io.netty.handler.codec.http2.Http2Stream;
80 import io.netty.handler.codec.http2.Http2StreamVisitor;
81 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
82 import io.netty.handler.logging.LogLevel;
83 import io.netty.util.AsciiString;
84 import io.netty.util.ReferenceCountUtil;
85 import java.util.List;
86 import java.util.concurrent.Future;
87 import java.util.concurrent.ScheduledFuture;
88 import java.util.concurrent.TimeUnit;
89 import java.util.logging.Level;
90 import java.util.logging.Logger;
91 import javax.annotation.CheckForNull;
92 import javax.annotation.Nullable;
93 
94 /**
95  * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
96  * the context of the Netty Channel thread.
97  */
98 class NettyServerHandler extends AbstractNettyHandler {
99   private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
100   private static final long KEEPALIVE_PING = 0xDEADL;
101   private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
102   private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
103 
104   private final Http2Connection.PropertyKey streamKey;
105   private final ServerTransportListener transportListener;
106   private final int maxMessageSize;
107   private final long keepAliveTimeInNanos;
108   private final long keepAliveTimeoutInNanos;
109   private final long maxConnectionAgeInNanos;
110   private final long maxConnectionAgeGraceInNanos;
111   private final List<ServerStreamTracer.Factory> streamTracerFactories;
112   private final TransportTracer transportTracer;
113   private final KeepAliveEnforcer keepAliveEnforcer;
114   /** Incomplete attributes produced by negotiator. */
115   private Attributes negotiationAttributes;
116   private InternalChannelz.Security securityInfo;
117   /** Completed attributes produced by transportReady. */
118   private Attributes attributes;
119   private Throwable connectionError;
120   private boolean teWarningLogged;
121   private WriteQueue serverWriteQueue;
122   private AsciiString lastKnownAuthority;
123   @CheckForNull
124   private KeepAliveManager keepAliveManager;
125   @CheckForNull
126   private MaxConnectionIdleManager maxConnectionIdleManager;
127   @CheckForNull
128   private ScheduledFuture<?> maxConnectionAgeMonitor;
129   @CheckForNull
130   private GracefulShutdown gracefulShutdown;
131 
newHandler( ServerTransportListener transportListener, ChannelPromise channelUnused, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)132   static NettyServerHandler newHandler(
133       ServerTransportListener transportListener,
134       ChannelPromise channelUnused,
135       List<ServerStreamTracer.Factory> streamTracerFactories,
136       TransportTracer transportTracer,
137       int maxStreams,
138       int flowControlWindow,
139       int maxHeaderListSize,
140       int maxMessageSize,
141       long keepAliveTimeInNanos,
142       long keepAliveTimeoutInNanos,
143       long maxConnectionIdleInNanos,
144       long maxConnectionAgeInNanos,
145       long maxConnectionAgeGraceInNanos,
146       boolean permitKeepAliveWithoutCalls,
147       long permitKeepAliveTimeInNanos) {
148     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
149     Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
150     Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
151     Http2FrameReader frameReader = new Http2InboundFrameLogger(
152         new DefaultHttp2FrameReader(headersDecoder), frameLogger);
153     Http2FrameWriter frameWriter =
154         new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
155     return newHandler(
156         channelUnused,
157         frameReader,
158         frameWriter,
159         transportListener,
160         streamTracerFactories,
161         transportTracer,
162         maxStreams,
163         flowControlWindow,
164         maxHeaderListSize,
165         maxMessageSize,
166         keepAliveTimeInNanos,
167         keepAliveTimeoutInNanos,
168         maxConnectionIdleInNanos,
169         maxConnectionAgeInNanos,
170         maxConnectionAgeGraceInNanos,
171         permitKeepAliveWithoutCalls,
172         permitKeepAliveTimeInNanos);
173   }
174 
175   @VisibleForTesting
newHandler( ChannelPromise channelUnused, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)176   static NettyServerHandler newHandler(
177       ChannelPromise channelUnused,
178       Http2FrameReader frameReader,
179       Http2FrameWriter frameWriter,
180       ServerTransportListener transportListener,
181       List<ServerStreamTracer.Factory> streamTracerFactories,
182       TransportTracer transportTracer,
183       int maxStreams,
184       int flowControlWindow,
185       int maxHeaderListSize,
186       int maxMessageSize,
187       long keepAliveTimeInNanos,
188       long keepAliveTimeoutInNanos,
189       long maxConnectionIdleInNanos,
190       long maxConnectionAgeInNanos,
191       long maxConnectionAgeGraceInNanos,
192       boolean permitKeepAliveWithoutCalls,
193       long permitKeepAliveTimeInNanos) {
194     Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive");
195     Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
196     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
197     Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive");
198 
199     final Http2Connection connection = new DefaultHttp2Connection(true);
200     WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
201     dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
202     DefaultHttp2RemoteFlowController controller =
203         new DefaultHttp2RemoteFlowController(connection, dist);
204     connection.remote().flowController(controller);
205     final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
206         permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
207 
208     // Create the local flow controller configured to auto-refill the connection window.
209     connection.local().flowController(
210         new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
211     frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
212     Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);
213     Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
214         frameReader);
215 
216     Http2Settings settings = new Http2Settings();
217     settings.initialWindowSize(flowControlWindow);
218     settings.maxConcurrentStreams(maxStreams);
219     settings.maxHeaderListSize(maxHeaderListSize);
220 
221     return new NettyServerHandler(
222         channelUnused,
223         connection,
224         transportListener,
225         streamTracerFactories,
226         transportTracer,
227         decoder, encoder, settings,
228         maxMessageSize,
229         keepAliveTimeInNanos, keepAliveTimeoutInNanos,
230         maxConnectionIdleInNanos,
231         maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
232         keepAliveEnforcer);
233   }
234 
NettyServerHandler( ChannelPromise channelUnused, final Http2Connection connection, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer)235   private NettyServerHandler(
236       ChannelPromise channelUnused,
237       final Http2Connection connection,
238       ServerTransportListener transportListener,
239       List<ServerStreamTracer.Factory> streamTracerFactories,
240       TransportTracer transportTracer,
241       Http2ConnectionDecoder decoder,
242       Http2ConnectionEncoder encoder,
243       Http2Settings settings,
244       int maxMessageSize,
245       long keepAliveTimeInNanos,
246       long keepAliveTimeoutInNanos,
247       long maxConnectionIdleInNanos,
248       long maxConnectionAgeInNanos,
249       long maxConnectionAgeGraceInNanos,
250       final KeepAliveEnforcer keepAliveEnforcer) {
251     super(channelUnused, decoder, encoder, settings);
252 
253     final MaxConnectionIdleManager maxConnectionIdleManager;
254     if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
255       maxConnectionIdleManager = null;
256     } else {
257       maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) {
258         @Override
259         void close(ChannelHandlerContext ctx) {
260           if (gracefulShutdown == null) {
261             gracefulShutdown = new GracefulShutdown("max_idle", null);
262             gracefulShutdown.start(ctx);
263             ctx.flush();
264           }
265         }
266       };
267     }
268 
269     connection.addListener(new Http2ConnectionAdapter() {
270       @Override
271       public void onStreamActive(Http2Stream stream) {
272         if (connection.numActiveStreams() == 1) {
273           keepAliveEnforcer.onTransportActive();
274           if (maxConnectionIdleManager != null) {
275             maxConnectionIdleManager.onTransportActive();
276           }
277         }
278       }
279 
280       @Override
281       public void onStreamClosed(Http2Stream stream) {
282         if (connection.numActiveStreams() == 0) {
283           keepAliveEnforcer.onTransportIdle();
284           if (maxConnectionIdleManager != null) {
285             maxConnectionIdleManager.onTransportIdle();
286           }
287         }
288       }
289     });
290 
291     checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
292     this.maxMessageSize = maxMessageSize;
293     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
294     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
295     this.maxConnectionIdleManager = maxConnectionIdleManager;
296     this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
297     this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
298     this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
299 
300     streamKey = encoder.connection().newKey();
301     this.transportListener = checkNotNull(transportListener, "transportListener");
302     this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
303     this.transportTracer = checkNotNull(transportTracer, "transportTracer");
304 
305     // Set the frame listener on the decoder.
306     decoder().frameListener(new FrameListener());
307   }
308 
309   @Nullable
connectionError()310   Throwable connectionError() {
311     return connectionError;
312   }
313 
314   @Override
handlerAdded(final ChannelHandlerContext ctx)315   public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
316     serverWriteQueue = new WriteQueue(ctx.channel());
317 
318     // init max connection age monitor
319     if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
320       maxConnectionAgeMonitor = ctx.executor().schedule(
321           new LogExceptionRunnable(new Runnable() {
322             @Override
323             public void run() {
324               if (gracefulShutdown == null) {
325                 gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
326                 gracefulShutdown.start(ctx);
327                 ctx.flush();
328               }
329             }
330           }),
331           maxConnectionAgeInNanos,
332           TimeUnit.NANOSECONDS);
333     }
334 
335     if (maxConnectionIdleManager != null) {
336       maxConnectionIdleManager.start(ctx);
337     }
338 
339     if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
340       keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
341           keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
342       keepAliveManager.onTransportStarted();
343     }
344 
345 
346     if (transportTracer != null) {
347       assert encoder().connection().equals(decoder().connection());
348       final Http2Connection connection = encoder().connection();
349       transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
350         private final Http2FlowController local = connection.local().flowController();
351         private final Http2FlowController remote = connection.remote().flowController();
352 
353         @Override
354         public TransportTracer.FlowControlWindows read() {
355           assert ctx.executor().inEventLoop();
356           return new TransportTracer.FlowControlWindows(
357               local.windowSize(connection.connectionStream()),
358               remote.windowSize(connection.connectionStream()));
359         }
360       });
361     }
362 
363     super.handlerAdded(ctx);
364   }
365 
onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)366   private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
367       throws Http2Exception {
368     if (!teWarningLogged && !TE_TRAILERS.equals(headers.get(TE_HEADER))) {
369       logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
370               + "some intermediate proxy may not support trailers",
371           TE_TRAILERS, headers.get(TE_HEADER)));
372       teWarningLogged = true;
373     }
374 
375     try {
376 
377       // Remove the leading slash of the path and get the fully qualified method name
378       CharSequence path = headers.path();
379 
380       if (path == null) {
381         respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
382             "Expected path but is missing");
383         return;
384       }
385 
386       if (path.charAt(0) != '/') {
387         respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
388             String.format("Expected path to start with /: %s", path));
389         return;
390       }
391 
392       String method = path.subSequence(1, path.length()).toString();
393 
394       // Verify that the Content-Type is correct in the request.
395       CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
396       if (contentType == null) {
397         respondWithHttpError(
398             ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
399         return;
400       }
401       String contentTypeString = contentType.toString();
402       if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
403         respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
404             String.format("Content-Type '%s' is not supported", contentTypeString));
405         return;
406       }
407 
408       if (!HTTP_METHOD.equals(headers.method())) {
409         respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
410             String.format("Method '%s' is not supported", headers.method()));
411         return;
412       }
413 
414       // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
415       // method.
416       Http2Stream http2Stream = requireHttp2Stream(streamId);
417 
418       Metadata metadata = Utils.convertHeaders(headers);
419       StatsTraceContext statsTraceCtx =
420           StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
421 
422       NettyServerStream.TransportState state = new NettyServerStream.TransportState(
423           this,
424           ctx.channel().eventLoop(),
425           http2Stream,
426           maxMessageSize,
427           statsTraceCtx,
428           transportTracer);
429       String authority = getOrUpdateAuthority((AsciiString) headers.authority());
430       NettyServerStream stream = new NettyServerStream(
431           ctx.channel(),
432           state,
433           attributes,
434           authority,
435           statsTraceCtx,
436           transportTracer);
437       transportListener.streamCreated(stream, method, metadata);
438       state.onStreamAllocated();
439       http2Stream.setProperty(streamKey, state);
440     } catch (Exception e) {
441       logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
442       // Throw an exception that will get handled by onStreamError.
443       throw newStreamException(streamId, e);
444     }
445   }
446 
getOrUpdateAuthority(AsciiString authority)447   private String getOrUpdateAuthority(AsciiString authority) {
448     if (authority == null) {
449       return null;
450     } else if (!authority.equals(lastKnownAuthority)) {
451       lastKnownAuthority = authority;
452     }
453 
454     // AsciiString.toString() is internally cached, so subsequent calls will not
455     // result in recomputing the String representation of lastKnownAuthority.
456     return lastKnownAuthority.toString();
457   }
458 
onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)459   private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
460       throws Http2Exception {
461     flowControlPing().onDataRead(data.readableBytes(), padding);
462     try {
463       NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
464       stream.inboundDataReceived(data, endOfStream);
465     } catch (Throwable e) {
466       logger.log(Level.WARNING, "Exception in onDataRead()", e);
467       // Throw an exception that will get handled by onStreamError.
468       throw newStreamException(streamId, e);
469     }
470   }
471 
onRstStreamRead(int streamId, long errorCode)472   private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
473     try {
474       NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
475       if (stream != null) {
476         stream.transportReportStatus(
477             Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
478       }
479     } catch (Throwable e) {
480       logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
481       // Throw an exception that will get handled by onStreamError.
482       throw newStreamException(streamId, e);
483     }
484   }
485 
486   @Override
onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex)487   protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
488       Http2Exception http2Ex) {
489     logger.log(Level.FINE, "Connection Error", cause);
490     connectionError = cause;
491     super.onConnectionError(ctx, outbound, cause, http2Ex);
492   }
493 
494   @Override
onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, StreamException http2Ex)495   protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
496       StreamException http2Ex) {
497     logger.log(Level.WARNING, "Stream Error", cause);
498     NettyServerStream.TransportState serverStream = serverStream(
499         connection().stream(Http2Exception.streamId(http2Ex)));
500     if (serverStream != null) {
501       serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
502     }
503     // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
504     // Delegate to the base class to send a RST_STREAM.
505     super.onStreamError(ctx, outbound, cause, http2Ex);
506   }
507 
508   @Override
handleProtocolNegotiationCompleted( Attributes attrs, InternalChannelz.Security securityInfo)509   public void handleProtocolNegotiationCompleted(
510       Attributes attrs, InternalChannelz.Security securityInfo) {
511     negotiationAttributes = attrs;
512     this.securityInfo = securityInfo;
513   }
514 
getSecurityInfo()515   InternalChannelz.Security getSecurityInfo() {
516     return securityInfo;
517   }
518 
519   @VisibleForTesting
getKeepAliveManagerForTest()520   KeepAliveManager getKeepAliveManagerForTest() {
521     return keepAliveManager;
522   }
523 
524   @VisibleForTesting
setKeepAliveManagerForTest(KeepAliveManager keepAliveManager)525   void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
526     this.keepAliveManager = keepAliveManager;
527   }
528 
529   /**
530    * Handler for the Channel shutting down.
531    */
532   @Override
channelInactive(ChannelHandlerContext ctx)533   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
534     try {
535       if (keepAliveManager != null) {
536         keepAliveManager.onTransportTermination();
537       }
538       if (maxConnectionIdleManager != null) {
539         maxConnectionIdleManager.onTransportTermination();
540       }
541       if (maxConnectionAgeMonitor != null) {
542         maxConnectionAgeMonitor.cancel(false);
543       }
544       final Status status =
545           Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
546       // Any streams that are still active must be closed
547       connection().forEachActiveStream(new Http2StreamVisitor() {
548         @Override
549         public boolean visit(Http2Stream stream) throws Http2Exception {
550           NettyServerStream.TransportState serverStream = serverStream(stream);
551           if (serverStream != null) {
552             serverStream.transportReportStatus(status);
553           }
554           return true;
555         }
556       });
557     } finally {
558       super.channelInactive(ctx);
559     }
560   }
561 
getWriteQueue()562   WriteQueue getWriteQueue() {
563     return serverWriteQueue;
564   }
565 
566   /**
567    * Handler for commands sent from the stream.
568    */
569   @Override
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)570   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
571       throws Exception {
572     if (msg instanceof SendGrpcFrameCommand) {
573       sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
574     } else if (msg instanceof SendResponseHeadersCommand) {
575       sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
576     } else if (msg instanceof CancelServerStreamCommand) {
577       cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
578     } else if (msg instanceof ForcefulCloseCommand) {
579       forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
580     } else {
581       AssertionError e =
582           new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
583       ReferenceCountUtil.release(msg);
584       promise.setFailure(e);
585       throw e;
586     }
587   }
588 
589   /**
590    * Returns the given processed bytes back to inbound flow control.
591    */
returnProcessedBytes(Http2Stream http2Stream, int bytes)592   void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
593     try {
594       decoder().flowController().consumeBytes(http2Stream, bytes);
595     } catch (Http2Exception e) {
596       throw new RuntimeException(e);
597     }
598   }
599 
closeStreamWhenDone(ChannelPromise promise, int streamId)600   private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
601     final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
602     promise.addListener(new ChannelFutureListener() {
603       @Override
604       public void operationComplete(ChannelFuture future) {
605         stream.complete();
606       }
607     });
608   }
609 
610   /**
611    * Sends the given gRPC frame to the client.
612    */
sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)613   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
614       ChannelPromise promise) throws Http2Exception {
615     if (cmd.endStream()) {
616       closeStreamWhenDone(promise, cmd.streamId());
617     }
618     // Call the base class to write the HTTP/2 DATA frame.
619     encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
620   }
621 
622   /**
623    * Sends the response headers to the client.
624    */
sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise)625   private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
626       ChannelPromise promise) throws Http2Exception {
627     // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is
628     // fixed.
629     int streamId = cmd.stream().id();
630     Http2Stream stream = connection().stream(streamId);
631     if (stream == null) {
632       resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
633       return;
634     }
635     if (cmd.endOfStream()) {
636       closeStreamWhenDone(promise, streamId);
637     }
638     encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
639   }
640 
cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise)641   private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
642       ChannelPromise promise) {
643     // Notify the listener if we haven't already.
644     cmd.stream().transportReportStatus(cmd.reason());
645     // Terminate the stream.
646     encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
647   }
648 
forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise)649   private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
650       ChannelPromise promise) throws Exception {
651     close(ctx, promise);
652     connection().forEachActiveStream(new Http2StreamVisitor() {
653       @Override
654       public boolean visit(Http2Stream stream) throws Http2Exception {
655         NettyServerStream.TransportState serverStream = serverStream(stream);
656         if (serverStream != null) {
657           serverStream.transportReportStatus(msg.getStatus());
658           resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
659         }
660         stream.close();
661         return true;
662       }
663     });
664   }
665 
respondWithHttpError( ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg)666   private void respondWithHttpError(
667       ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
668     Metadata metadata = new Metadata();
669     metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
670     metadata.put(InternalStatus.MESSAGE_KEY, msg);
671     byte[][] serialized = InternalMetadata.serialize(metadata);
672 
673     Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
674         .status("" + code)
675         .set(CONTENT_TYPE_HEADER, "text/plain; encoding=utf-8");
676     for (int i = 0; i < serialized.length; i += 2) {
677       headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
678     }
679     encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
680     ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
681     encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
682   }
683 
requireHttp2Stream(int streamId)684   private Http2Stream requireHttp2Stream(int streamId) {
685     Http2Stream stream = connection().stream(streamId);
686     if (stream == null) {
687       // This should never happen.
688       throw new AssertionError("Stream does not exist: " + streamId);
689     }
690     return stream;
691   }
692 
693   /**
694    * Returns the server stream associated to the given HTTP/2 stream object.
695    */
serverStream(Http2Stream stream)696   private NettyServerStream.TransportState serverStream(Http2Stream stream) {
697     return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
698   }
699 
newStreamException(int streamId, Throwable cause)700   private Http2Exception newStreamException(int streamId, Throwable cause) {
701     return Http2Exception.streamError(
702         streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
703   }
704 
705   private class FrameListener extends Http2FrameAdapter {
706     private boolean firstSettings = true;
707 
708     @Override
onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)709     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
710       if (firstSettings) {
711         firstSettings = false;
712         // Delay transportReady until we see the client's HTTP handshake, for coverage with
713         // handshakeTimeout
714         attributes = transportListener.transportReady(negotiationAttributes);
715       }
716     }
717 
718     @Override
onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)719     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
720         boolean endOfStream) throws Http2Exception {
721       if (keepAliveManager != null) {
722         keepAliveManager.onDataReceived();
723       }
724       NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
725       return padding;
726     }
727 
728     @Override
onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)729     public void onHeadersRead(ChannelHandlerContext ctx,
730         int streamId,
731         Http2Headers headers,
732         int streamDependency,
733         short weight,
734         boolean exclusive,
735         int padding,
736         boolean endStream) throws Http2Exception {
737       if (keepAliveManager != null) {
738         keepAliveManager.onDataReceived();
739       }
740       NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
741     }
742 
743     @Override
onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)744     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
745         throws Http2Exception {
746       if (keepAliveManager != null) {
747         keepAliveManager.onDataReceived();
748       }
749       NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
750     }
751 
752     @Override
onPingRead(ChannelHandlerContext ctx, long data)753     public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
754       if (keepAliveManager != null) {
755         keepAliveManager.onDataReceived();
756       }
757       if (!keepAliveEnforcer.pingAcceptable()) {
758         ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
759         goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
760             debugData, ctx.newPromise());
761         Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
762         try {
763           forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
764         } catch (Exception ex) {
765           onError(ctx, /* outbound= */ true, ex);
766         }
767       }
768     }
769 
770     @Override
onPingAckRead(ChannelHandlerContext ctx, long data)771     public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
772       if (keepAliveManager != null) {
773         keepAliveManager.onDataReceived();
774       }
775       if (data == flowControlPing().payload()) {
776         flowControlPing().updateWindow();
777         if (logger.isLoggable(Level.FINE)) {
778           logger.log(Level.FINE, String.format("Window: %d",
779               decoder().flowController().initialWindowSize(connection().connectionStream())));
780         }
781       } else if (data == GRACEFUL_SHUTDOWN_PING) {
782         if (gracefulShutdown == null) {
783           // this should never happen
784           logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
785         } else {
786           gracefulShutdown.secondGoAwayAndClose(ctx);
787         }
788       } else if (data != KEEPALIVE_PING) {
789         logger.warning("Received unexpected ping ack. No ping outstanding");
790       }
791     }
792   }
793 
794   private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
795     final ChannelHandlerContext ctx;
796 
KeepAlivePinger(ChannelHandlerContext ctx)797     KeepAlivePinger(ChannelHandlerContext ctx) {
798       this.ctx = ctx;
799     }
800 
801     @Override
ping()802     public void ping() {
803       ChannelFuture pingFuture = encoder().writePing(
804           ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
805       ctx.flush();
806       if (transportTracer != null) {
807         pingFuture.addListener(new ChannelFutureListener() {
808           @Override
809           public void operationComplete(ChannelFuture future) throws Exception {
810             if (future.isSuccess()) {
811               transportTracer.reportKeepAliveSent();
812             }
813           }
814         });
815       }
816     }
817 
818     @Override
onPingTimeout()819     public void onPingTimeout() {
820       try {
821         forcefulClose(
822             ctx,
823             new ForcefulCloseCommand(Status.UNAVAILABLE
824                 .withDescription("Keepalive failed. The connection is likely gone")),
825             ctx.newPromise());
826       } catch (Exception ex) {
827         try {
828           exceptionCaught(ctx, ex);
829         } catch (Exception ex2) {
830           logger.log(Level.WARNING, "Exception while propagating exception", ex2);
831           logger.log(Level.WARNING, "Original failure", ex);
832         }
833       }
834     }
835   }
836 
837   private final class GracefulShutdown {
838     String goAwayMessage;
839 
840     /**
841      * The grace time between starting graceful shutdown and closing the netty channel,
842      * {@code null} is unspecified.
843      */
844     @CheckForNull
845     Long graceTimeInNanos;
846 
847     /**
848      * True if ping is Acked or ping is timeout.
849      */
850     boolean pingAckedOrTimeout;
851 
852     Future<?> pingFuture;
853 
GracefulShutdown(String goAwayMessage, @Nullable Long graceTimeInNanos)854     GracefulShutdown(String goAwayMessage,
855         @Nullable Long graceTimeInNanos) {
856       this.goAwayMessage = goAwayMessage;
857       this.graceTimeInNanos = graceTimeInNanos;
858     }
859 
860     /**
861      * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
862      */
start(final ChannelHandlerContext ctx)863     void start(final ChannelHandlerContext ctx) {
864       goAway(
865           ctx,
866           Integer.MAX_VALUE,
867           Http2Error.NO_ERROR.code(),
868           ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
869           ctx.newPromise());
870 
871       pingFuture = ctx.executor().schedule(
872           new Runnable() {
873             @Override
874             public void run() {
875               secondGoAwayAndClose(ctx);
876             }
877           },
878           GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
879           TimeUnit.NANOSECONDS);
880 
881       encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
882     }
883 
secondGoAwayAndClose(ChannelHandlerContext ctx)884     void secondGoAwayAndClose(ChannelHandlerContext ctx) {
885       if (pingAckedOrTimeout) {
886         return;
887       }
888       pingAckedOrTimeout = true;
889 
890       checkNotNull(pingFuture, "pingFuture");
891       pingFuture.cancel(false);
892 
893       // send the second GOAWAY with last stream id
894       goAway(
895           ctx,
896           connection().remote().lastStreamCreated(),
897           Http2Error.NO_ERROR.code(),
898           ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
899           ctx.newPromise());
900 
901       // gracefully shutdown with specified grace time
902       long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
903       long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis;
904       if (graceTimeInNanos != null) {
905         gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
906       }
907       try {
908         gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
909         close(ctx, ctx.newPromise());
910       } catch (Exception e) {
911         onError(ctx, /* outbound= */ true, e);
912       } finally {
913         gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
914       }
915     }
916   }
917 
918   // Use a frame writer so that we know when frames are through flow control and actually being
919   // written.
920   private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
921     private final KeepAliveEnforcer keepAliveEnforcer;
922 
WriteMonitoringFrameWriter(Http2FrameWriter delegate, KeepAliveEnforcer keepAliveEnforcer)923     public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
924         KeepAliveEnforcer keepAliveEnforcer) {
925       super(delegate);
926       this.keepAliveEnforcer = keepAliveEnforcer;
927     }
928 
929     @Override
writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)930     public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
931         int padding, boolean endStream, ChannelPromise promise) {
932       keepAliveEnforcer.resetCounters();
933       return super.writeData(ctx, streamId, data, padding, endStream, promise);
934     }
935 
936     @Override
writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)937     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
938         int padding, boolean endStream, ChannelPromise promise) {
939       keepAliveEnforcer.resetCounters();
940       return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
941     }
942 
943     @Override
writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)944     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
945         int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
946         ChannelPromise promise) {
947       keepAliveEnforcer.resetCounters();
948       return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
949           padding, endStream, promise);
950     }
951   }
952 }
953