• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21 import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
22 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
23 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
24 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
25 import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
26 import static io.grpc.netty.Utils.HTTP_METHOD;
27 import static io.grpc.netty.Utils.TE_HEADER;
28 import static io.grpc.netty.Utils.TE_TRAILERS;
29 import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
30 import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
31 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
32 import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;
33 
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Ticker;
38 import io.grpc.Attributes;
39 import io.grpc.ChannelLogger;
40 import io.grpc.ChannelLogger.ChannelLogLevel;
41 import io.grpc.InternalChannelz;
42 import io.grpc.InternalMetadata;
43 import io.grpc.InternalStatus;
44 import io.grpc.Metadata;
45 import io.grpc.ServerStreamTracer;
46 import io.grpc.Status;
47 import io.grpc.internal.GrpcUtil;
48 import io.grpc.internal.KeepAliveEnforcer;
49 import io.grpc.internal.KeepAliveManager;
50 import io.grpc.internal.LogExceptionRunnable;
51 import io.grpc.internal.MaxConnectionIdleManager;
52 import io.grpc.internal.ServerTransportListener;
53 import io.grpc.internal.StatsTraceContext;
54 import io.grpc.internal.TransportTracer;
55 import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
56 import io.netty.buffer.ByteBuf;
57 import io.netty.buffer.ByteBufUtil;
58 import io.netty.buffer.Unpooled;
59 import io.netty.channel.ChannelFuture;
60 import io.netty.channel.ChannelFutureListener;
61 import io.netty.channel.ChannelHandlerContext;
62 import io.netty.channel.ChannelPromise;
63 import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
64 import io.netty.handler.codec.http2.DefaultHttp2Connection;
65 import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
66 import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
67 import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
68 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
69 import io.netty.handler.codec.http2.DefaultHttp2Headers;
70 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
71 import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
72 import io.netty.handler.codec.http2.Http2Connection;
73 import io.netty.handler.codec.http2.Http2ConnectionAdapter;
74 import io.netty.handler.codec.http2.Http2ConnectionDecoder;
75 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
76 import io.netty.handler.codec.http2.Http2Error;
77 import io.netty.handler.codec.http2.Http2Exception;
78 import io.netty.handler.codec.http2.Http2Exception.StreamException;
79 import io.netty.handler.codec.http2.Http2FrameAdapter;
80 import io.netty.handler.codec.http2.Http2FrameLogger;
81 import io.netty.handler.codec.http2.Http2FrameReader;
82 import io.netty.handler.codec.http2.Http2FrameWriter;
83 import io.netty.handler.codec.http2.Http2Headers;
84 import io.netty.handler.codec.http2.Http2HeadersDecoder;
85 import io.netty.handler.codec.http2.Http2InboundFrameLogger;
86 import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
87 import io.netty.handler.codec.http2.Http2Settings;
88 import io.netty.handler.codec.http2.Http2Stream;
89 import io.netty.handler.codec.http2.Http2StreamVisitor;
90 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
91 import io.netty.handler.logging.LogLevel;
92 import io.netty.util.AsciiString;
93 import io.netty.util.ReferenceCountUtil;
94 import io.perfmark.PerfMark;
95 import io.perfmark.Tag;
96 import io.perfmark.TaskCloseable;
97 import java.text.MessageFormat;
98 import java.util.List;
99 import java.util.concurrent.Future;
100 import java.util.concurrent.ScheduledFuture;
101 import java.util.concurrent.TimeUnit;
102 import java.util.logging.Level;
103 import java.util.logging.Logger;
104 import javax.annotation.CheckForNull;
105 import javax.annotation.Nullable;
106 
107 /**
108  * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
109  * the context of the Netty Channel thread.
110  */
111 class NettyServerHandler extends AbstractNettyHandler {
112   private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
113   private static final long KEEPALIVE_PING = 0xDEADL;
114   @VisibleForTesting
115   static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
116   private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
117   /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier. */
118   private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean(
119       System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false"));
120 
121   private final Http2Connection.PropertyKey streamKey;
122   private final ServerTransportListener transportListener;
123   private final int maxMessageSize;
124   private final long keepAliveTimeInNanos;
125   private final long keepAliveTimeoutInNanos;
126   private final long maxConnectionAgeInNanos;
127   private final long maxConnectionAgeGraceInNanos;
128   private final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
129   private final TransportTracer transportTracer;
130   private final KeepAliveEnforcer keepAliveEnforcer;
131   private final Attributes eagAttributes;
132   /** Incomplete attributes produced by negotiator. */
133   private Attributes negotiationAttributes;
134   private InternalChannelz.Security securityInfo;
135   /** Completed attributes produced by transportReady. */
136   private Attributes attributes;
137   private Throwable connectionError;
138   private boolean teWarningLogged;
139   private WriteQueue serverWriteQueue;
140   private AsciiString lastKnownAuthority;
141   @CheckForNull
142   private KeepAliveManager keepAliveManager;
143   @CheckForNull
144   private MaxConnectionIdleManager maxConnectionIdleManager;
145   @CheckForNull
146   private ScheduledFuture<?> maxConnectionAgeMonitor;
147   @CheckForNull
148   private GracefulShutdown gracefulShutdown;
149 
newHandler( ServerTransportListener transportListener, ChannelPromise channelUnused, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes)150   static NettyServerHandler newHandler(
151       ServerTransportListener transportListener,
152       ChannelPromise channelUnused,
153       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
154       TransportTracer transportTracer,
155       int maxStreams,
156       boolean autoFlowControl,
157       int flowControlWindow,
158       int maxHeaderListSize,
159       int maxMessageSize,
160       long keepAliveTimeInNanos,
161       long keepAliveTimeoutInNanos,
162       long maxConnectionIdleInNanos,
163       long maxConnectionAgeInNanos,
164       long maxConnectionAgeGraceInNanos,
165       boolean permitKeepAliveWithoutCalls,
166       long permitKeepAliveTimeInNanos,
167       Attributes eagAttributes) {
168     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
169         maxHeaderListSize);
170     Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class);
171     Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize);
172     Http2FrameReader frameReader = new Http2InboundFrameLogger(
173         new DefaultHttp2FrameReader(headersDecoder), frameLogger);
174     Http2FrameWriter frameWriter =
175         new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger);
176     return newHandler(
177         channelUnused,
178         frameReader,
179         frameWriter,
180         transportListener,
181         streamTracerFactories,
182         transportTracer,
183         maxStreams,
184         autoFlowControl,
185         flowControlWindow,
186         maxHeaderListSize,
187         maxMessageSize,
188         keepAliveTimeInNanos,
189         keepAliveTimeoutInNanos,
190         maxConnectionIdleInNanos,
191         maxConnectionAgeInNanos,
192         maxConnectionAgeGraceInNanos,
193         permitKeepAliveWithoutCalls,
194         permitKeepAliveTimeInNanos,
195         eagAttributes,
196         Ticker.systemTicker());
197   }
198 
newHandler( ChannelPromise channelUnused, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes, Ticker ticker)199   static NettyServerHandler newHandler(
200       ChannelPromise channelUnused,
201       Http2FrameReader frameReader,
202       Http2FrameWriter frameWriter,
203       ServerTransportListener transportListener,
204       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
205       TransportTracer transportTracer,
206       int maxStreams,
207       boolean autoFlowControl,
208       int flowControlWindow,
209       int maxHeaderListSize,
210       int maxMessageSize,
211       long keepAliveTimeInNanos,
212       long keepAliveTimeoutInNanos,
213       long maxConnectionIdleInNanos,
214       long maxConnectionAgeInNanos,
215       long maxConnectionAgeGraceInNanos,
216       boolean permitKeepAliveWithoutCalls,
217       long permitKeepAliveTimeInNanos,
218       Attributes eagAttributes,
219       Ticker ticker) {
220     Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams);
221     Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s",
222         flowControlWindow);
223     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s",
224         maxHeaderListSize);
225     Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s",
226         maxMessageSize);
227 
228     final Http2Connection connection = new DefaultHttp2Connection(true);
229     WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
230     dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
231     DefaultHttp2RemoteFlowController controller =
232         new DefaultHttp2RemoteFlowController(connection, dist);
233     connection.remote().flowController(controller);
234     final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(
235         permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
236 
237     // Create the local flow controller configured to auto-refill the connection window.
238     connection.local().flowController(
239         new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
240     frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);
241     Http2ConnectionEncoder encoder =
242         new DefaultHttp2ConnectionEncoder(connection, frameWriter);
243     encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);
244     Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
245         frameReader);
246 
247     Http2Settings settings = new Http2Settings();
248     settings.initialWindowSize(flowControlWindow);
249     settings.maxConcurrentStreams(maxStreams);
250     settings.maxHeaderListSize(maxHeaderListSize);
251 
252     if (ticker == null) {
253       ticker = Ticker.systemTicker();
254     }
255 
256     return new NettyServerHandler(
257         channelUnused,
258         connection,
259         transportListener,
260         streamTracerFactories,
261         transportTracer,
262         decoder, encoder, settings,
263         maxMessageSize,
264         keepAliveTimeInNanos, keepAliveTimeoutInNanos,
265         maxConnectionIdleInNanos,
266         maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
267         keepAliveEnforcer,
268         autoFlowControl,
269         eagAttributes, ticker);
270   }
271 
NettyServerHandler( ChannelPromise channelUnused, final Http2Connection connection, ServerTransportListener transportListener, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer, boolean autoFlowControl, Attributes eagAttributes, Ticker ticker)272   private NettyServerHandler(
273       ChannelPromise channelUnused,
274       final Http2Connection connection,
275       ServerTransportListener transportListener,
276       List<? extends ServerStreamTracer.Factory> streamTracerFactories,
277       TransportTracer transportTracer,
278       Http2ConnectionDecoder decoder,
279       Http2ConnectionEncoder encoder,
280       Http2Settings settings,
281       int maxMessageSize,
282       long keepAliveTimeInNanos,
283       long keepAliveTimeoutInNanos,
284       long maxConnectionIdleInNanos,
285       long maxConnectionAgeInNanos,
286       long maxConnectionAgeGraceInNanos,
287       final KeepAliveEnforcer keepAliveEnforcer,
288       boolean autoFlowControl,
289       Attributes eagAttributes,
290       Ticker ticker) {
291     super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(),
292         autoFlowControl, null, ticker);
293 
294     final MaxConnectionIdleManager maxConnectionIdleManager;
295     if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) {
296       maxConnectionIdleManager = null;
297     } else {
298       maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos);
299     }
300 
301     connection.addListener(new Http2ConnectionAdapter() {
302       @Override
303       public void onStreamActive(Http2Stream stream) {
304         if (connection.numActiveStreams() == 1) {
305           keepAliveEnforcer.onTransportActive();
306           if (maxConnectionIdleManager != null) {
307             maxConnectionIdleManager.onTransportActive();
308           }
309         }
310       }
311 
312       @Override
313       public void onStreamClosed(Http2Stream stream) {
314         if (connection.numActiveStreams() == 0) {
315           keepAliveEnforcer.onTransportIdle();
316           if (maxConnectionIdleManager != null) {
317             maxConnectionIdleManager.onTransportIdle();
318           }
319         }
320       }
321     });
322 
323     checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize);
324     this.maxMessageSize = maxMessageSize;
325     this.keepAliveTimeInNanos = keepAliveTimeInNanos;
326     this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos;
327     this.maxConnectionIdleManager = maxConnectionIdleManager;
328     this.maxConnectionAgeInNanos = maxConnectionAgeInNanos;
329     this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
330     this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer");
331     this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes");
332 
333     streamKey = encoder.connection().newKey();
334     this.transportListener = checkNotNull(transportListener, "transportListener");
335     this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
336     this.transportTracer = checkNotNull(transportTracer, "transportTracer");
337     // Set the frame listener on the decoder.
338     decoder().frameListener(new FrameListener());
339   }
340 
341   @Nullable
connectionError()342   Throwable connectionError() {
343     return connectionError;
344   }
345 
346   @Override
handlerAdded(final ChannelHandlerContext ctx)347   public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
348     serverWriteQueue = new WriteQueue(ctx.channel());
349 
350     // init max connection age monitor
351     if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
352       maxConnectionAgeMonitor = ctx.executor().schedule(
353           new LogExceptionRunnable(new Runnable() {
354             @Override
355             public void run() {
356               if (gracefulShutdown == null) {
357                 gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
358                 gracefulShutdown.start(ctx);
359                 ctx.flush();
360               }
361             }
362           }),
363           maxConnectionAgeInNanos,
364           TimeUnit.NANOSECONDS);
365     }
366 
367     if (maxConnectionIdleManager != null) {
368       maxConnectionIdleManager.start(new Runnable() {
369         @Override
370         public void run() {
371           if (gracefulShutdown == null) {
372             gracefulShutdown = new GracefulShutdown("max_idle", null);
373             gracefulShutdown.start(ctx);
374             ctx.flush();
375           }
376         }
377       }, ctx.executor());
378     }
379 
380     if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) {
381       keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(),
382           keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */);
383       keepAliveManager.onTransportStarted();
384     }
385 
386     assert encoder().connection().equals(decoder().connection());
387     transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection()));
388 
389     super.handlerAdded(ctx);
390   }
391 
onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)392   private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)
393       throws Http2Exception {
394     try {
395       // Connection-specific header fields makes a request malformed. Ideally this would be handled
396       // by Netty. RFC 7540 section 8.1.2.2
397       if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) {
398         resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
399         return;
400       }
401 
402       if (headers.authority() == null) {
403         List<CharSequence> hosts = headers.getAll(HOST);
404         if (hosts.size() > 1) {
405           // RFC 7230 section 5.4
406           respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL,
407               "Multiple host headers");
408           return;
409         }
410         if (!hosts.isEmpty()) {
411           headers.add(AUTHORITY.value(), hosts.get(0));
412         }
413       }
414       headers.remove(HOST);
415 
416       // Remove the leading slash of the path and get the fully qualified method name
417       CharSequence path = headers.path();
418 
419       if (path == null) {
420         respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
421             "Expected path but is missing");
422         return;
423       }
424 
425       if (path.charAt(0) != '/') {
426         respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED,
427             String.format("Expected path to start with /: %s", path));
428         return;
429       }
430 
431       String method = path.subSequence(1, path.length()).toString();
432 
433       // Verify that the Content-Type is correct in the request.
434       CharSequence contentType = headers.get(CONTENT_TYPE_HEADER);
435       if (contentType == null) {
436         respondWithHttpError(
437             ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request");
438         return;
439       }
440       String contentTypeString = contentType.toString();
441       if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
442         respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL,
443             String.format("Content-Type '%s' is not supported", contentTypeString));
444         return;
445       }
446 
447       if (!HTTP_METHOD.contentEquals(headers.method())) {
448         respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL,
449             String.format("Method '%s' is not supported", headers.method()));
450         return;
451       }
452 
453       if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) {
454         logger.warning(String.format("Expected header TE: %s, but %s is received. This means "
455                 + "some intermediate proxy may not support trailers",
456             TE_TRAILERS, headers.get(TE_HEADER)));
457         teWarningLogged = true;
458       }
459 
460       // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this
461       // method.
462       Http2Stream http2Stream = requireHttp2Stream(streamId);
463 
464       Metadata metadata = Utils.convertHeaders(headers);
465       StatsTraceContext statsTraceCtx =
466           StatsTraceContext.newServerContext(streamTracerFactories, method, metadata);
467 
468       NettyServerStream.TransportState state = new NettyServerStream.TransportState(
469           this,
470           ctx.channel().eventLoop(),
471           http2Stream,
472           maxMessageSize,
473           statsTraceCtx,
474           transportTracer,
475           method);
476 
477       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) {
478         PerfMark.attachTag(state.tag());
479         String authority = getOrUpdateAuthority((AsciiString) headers.authority());
480         NettyServerStream stream = new NettyServerStream(
481             ctx.channel(),
482             state,
483             attributes,
484             authority,
485             statsTraceCtx,
486             transportTracer);
487         transportListener.streamCreated(stream, method, metadata);
488         state.onStreamAllocated();
489         http2Stream.setProperty(streamKey, state);
490       }
491     } catch (Exception e) {
492       logger.log(Level.WARNING, "Exception in onHeadersRead()", e);
493       // Throw an exception that will get handled by onStreamError.
494       throw newStreamException(streamId, e);
495     }
496   }
497 
getOrUpdateAuthority(AsciiString authority)498   private String getOrUpdateAuthority(AsciiString authority) {
499     if (authority == null) {
500       return null;
501     } else if (!authority.equals(lastKnownAuthority)) {
502       lastKnownAuthority = authority;
503     }
504 
505     // AsciiString.toString() is internally cached, so subsequent calls will not
506     // result in recomputing the String representation of lastKnownAuthority.
507     return lastKnownAuthority.toString();
508   }
509 
onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)510   private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)
511       throws Http2Exception {
512     flowControlPing().onDataRead(data.readableBytes(), padding);
513     try {
514       NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
515       try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) {
516         PerfMark.attachTag(stream.tag());
517         stream.inboundDataReceived(data, endOfStream);
518       }
519     } catch (Throwable e) {
520       logger.log(Level.WARNING, "Exception in onDataRead()", e);
521       // Throw an exception that will get handled by onStreamError.
522       throw newStreamException(streamId, e);
523     }
524   }
525 
onRstStreamRead(int streamId, long errorCode)526   private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception {
527     try {
528       NettyServerStream.TransportState stream = serverStream(connection().stream(streamId));
529       if (stream != null) {
530         try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) {
531           PerfMark.attachTag(stream.tag());
532           stream.transportReportStatus(
533               Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode));
534         }
535       }
536     } catch (Throwable e) {
537       logger.log(Level.WARNING, "Exception in onRstStreamRead()", e);
538       // Throw an exception that will get handled by onStreamError.
539       throw newStreamException(streamId, e);
540     }
541   }
542 
543   @Override
onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex)544   protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
545       Http2Exception http2Ex) {
546     logger.log(Level.FINE, "Connection Error", cause);
547     connectionError = cause;
548     super.onConnectionError(ctx, outbound, cause, http2Ex);
549   }
550 
551   @Override
onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, StreamException http2Ex)552   protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
553       StreamException http2Ex) {
554     NettyServerStream.TransportState serverStream = serverStream(
555         connection().stream(Http2Exception.streamId(http2Ex)));
556     Level level = Level.WARNING;
557     if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) {
558       level = Level.FINE;
559     }
560     logger.log(level, "Stream Error", cause);
561     Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag();
562     try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) {
563       PerfMark.attachTag(tag);
564       if (serverStream != null) {
565         serverStream.transportReportStatus(Utils.statusFromThrowable(cause));
566       }
567       // TODO(ejona): Abort the stream by sending headers to help the client with debugging.
568       // Delegate to the base class to send a RST_STREAM.
569       super.onStreamError(ctx, outbound, cause, http2Ex);
570     }
571   }
572 
573   @Override
handleProtocolNegotiationCompleted( Attributes attrs, InternalChannelz.Security securityInfo)574   public void handleProtocolNegotiationCompleted(
575       Attributes attrs, InternalChannelz.Security securityInfo) {
576     negotiationAttributes = attrs;
577     this.securityInfo = securityInfo;
578     super.handleProtocolNegotiationCompleted(attrs, securityInfo);
579     NettyClientHandler.writeBufferingAndRemove(ctx().channel());
580   }
581 
582   @Override
getEagAttributes()583   public Attributes getEagAttributes() {
584     return eagAttributes;
585   }
586 
getSecurityInfo()587   InternalChannelz.Security getSecurityInfo() {
588     return securityInfo;
589   }
590 
591   @VisibleForTesting
getKeepAliveManagerForTest()592   KeepAliveManager getKeepAliveManagerForTest() {
593     return keepAliveManager;
594   }
595 
596   @VisibleForTesting
setKeepAliveManagerForTest(KeepAliveManager keepAliveManager)597   void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) {
598     this.keepAliveManager = keepAliveManager;
599   }
600 
601   /**
602    * Handler for the Channel shutting down.
603    */
604   @Override
channelInactive(ChannelHandlerContext ctx)605   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
606     try {
607       if (keepAliveManager != null) {
608         keepAliveManager.onTransportTermination();
609       }
610       if (maxConnectionIdleManager != null) {
611         maxConnectionIdleManager.onTransportTermination();
612       }
613       if (maxConnectionAgeMonitor != null) {
614         maxConnectionAgeMonitor.cancel(false);
615       }
616       final Status status =
617           Status.UNAVAILABLE.withDescription("connection terminated for unknown reason");
618       // Any streams that are still active must be closed
619       connection().forEachActiveStream(new Http2StreamVisitor() {
620         @Override
621         public boolean visit(Http2Stream stream) throws Http2Exception {
622           NettyServerStream.TransportState serverStream = serverStream(stream);
623           if (serverStream != null) {
624             serverStream.transportReportStatus(status);
625           }
626           return true;
627         }
628       });
629     } finally {
630       super.channelInactive(ctx);
631     }
632   }
633 
getWriteQueue()634   WriteQueue getWriteQueue() {
635     return serverWriteQueue;
636   }
637 
638   /**
639    * Handler for commands sent from the stream.
640    */
641   @Override
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)642   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
643       throws Exception {
644     if (msg instanceof SendGrpcFrameCommand) {
645       sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
646     } else if (msg instanceof SendResponseHeadersCommand) {
647       sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
648     } else if (msg instanceof CancelServerStreamCommand) {
649       cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
650     } else if (msg instanceof GracefulServerCloseCommand) {
651       gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise);
652     } else if (msg instanceof ForcefulCloseCommand) {
653       forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
654     } else {
655       AssertionError e =
656           new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
657       ReferenceCountUtil.release(msg);
658       promise.setFailure(e);
659       throw e;
660     }
661   }
662 
663   @Override
close(ChannelHandlerContext ctx, ChannelPromise promise)664   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
665     gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise);
666     ctx.flush();
667   }
668 
669   /**
670    * Returns the given processed bytes back to inbound flow control.
671    */
returnProcessedBytes(Http2Stream http2Stream, int bytes)672   void returnProcessedBytes(Http2Stream http2Stream, int bytes) {
673     try {
674       decoder().flowController().consumeBytes(http2Stream, bytes);
675     } catch (Http2Exception e) {
676       throw new RuntimeException(e);
677     }
678   }
679 
closeStreamWhenDone(ChannelPromise promise, int streamId)680   private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception {
681     final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId));
682     promise.addListener(new ChannelFutureListener() {
683       @Override
684       public void operationComplete(ChannelFuture future) {
685         stream.complete();
686       }
687     });
688   }
689 
690   /**
691    * Sends the given gRPC frame to the client.
692    */
sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)693   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
694       ChannelPromise promise) throws Http2Exception {
695     try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) {
696       PerfMark.attachTag(cmd.stream().tag());
697       PerfMark.linkIn(cmd.getLink());
698       if (cmd.endStream()) {
699         closeStreamWhenDone(promise, cmd.stream().id());
700       }
701       // Call the base class to write the HTTP/2 DATA frame.
702       encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
703     }
704   }
705 
706   /**
707    * Sends the response headers to the client.
708    */
sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise)709   private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd,
710       ChannelPromise promise) throws Http2Exception {
711     try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) {
712       PerfMark.attachTag(cmd.stream().tag());
713       PerfMark.linkIn(cmd.getLink());
714       // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296
715       // is fixed.
716       int streamId = cmd.stream().id();
717       Http2Stream stream = connection().stream(streamId);
718       if (stream == null) {
719         resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise);
720         return;
721       }
722       if (cmd.endOfStream()) {
723         closeStreamWhenDone(promise, streamId);
724       }
725       encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise);
726     }
727   }
728 
cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise)729   private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
730       ChannelPromise promise) {
731     try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) {
732       PerfMark.attachTag(cmd.stream().tag());
733       PerfMark.linkIn(cmd.getLink());
734       // Notify the listener if we haven't already.
735       cmd.stream().transportReportStatus(cmd.reason());
736       // Terminate the stream.
737       encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
738     }
739   }
740 
gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg, ChannelPromise promise)741   private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg,
742       ChannelPromise promise) throws Exception {
743     // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is
744     // requested here. But that's an edge case and seems bug-prone.
745     if (gracefulShutdown == null) {
746       Long graceTimeInNanos = null;
747       if (msg.getGraceTimeUnit() != null) {
748         graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime());
749       }
750       gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos);
751       gracefulShutdown.start(ctx);
752     }
753     promise.setSuccess();
754   }
755 
forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise)756   private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
757       ChannelPromise promise) throws Exception {
758     super.close(ctx, promise);
759     connection().forEachActiveStream(new Http2StreamVisitor() {
760       @Override
761       public boolean visit(Http2Stream stream) throws Http2Exception {
762         NettyServerStream.TransportState serverStream = serverStream(stream);
763         if (serverStream != null) {
764           try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) {
765             PerfMark.attachTag(serverStream.tag());
766             PerfMark.linkIn(msg.getLink());
767             serverStream.transportReportStatus(msg.getStatus());
768             resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
769           }
770         }
771         stream.close();
772         return true;
773       }
774     });
775   }
776 
respondWithHttpError( ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg)777   private void respondWithHttpError(
778       ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) {
779     Metadata metadata = new Metadata();
780     metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
781     metadata.put(InternalStatus.MESSAGE_KEY, msg);
782     byte[][] serialized = InternalMetadata.serialize(metadata);
783 
784     Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2)
785         .status("" + code)
786         .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8");
787     for (int i = 0; i < serialized.length; i += 2) {
788       headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false));
789     }
790     encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise());
791     ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg);
792     encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise());
793   }
794 
requireHttp2Stream(int streamId)795   private Http2Stream requireHttp2Stream(int streamId) {
796     Http2Stream stream = connection().stream(streamId);
797     if (stream == null) {
798       // This should never happen.
799       throw new AssertionError("Stream does not exist: " + streamId);
800     }
801     return stream;
802   }
803 
804   /**
805    * Returns the server stream associated to the given HTTP/2 stream object.
806    */
serverStream(Http2Stream stream)807   private NettyServerStream.TransportState serverStream(Http2Stream stream) {
808     return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey);
809   }
810 
newStreamException(int streamId, Throwable cause)811   private Http2Exception newStreamException(int streamId, Throwable cause) {
812     return Http2Exception.streamError(
813         streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage()));
814   }
815 
816   private class FrameListener extends Http2FrameAdapter {
817     private boolean firstSettings = true;
818 
819     @Override
onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)820     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
821       if (firstSettings) {
822         firstSettings = false;
823         // Delay transportReady until we see the client's HTTP handshake, for coverage with
824         // handshakeTimeout
825         attributes = transportListener.transportReady(negotiationAttributes);
826       }
827     }
828 
829     @Override
onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)830     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
831         boolean endOfStream) throws Http2Exception {
832       if (keepAliveManager != null) {
833         keepAliveManager.onDataReceived();
834       }
835       NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream);
836       return padding;
837     }
838 
839     @Override
onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)840     public void onHeadersRead(ChannelHandlerContext ctx,
841         int streamId,
842         Http2Headers headers,
843         int streamDependency,
844         short weight,
845         boolean exclusive,
846         int padding,
847         boolean endStream) throws Http2Exception {
848       if (keepAliveManager != null) {
849         keepAliveManager.onDataReceived();
850       }
851       NettyServerHandler.this.onHeadersRead(ctx, streamId, headers);
852       if (endStream) {
853         NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream);
854       }
855     }
856 
857     @Override
onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)858     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
859         throws Http2Exception {
860       if (keepAliveManager != null) {
861         keepAliveManager.onDataReceived();
862       }
863       NettyServerHandler.this.onRstStreamRead(streamId, errorCode);
864     }
865 
866     @Override
onPingRead(ChannelHandlerContext ctx, long data)867     public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
868       if (keepAliveManager != null) {
869         keepAliveManager.onDataReceived();
870       }
871       if (!keepAliveEnforcer.pingAcceptable()) {
872         ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings");
873         goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(),
874             debugData, ctx.newPromise());
875         Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client");
876         try {
877           forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise());
878         } catch (Exception ex) {
879           onError(ctx, /* outbound= */ true, ex);
880         }
881       }
882     }
883 
884     @Override
onPingAckRead(ChannelHandlerContext ctx, long data)885     public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
886       if (keepAliveManager != null) {
887         keepAliveManager.onDataReceived();
888       }
889       if (data == flowControlPing().payload()) {
890         flowControlPing().updateWindow();
891         logger.log(Level.FINE, "Window: {0}",
892             decoder().flowController().initialWindowSize(connection().connectionStream()));
893       } else if (data == GRACEFUL_SHUTDOWN_PING) {
894         if (gracefulShutdown == null) {
895           // this should never happen
896           logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
897         } else {
898           gracefulShutdown.secondGoAwayAndClose(ctx);
899         }
900       } else if (data != KEEPALIVE_PING) {
901         logger.warning("Received unexpected ping ack. No ping outstanding");
902       }
903     }
904   }
905 
906   private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
907     final ChannelHandlerContext ctx;
908 
KeepAlivePinger(ChannelHandlerContext ctx)909     KeepAlivePinger(ChannelHandlerContext ctx) {
910       this.ctx = ctx;
911     }
912 
913     @Override
ping()914     public void ping() {
915       ChannelFuture pingFuture = encoder().writePing(
916           ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
917       ctx.flush();
918       pingFuture.addListener(new ChannelFutureListener() {
919         @Override
920         public void operationComplete(ChannelFuture future) throws Exception {
921           if (future.isSuccess()) {
922             transportTracer.reportKeepAliveSent();
923           }
924         }
925       });
926     }
927 
928     @Override
onPingTimeout()929     public void onPingTimeout() {
930       try {
931         forcefulClose(
932             ctx,
933             new ForcefulCloseCommand(Status.UNAVAILABLE
934                 .withDescription("Keepalive failed. The connection is likely gone")),
935             ctx.newPromise());
936       } catch (Exception ex) {
937         try {
938           exceptionCaught(ctx, ex);
939         } catch (Exception ex2) {
940           logger.log(Level.WARNING, "Exception while propagating exception", ex2);
941           logger.log(Level.WARNING, "Original failure", ex);
942         }
943       }
944     }
945   }
946 
947   private final class GracefulShutdown {
948     String goAwayMessage;
949 
950     /**
951      * The grace time between starting graceful shutdown and closing the netty channel,
952      * {@code null} is unspecified.
953      */
954     @CheckForNull
955     Long graceTimeInNanos;
956 
957     /**
958      * True if ping is Acked or ping is timeout.
959      */
960     boolean pingAckedOrTimeout;
961 
962     Future<?> pingFuture;
963 
GracefulShutdown(String goAwayMessage, @Nullable Long graceTimeInNanos)964     GracefulShutdown(String goAwayMessage,
965         @Nullable Long graceTimeInNanos) {
966       this.goAwayMessage = goAwayMessage;
967       this.graceTimeInNanos = graceTimeInNanos;
968     }
969 
970     /**
971      * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
972      */
start(final ChannelHandlerContext ctx)973     void start(final ChannelHandlerContext ctx) {
974       goAway(
975           ctx,
976           Integer.MAX_VALUE,
977           Http2Error.NO_ERROR.code(),
978           ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
979           ctx.newPromise());
980 
981       pingFuture = ctx.executor().schedule(
982           new Runnable() {
983             @Override
984             public void run() {
985               secondGoAwayAndClose(ctx);
986             }
987           },
988           GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
989           TimeUnit.NANOSECONDS);
990 
991       encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
992     }
993 
secondGoAwayAndClose(ChannelHandlerContext ctx)994     void secondGoAwayAndClose(ChannelHandlerContext ctx) {
995       if (pingAckedOrTimeout) {
996         return;
997       }
998       pingAckedOrTimeout = true;
999 
1000       checkNotNull(pingFuture, "pingFuture");
1001       pingFuture.cancel(false);
1002 
1003       // send the second GOAWAY with last stream id
1004       goAway(
1005           ctx,
1006           connection().remote().lastStreamCreated(),
1007           Http2Error.NO_ERROR.code(),
1008           ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
1009           ctx.newPromise());
1010 
1011       // gracefully shutdown with specified grace time
1012       long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
1013       long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis);
1014       try {
1015         gracefulShutdownTimeoutMillis(overriddenGraceTime);
1016         NettyServerHandler.super.close(ctx, ctx.newPromise());
1017       } catch (Exception e) {
1018         onError(ctx, /* outbound= */ true, e);
1019       } finally {
1020         gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
1021       }
1022     }
1023 
graceTimeOverrideMillis(long originalMillis)1024     private long graceTimeOverrideMillis(long originalMillis) {
1025       if (graceTimeInNanos == null) {
1026         return originalMillis;
1027       }
1028       if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) {
1029         // netty treats -1 as "no timeout"
1030         return -1L;
1031       }
1032       return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
1033     }
1034   }
1035 
1036   // Use a frame writer so that we know when frames are through flow control and actually being
1037   // written.
1038   private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
1039     private final KeepAliveEnforcer keepAliveEnforcer;
1040 
WriteMonitoringFrameWriter(Http2FrameWriter delegate, KeepAliveEnforcer keepAliveEnforcer)1041     public WriteMonitoringFrameWriter(Http2FrameWriter delegate,
1042         KeepAliveEnforcer keepAliveEnforcer) {
1043       super(delegate);
1044       this.keepAliveEnforcer = keepAliveEnforcer;
1045     }
1046 
1047     @Override
writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)1048     public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
1049         int padding, boolean endStream, ChannelPromise promise) {
1050       keepAliveEnforcer.resetCounters();
1051       return super.writeData(ctx, streamId, data, padding, endStream, promise);
1052     }
1053 
1054     @Override
writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)1055     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1056         int padding, boolean endStream, ChannelPromise promise) {
1057       keepAliveEnforcer.resetCounters();
1058       return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
1059     }
1060 
1061     @Override
writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)1062     public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1063         int streamDependency, short weight, boolean exclusive, int padding, boolean endStream,
1064         ChannelPromise promise) {
1065       keepAliveEnforcer.resetCounters();
1066       return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1067           padding, endStream, promise);
1068     }
1069   }
1070 
1071   private static class ServerChannelLogger extends ChannelLogger {
1072     private static final Logger log = Logger.getLogger(ChannelLogger.class.getName());
1073 
1074     @Override
log(ChannelLogLevel level, String message)1075     public void log(ChannelLogLevel level, String message) {
1076       log.log(toJavaLogLevel(level), message);
1077     }
1078 
1079     @Override
log(ChannelLogLevel level, String messageFormat, Object... args)1080     public void log(ChannelLogLevel level, String messageFormat, Object... args) {
1081       log(level, MessageFormat.format(messageFormat, args));
1082     }
1083   }
1084 
toJavaLogLevel(ChannelLogLevel level)1085   private static Level toJavaLogLevel(ChannelLogLevel level) {
1086     switch (level) {
1087       case ERROR:
1088         return Level.FINE;
1089       case WARNING:
1090         return Level.FINER;
1091       default:
1092         return Level.FINEST;
1093     }
1094   }
1095 }
1096