/*
 * Copyright 2014 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER;
import static io.grpc.netty.Utils.HTTP_METHOD;
import static io.grpc.netty.Utils.TE_HEADER;
import static io.grpc.netty.Utils.TE_TRAILERS;
import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
import static io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName.AUTHORITY;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Ticker;
import io.grpc.Attributes;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
import io.grpc.InternalChannelz;
import io.grpc.InternalMetadata;
import io.grpc.InternalStatus;
import io.grpc.Metadata;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveEnforcer;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionAdapter;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2FrameReader;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersDecoder;
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import io.perfmark.PerfMark;
import io.perfmark.Tag;
import io.perfmark.TaskCloseable;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;

/**
 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within
 * the context of the Netty Channel thread.
 */
class NettyServerHandler extends AbstractNettyHandler {
  private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
  private static final long KEEPALIVE_PING = 0xDEADL;
  @VisibleForTesting
  static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
  /** Temporary workaround for #8674. Fine to delete after v1.45 release, and maybe earlier.
*/ 118 private static final boolean DISABLE_CONNECTION_HEADER_CHECK = Boolean.parseBoolean( 119 System.getProperty("io.grpc.netty.disableConnectionHeaderCheck", "false")); 120 121 private final Http2Connection.PropertyKey streamKey; 122 private final ServerTransportListener transportListener; 123 private final int maxMessageSize; 124 private final long keepAliveTimeInNanos; 125 private final long keepAliveTimeoutInNanos; 126 private final long maxConnectionAgeInNanos; 127 private final long maxConnectionAgeGraceInNanos; 128 private final List<? extends ServerStreamTracer.Factory> streamTracerFactories; 129 private final TransportTracer transportTracer; 130 private final KeepAliveEnforcer keepAliveEnforcer; 131 private final Attributes eagAttributes; 132 /** Incomplete attributes produced by negotiator. */ 133 private Attributes negotiationAttributes; 134 private InternalChannelz.Security securityInfo; 135 /** Completed attributes produced by transportReady. */ 136 private Attributes attributes; 137 private Throwable connectionError; 138 private boolean teWarningLogged; 139 private WriteQueue serverWriteQueue; 140 private AsciiString lastKnownAuthority; 141 @CheckForNull 142 private KeepAliveManager keepAliveManager; 143 @CheckForNull 144 private MaxConnectionIdleManager maxConnectionIdleManager; 145 @CheckForNull 146 private ScheduledFuture<?> maxConnectionAgeMonitor; 147 @CheckForNull 148 private GracefulShutdown gracefulShutdown; 149 newHandler( ServerTransportListener transportListener, ChannelPromise channelUnused, List<? 
extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes)150 static NettyServerHandler newHandler( 151 ServerTransportListener transportListener, 152 ChannelPromise channelUnused, 153 List<? extends ServerStreamTracer.Factory> streamTracerFactories, 154 TransportTracer transportTracer, 155 int maxStreams, 156 boolean autoFlowControl, 157 int flowControlWindow, 158 int maxHeaderListSize, 159 int maxMessageSize, 160 long keepAliveTimeInNanos, 161 long keepAliveTimeoutInNanos, 162 long maxConnectionIdleInNanos, 163 long maxConnectionAgeInNanos, 164 long maxConnectionAgeGraceInNanos, 165 boolean permitKeepAliveWithoutCalls, 166 long permitKeepAliveTimeInNanos, 167 Attributes eagAttributes) { 168 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", 169 maxHeaderListSize); 170 Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); 171 Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize); 172 Http2FrameReader frameReader = new Http2InboundFrameLogger( 173 new DefaultHttp2FrameReader(headersDecoder), frameLogger); 174 Http2FrameWriter frameWriter = 175 new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); 176 return newHandler( 177 channelUnused, 178 frameReader, 179 frameWriter, 180 transportListener, 181 streamTracerFactories, 182 transportTracer, 183 maxStreams, 184 autoFlowControl, 185 flowControlWindow, 186 maxHeaderListSize, 187 maxMessageSize, 188 keepAliveTimeInNanos, 189 keepAliveTimeoutInNanos, 190 maxConnectionIdleInNanos, 191 
maxConnectionAgeInNanos, 192 maxConnectionAgeGraceInNanos, 193 permitKeepAliveWithoutCalls, 194 permitKeepAliveTimeInNanos, 195 eagAttributes, 196 Ticker.systemTicker()); 197 } 198 newHandler( ChannelPromise channelUnused, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, List<? extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, Attributes eagAttributes, Ticker ticker)199 static NettyServerHandler newHandler( 200 ChannelPromise channelUnused, 201 Http2FrameReader frameReader, 202 Http2FrameWriter frameWriter, 203 ServerTransportListener transportListener, 204 List<? extends ServerStreamTracer.Factory> streamTracerFactories, 205 TransportTracer transportTracer, 206 int maxStreams, 207 boolean autoFlowControl, 208 int flowControlWindow, 209 int maxHeaderListSize, 210 int maxMessageSize, 211 long keepAliveTimeInNanos, 212 long keepAliveTimeoutInNanos, 213 long maxConnectionIdleInNanos, 214 long maxConnectionAgeInNanos, 215 long maxConnectionAgeGraceInNanos, 216 boolean permitKeepAliveWithoutCalls, 217 long permitKeepAliveTimeInNanos, 218 Attributes eagAttributes, 219 Ticker ticker) { 220 Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive: %s", maxStreams); 221 Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive: %s", 222 flowControlWindow); 223 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive: %s", 224 maxHeaderListSize); 225 Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive: %s", 226 maxMessageSize); 227 228 final 
Http2Connection connection = new DefaultHttp2Connection(true); 229 WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); 230 dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. 231 DefaultHttp2RemoteFlowController controller = 232 new DefaultHttp2RemoteFlowController(connection, dist); 233 connection.remote().flowController(controller); 234 final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer( 235 permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS); 236 237 // Create the local flow controller configured to auto-refill the connection window. 238 connection.local().flowController( 239 new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); 240 frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); 241 Http2ConnectionEncoder encoder = 242 new DefaultHttp2ConnectionEncoder(connection, frameWriter); 243 encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); 244 Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, 245 frameReader); 246 247 Http2Settings settings = new Http2Settings(); 248 settings.initialWindowSize(flowControlWindow); 249 settings.maxConcurrentStreams(maxStreams); 250 settings.maxHeaderListSize(maxHeaderListSize); 251 252 if (ticker == null) { 253 ticker = Ticker.systemTicker(); 254 } 255 256 return new NettyServerHandler( 257 channelUnused, 258 connection, 259 transportListener, 260 streamTracerFactories, 261 transportTracer, 262 decoder, encoder, settings, 263 maxMessageSize, 264 keepAliveTimeInNanos, keepAliveTimeoutInNanos, 265 maxConnectionIdleInNanos, 266 maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, 267 keepAliveEnforcer, 268 autoFlowControl, 269 eagAttributes, ticker); 270 } 271 NettyServerHandler( ChannelPromise channelUnused, final Http2Connection connection, ServerTransportListener transportListener, List<? 
extends ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer, boolean autoFlowControl, Attributes eagAttributes, Ticker ticker)272 private NettyServerHandler( 273 ChannelPromise channelUnused, 274 final Http2Connection connection, 275 ServerTransportListener transportListener, 276 List<? extends ServerStreamTracer.Factory> streamTracerFactories, 277 TransportTracer transportTracer, 278 Http2ConnectionDecoder decoder, 279 Http2ConnectionEncoder encoder, 280 Http2Settings settings, 281 int maxMessageSize, 282 long keepAliveTimeInNanos, 283 long keepAliveTimeoutInNanos, 284 long maxConnectionIdleInNanos, 285 long maxConnectionAgeInNanos, 286 long maxConnectionAgeGraceInNanos, 287 final KeepAliveEnforcer keepAliveEnforcer, 288 boolean autoFlowControl, 289 Attributes eagAttributes, 290 Ticker ticker) { 291 super(channelUnused, decoder, encoder, settings, new ServerChannelLogger(), 292 autoFlowControl, null, ticker); 293 294 final MaxConnectionIdleManager maxConnectionIdleManager; 295 if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { 296 maxConnectionIdleManager = null; 297 } else { 298 maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos); 299 } 300 301 connection.addListener(new Http2ConnectionAdapter() { 302 @Override 303 public void onStreamActive(Http2Stream stream) { 304 if (connection.numActiveStreams() == 1) { 305 keepAliveEnforcer.onTransportActive(); 306 if (maxConnectionIdleManager != null) { 307 maxConnectionIdleManager.onTransportActive(); 308 } 309 } 310 } 311 312 @Override 313 public void onStreamClosed(Http2Stream stream) { 314 if (connection.numActiveStreams() == 0) { 315 
keepAliveEnforcer.onTransportIdle(); 316 if (maxConnectionIdleManager != null) { 317 maxConnectionIdleManager.onTransportIdle(); 318 } 319 } 320 } 321 }); 322 323 checkArgument(maxMessageSize >= 0, "maxMessageSize must be non-negative: %s", maxMessageSize); 324 this.maxMessageSize = maxMessageSize; 325 this.keepAliveTimeInNanos = keepAliveTimeInNanos; 326 this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; 327 this.maxConnectionIdleManager = maxConnectionIdleManager; 328 this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; 329 this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; 330 this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer"); 331 this.eagAttributes = checkNotNull(eagAttributes, "eagAttributes"); 332 333 streamKey = encoder.connection().newKey(); 334 this.transportListener = checkNotNull(transportListener, "transportListener"); 335 this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); 336 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 337 // Set the frame listener on the decoder. 
338 decoder().frameListener(new FrameListener()); 339 } 340 341 @Nullable connectionError()342 Throwable connectionError() { 343 return connectionError; 344 } 345 346 @Override handlerAdded(final ChannelHandlerContext ctx)347 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { 348 serverWriteQueue = new WriteQueue(ctx.channel()); 349 350 // init max connection age monitor 351 if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { 352 maxConnectionAgeMonitor = ctx.executor().schedule( 353 new LogExceptionRunnable(new Runnable() { 354 @Override 355 public void run() { 356 if (gracefulShutdown == null) { 357 gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos); 358 gracefulShutdown.start(ctx); 359 ctx.flush(); 360 } 361 } 362 }), 363 maxConnectionAgeInNanos, 364 TimeUnit.NANOSECONDS); 365 } 366 367 if (maxConnectionIdleManager != null) { 368 maxConnectionIdleManager.start(new Runnable() { 369 @Override 370 public void run() { 371 if (gracefulShutdown == null) { 372 gracefulShutdown = new GracefulShutdown("max_idle", null); 373 gracefulShutdown.start(ctx); 374 ctx.flush(); 375 } 376 } 377 }, ctx.executor()); 378 } 379 380 if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) { 381 keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(), 382 keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */); 383 keepAliveManager.onTransportStarted(); 384 } 385 386 assert encoder().connection().equals(decoder().connection()); 387 transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(encoder().connection())); 388 389 super.handlerAdded(ctx); 390 } 391 onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)392 private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) 393 throws Http2Exception { 394 try { 395 // Connection-specific header fields makes a request malformed. 
Ideally this would be handled 396 // by Netty. RFC 7540 section 8.1.2.2 397 if (!DISABLE_CONNECTION_HEADER_CHECK && headers.contains(CONNECTION)) { 398 resetStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise()); 399 return; 400 } 401 402 if (headers.authority() == null) { 403 List<CharSequence> hosts = headers.getAll(HOST); 404 if (hosts.size() > 1) { 405 // RFC 7230 section 5.4 406 respondWithHttpError(ctx, streamId, 400, Status.Code.INTERNAL, 407 "Multiple host headers"); 408 return; 409 } 410 if (!hosts.isEmpty()) { 411 headers.add(AUTHORITY.value(), hosts.get(0)); 412 } 413 } 414 headers.remove(HOST); 415 416 // Remove the leading slash of the path and get the fully qualified method name 417 CharSequence path = headers.path(); 418 419 if (path == null) { 420 respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, 421 "Expected path but is missing"); 422 return; 423 } 424 425 if (path.charAt(0) != '/') { 426 respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, 427 String.format("Expected path to start with /: %s", path)); 428 return; 429 } 430 431 String method = path.subSequence(1, path.length()).toString(); 432 433 // Verify that the Content-Type is correct in the request. 
434 CharSequence contentType = headers.get(CONTENT_TYPE_HEADER); 435 if (contentType == null) { 436 respondWithHttpError( 437 ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request"); 438 return; 439 } 440 String contentTypeString = contentType.toString(); 441 if (!GrpcUtil.isGrpcContentType(contentTypeString)) { 442 respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL, 443 String.format("Content-Type '%s' is not supported", contentTypeString)); 444 return; 445 } 446 447 if (!HTTP_METHOD.contentEquals(headers.method())) { 448 respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL, 449 String.format("Method '%s' is not supported", headers.method())); 450 return; 451 } 452 453 if (!teWarningLogged && !TE_TRAILERS.contentEquals(headers.get(TE_HEADER))) { 454 logger.warning(String.format("Expected header TE: %s, but %s is received. This means " 455 + "some intermediate proxy may not support trailers", 456 TE_TRAILERS, headers.get(TE_HEADER))); 457 teWarningLogged = true; 458 } 459 460 // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this 461 // method. 
462 Http2Stream http2Stream = requireHttp2Stream(streamId); 463 464 Metadata metadata = Utils.convertHeaders(headers); 465 StatsTraceContext statsTraceCtx = 466 StatsTraceContext.newServerContext(streamTracerFactories, method, metadata); 467 468 NettyServerStream.TransportState state = new NettyServerStream.TransportState( 469 this, 470 ctx.channel().eventLoop(), 471 http2Stream, 472 maxMessageSize, 473 statsTraceCtx, 474 transportTracer, 475 method); 476 477 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onHeadersRead")) { 478 PerfMark.attachTag(state.tag()); 479 String authority = getOrUpdateAuthority((AsciiString) headers.authority()); 480 NettyServerStream stream = new NettyServerStream( 481 ctx.channel(), 482 state, 483 attributes, 484 authority, 485 statsTraceCtx, 486 transportTracer); 487 transportListener.streamCreated(stream, method, metadata); 488 state.onStreamAllocated(); 489 http2Stream.setProperty(streamKey, state); 490 } 491 } catch (Exception e) { 492 logger.log(Level.WARNING, "Exception in onHeadersRead()", e); 493 // Throw an exception that will get handled by onStreamError. 494 throw newStreamException(streamId, e); 495 } 496 } 497 getOrUpdateAuthority(AsciiString authority)498 private String getOrUpdateAuthority(AsciiString authority) { 499 if (authority == null) { 500 return null; 501 } else if (!authority.equals(lastKnownAuthority)) { 502 lastKnownAuthority = authority; 503 } 504 505 // AsciiString.toString() is internally cached, so subsequent calls will not 506 // result in recomputing the String representation of lastKnownAuthority. 
507 return lastKnownAuthority.toString(); 508 } 509 onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)510 private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) 511 throws Http2Exception { 512 flowControlPing().onDataRead(data.readableBytes(), padding); 513 try { 514 NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); 515 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onDataRead")) { 516 PerfMark.attachTag(stream.tag()); 517 stream.inboundDataReceived(data, endOfStream); 518 } 519 } catch (Throwable e) { 520 logger.log(Level.WARNING, "Exception in onDataRead()", e); 521 // Throw an exception that will get handled by onStreamError. 522 throw newStreamException(streamId, e); 523 } 524 } 525 onRstStreamRead(int streamId, long errorCode)526 private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception { 527 try { 528 NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); 529 if (stream != null) { 530 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onRstStreamRead")) { 531 PerfMark.attachTag(stream.tag()); 532 stream.transportReportStatus( 533 Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); 534 } 535 } 536 } catch (Throwable e) { 537 logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); 538 // Throw an exception that will get handled by onStreamError. 
539 throw newStreamException(streamId, e); 540 } 541 } 542 543 @Override onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex)544 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, 545 Http2Exception http2Ex) { 546 logger.log(Level.FINE, "Connection Error", cause); 547 connectionError = cause; 548 super.onConnectionError(ctx, outbound, cause, http2Ex); 549 } 550 551 @Override onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, StreamException http2Ex)552 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, 553 StreamException http2Ex) { 554 NettyServerStream.TransportState serverStream = serverStream( 555 connection().stream(Http2Exception.streamId(http2Ex))); 556 Level level = Level.WARNING; 557 if (serverStream == null && http2Ex.error() == Http2Error.STREAM_CLOSED) { 558 level = Level.FINE; 559 } 560 logger.log(level, "Stream Error", cause); 561 Tag tag = serverStream != null ? serverStream.tag() : PerfMark.createTag(); 562 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.onStreamError")) { 563 PerfMark.attachTag(tag); 564 if (serverStream != null) { 565 serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); 566 } 567 // TODO(ejona): Abort the stream by sending headers to help the client with debugging. 568 // Delegate to the base class to send a RST_STREAM. 
569 super.onStreamError(ctx, outbound, cause, http2Ex); 570 } 571 } 572 573 @Override handleProtocolNegotiationCompleted( Attributes attrs, InternalChannelz.Security securityInfo)574 public void handleProtocolNegotiationCompleted( 575 Attributes attrs, InternalChannelz.Security securityInfo) { 576 negotiationAttributes = attrs; 577 this.securityInfo = securityInfo; 578 super.handleProtocolNegotiationCompleted(attrs, securityInfo); 579 NettyClientHandler.writeBufferingAndRemove(ctx().channel()); 580 } 581 582 @Override getEagAttributes()583 public Attributes getEagAttributes() { 584 return eagAttributes; 585 } 586 getSecurityInfo()587 InternalChannelz.Security getSecurityInfo() { 588 return securityInfo; 589 } 590 591 @VisibleForTesting getKeepAliveManagerForTest()592 KeepAliveManager getKeepAliveManagerForTest() { 593 return keepAliveManager; 594 } 595 596 @VisibleForTesting setKeepAliveManagerForTest(KeepAliveManager keepAliveManager)597 void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { 598 this.keepAliveManager = keepAliveManager; 599 } 600 601 /** 602 * Handler for the Channel shutting down. 
603 */ 604 @Override channelInactive(ChannelHandlerContext ctx)605 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 606 try { 607 if (keepAliveManager != null) { 608 keepAliveManager.onTransportTermination(); 609 } 610 if (maxConnectionIdleManager != null) { 611 maxConnectionIdleManager.onTransportTermination(); 612 } 613 if (maxConnectionAgeMonitor != null) { 614 maxConnectionAgeMonitor.cancel(false); 615 } 616 final Status status = 617 Status.UNAVAILABLE.withDescription("connection terminated for unknown reason"); 618 // Any streams that are still active must be closed 619 connection().forEachActiveStream(new Http2StreamVisitor() { 620 @Override 621 public boolean visit(Http2Stream stream) throws Http2Exception { 622 NettyServerStream.TransportState serverStream = serverStream(stream); 623 if (serverStream != null) { 624 serverStream.transportReportStatus(status); 625 } 626 return true; 627 } 628 }); 629 } finally { 630 super.channelInactive(ctx); 631 } 632 } 633 getWriteQueue()634 WriteQueue getWriteQueue() { 635 return serverWriteQueue; 636 } 637 638 /** 639 * Handler for commands sent from the stream. 
640 */ 641 @Override write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)642 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 643 throws Exception { 644 if (msg instanceof SendGrpcFrameCommand) { 645 sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise); 646 } else if (msg instanceof SendResponseHeadersCommand) { 647 sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise); 648 } else if (msg instanceof CancelServerStreamCommand) { 649 cancelStream(ctx, (CancelServerStreamCommand) msg, promise); 650 } else if (msg instanceof GracefulServerCloseCommand) { 651 gracefulClose(ctx, (GracefulServerCloseCommand) msg, promise); 652 } else if (msg instanceof ForcefulCloseCommand) { 653 forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); 654 } else { 655 AssertionError e = 656 new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); 657 ReferenceCountUtil.release(msg); 658 promise.setFailure(e); 659 throw e; 660 } 661 } 662 663 @Override close(ChannelHandlerContext ctx, ChannelPromise promise)664 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 665 gracefulClose(ctx, new GracefulServerCloseCommand("app_requested"), promise); 666 ctx.flush(); 667 } 668 669 /** 670 * Returns the given processed bytes back to inbound flow control. 
671 */ returnProcessedBytes(Http2Stream http2Stream, int bytes)672 void returnProcessedBytes(Http2Stream http2Stream, int bytes) { 673 try { 674 decoder().flowController().consumeBytes(http2Stream, bytes); 675 } catch (Http2Exception e) { 676 throw new RuntimeException(e); 677 } 678 } 679 closeStreamWhenDone(ChannelPromise promise, int streamId)680 private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception { 681 final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); 682 promise.addListener(new ChannelFutureListener() { 683 @Override 684 public void operationComplete(ChannelFuture future) { 685 stream.complete(); 686 } 687 }); 688 } 689 690 /** 691 * Sends the given gRPC frame to the client. 692 */ sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)693 private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, 694 ChannelPromise promise) throws Http2Exception { 695 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendGrpcFrame")) { 696 PerfMark.attachTag(cmd.stream().tag()); 697 PerfMark.linkIn(cmd.getLink()); 698 if (cmd.endStream()) { 699 closeStreamWhenDone(promise, cmd.stream().id()); 700 } 701 // Call the base class to write the HTTP/2 DATA frame. 702 encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise); 703 } 704 } 705 706 /** 707 * Sends the response headers to the client. 
708 */ sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise)709 private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, 710 ChannelPromise promise) throws Http2Exception { 711 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.sendResponseHeaders")) { 712 PerfMark.attachTag(cmd.stream().tag()); 713 PerfMark.linkIn(cmd.getLink()); 714 // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 715 // is fixed. 716 int streamId = cmd.stream().id(); 717 Http2Stream stream = connection().stream(streamId); 718 if (stream == null) { 719 resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise); 720 return; 721 } 722 if (cmd.endOfStream()) { 723 closeStreamWhenDone(promise, streamId); 724 } 725 encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); 726 } 727 } 728 cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise)729 private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, 730 ChannelPromise promise) { 731 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.cancelStream")) { 732 PerfMark.attachTag(cmd.stream().tag()); 733 PerfMark.linkIn(cmd.getLink()); 734 // Notify the listener if we haven't already. 735 cmd.stream().transportReportStatus(cmd.reason()); 736 // Terminate the stream. 737 encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); 738 } 739 } 740 gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg, ChannelPromise promise)741 private void gracefulClose(final ChannelHandlerContext ctx, final GracefulServerCloseCommand msg, 742 ChannelPromise promise) throws Exception { 743 // Ideally we'd adjust a pre-existing graceful shutdown's grace period to at least what is 744 // requested here. But that's an edge case and seems bug-prone. 
745 if (gracefulShutdown == null) { 746 Long graceTimeInNanos = null; 747 if (msg.getGraceTimeUnit() != null) { 748 graceTimeInNanos = msg.getGraceTimeUnit().toNanos(msg.getGraceTime()); 749 } 750 gracefulShutdown = new GracefulShutdown(msg.getGoAwayDebugString(), graceTimeInNanos); 751 gracefulShutdown.start(ctx); 752 } 753 promise.setSuccess(); 754 } 755 forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise)756 private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, 757 ChannelPromise promise) throws Exception { 758 super.close(ctx, promise); 759 connection().forEachActiveStream(new Http2StreamVisitor() { 760 @Override 761 public boolean visit(Http2Stream stream) throws Http2Exception { 762 NettyServerStream.TransportState serverStream = serverStream(stream); 763 if (serverStream != null) { 764 try (TaskCloseable ignore = PerfMark.traceTask("NettyServerHandler.forcefulClose")) { 765 PerfMark.attachTag(serverStream.tag()); 766 PerfMark.linkIn(msg.getLink()); 767 serverStream.transportReportStatus(msg.getStatus()); 768 resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); 769 } 770 } 771 stream.close(); 772 return true; 773 } 774 }); 775 } 776 respondWithHttpError( ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg)777 private void respondWithHttpError( 778 ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) { 779 Metadata metadata = new Metadata(); 780 metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus()); 781 metadata.put(InternalStatus.MESSAGE_KEY, msg); 782 byte[][] serialized = InternalMetadata.serialize(metadata); 783 784 Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2) 785 .status("" + code) 786 .set(CONTENT_TYPE_HEADER, "text/plain; charset=utf-8"); 787 for (int i = 0; i < serialized.length; i += 2) { 788 headers.add(new AsciiString(serialized[i], 
false), new AsciiString(serialized[i + 1], false)); 789 } 790 encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); 791 ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg); 792 encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise()); 793 } 794 requireHttp2Stream(int streamId)795 private Http2Stream requireHttp2Stream(int streamId) { 796 Http2Stream stream = connection().stream(streamId); 797 if (stream == null) { 798 // This should never happen. 799 throw new AssertionError("Stream does not exist: " + streamId); 800 } 801 return stream; 802 } 803 804 /** 805 * Returns the server stream associated to the given HTTP/2 stream object. 806 */ serverStream(Http2Stream stream)807 private NettyServerStream.TransportState serverStream(Http2Stream stream) { 808 return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey); 809 } 810 newStreamException(int streamId, Throwable cause)811 private Http2Exception newStreamException(int streamId, Throwable cause) { 812 return Http2Exception.streamError( 813 streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage())); 814 } 815 816 private class FrameListener extends Http2FrameAdapter { 817 private boolean firstSettings = true; 818 819 @Override onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)820 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { 821 if (firstSettings) { 822 firstSettings = false; 823 // Delay transportReady until we see the client's HTTP handshake, for coverage with 824 // handshakeTimeout 825 attributes = transportListener.transportReady(negotiationAttributes); 826 } 827 } 828 829 @Override onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)830 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, 831 boolean endOfStream) throws Http2Exception { 832 if (keepAliveManager != null) { 833 
keepAliveManager.onDataReceived(); 834 } 835 NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream); 836 return padding; 837 } 838 839 @Override onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)840 public void onHeadersRead(ChannelHandlerContext ctx, 841 int streamId, 842 Http2Headers headers, 843 int streamDependency, 844 short weight, 845 boolean exclusive, 846 int padding, 847 boolean endStream) throws Http2Exception { 848 if (keepAliveManager != null) { 849 keepAliveManager.onDataReceived(); 850 } 851 NettyServerHandler.this.onHeadersRead(ctx, streamId, headers); 852 if (endStream) { 853 NettyServerHandler.this.onDataRead(streamId, Unpooled.EMPTY_BUFFER, 0, endStream); 854 } 855 } 856 857 @Override onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)858 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) 859 throws Http2Exception { 860 if (keepAliveManager != null) { 861 keepAliveManager.onDataReceived(); 862 } 863 NettyServerHandler.this.onRstStreamRead(streamId, errorCode); 864 } 865 866 @Override onPingRead(ChannelHandlerContext ctx, long data)867 public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception { 868 if (keepAliveManager != null) { 869 keepAliveManager.onDataReceived(); 870 } 871 if (!keepAliveEnforcer.pingAcceptable()) { 872 ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings"); 873 goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(), 874 debugData, ctx.newPromise()); 875 Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"); 876 try { 877 forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise()); 878 } catch (Exception ex) { 879 onError(ctx, /* outbound= */ true, ex); 880 } 881 } 882 } 883 884 @Override 
onPingAckRead(ChannelHandlerContext ctx, long data)885 public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception { 886 if (keepAliveManager != null) { 887 keepAliveManager.onDataReceived(); 888 } 889 if (data == flowControlPing().payload()) { 890 flowControlPing().updateWindow(); 891 logger.log(Level.FINE, "Window: {0}", 892 decoder().flowController().initialWindowSize(connection().connectionStream())); 893 } else if (data == GRACEFUL_SHUTDOWN_PING) { 894 if (gracefulShutdown == null) { 895 // this should never happen 896 logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null"); 897 } else { 898 gracefulShutdown.secondGoAwayAndClose(ctx); 899 } 900 } else if (data != KEEPALIVE_PING) { 901 logger.warning("Received unexpected ping ack. No ping outstanding"); 902 } 903 } 904 } 905 906 private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger { 907 final ChannelHandlerContext ctx; 908 KeepAlivePinger(ChannelHandlerContext ctx)909 KeepAlivePinger(ChannelHandlerContext ctx) { 910 this.ctx = ctx; 911 } 912 913 @Override ping()914 public void ping() { 915 ChannelFuture pingFuture = encoder().writePing( 916 ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise()); 917 ctx.flush(); 918 pingFuture.addListener(new ChannelFutureListener() { 919 @Override 920 public void operationComplete(ChannelFuture future) throws Exception { 921 if (future.isSuccess()) { 922 transportTracer.reportKeepAliveSent(); 923 } 924 } 925 }); 926 } 927 928 @Override onPingTimeout()929 public void onPingTimeout() { 930 try { 931 forcefulClose( 932 ctx, 933 new ForcefulCloseCommand(Status.UNAVAILABLE 934 .withDescription("Keepalive failed. 
The connection is likely gone")), 935 ctx.newPromise()); 936 } catch (Exception ex) { 937 try { 938 exceptionCaught(ctx, ex); 939 } catch (Exception ex2) { 940 logger.log(Level.WARNING, "Exception while propagating exception", ex2); 941 logger.log(Level.WARNING, "Original failure", ex); 942 } 943 } 944 } 945 } 946 947 private final class GracefulShutdown { 948 String goAwayMessage; 949 950 /** 951 * The grace time between starting graceful shutdown and closing the netty channel, 952 * {@code null} is unspecified. 953 */ 954 @CheckForNull 955 Long graceTimeInNanos; 956 957 /** 958 * True if ping is Acked or ping is timeout. 959 */ 960 boolean pingAckedOrTimeout; 961 962 Future<?> pingFuture; 963 GracefulShutdown(String goAwayMessage, @Nullable Long graceTimeInNanos)964 GracefulShutdown(String goAwayMessage, 965 @Nullable Long graceTimeInNanos) { 966 this.goAwayMessage = goAwayMessage; 967 this.graceTimeInNanos = graceTimeInNanos; 968 } 969 970 /** 971 * Sends out first GOAWAY and ping, and schedules second GOAWAY and close. 
972 */ start(final ChannelHandlerContext ctx)973 void start(final ChannelHandlerContext ctx) { 974 goAway( 975 ctx, 976 Integer.MAX_VALUE, 977 Http2Error.NO_ERROR.code(), 978 ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), 979 ctx.newPromise()); 980 981 pingFuture = ctx.executor().schedule( 982 new Runnable() { 983 @Override 984 public void run() { 985 secondGoAwayAndClose(ctx); 986 } 987 }, 988 GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, 989 TimeUnit.NANOSECONDS); 990 991 encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise()); 992 } 993 secondGoAwayAndClose(ChannelHandlerContext ctx)994 void secondGoAwayAndClose(ChannelHandlerContext ctx) { 995 if (pingAckedOrTimeout) { 996 return; 997 } 998 pingAckedOrTimeout = true; 999 1000 checkNotNull(pingFuture, "pingFuture"); 1001 pingFuture.cancel(false); 1002 1003 // send the second GOAWAY with last stream id 1004 goAway( 1005 ctx, 1006 connection().remote().lastStreamCreated(), 1007 Http2Error.NO_ERROR.code(), 1008 ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), 1009 ctx.newPromise()); 1010 1011 // gracefully shutdown with specified grace time 1012 long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); 1013 long overriddenGraceTime = graceTimeOverrideMillis(savedGracefulShutdownTimeMillis); 1014 try { 1015 gracefulShutdownTimeoutMillis(overriddenGraceTime); 1016 NettyServerHandler.super.close(ctx, ctx.newPromise()); 1017 } catch (Exception e) { 1018 onError(ctx, /* outbound= */ true, e); 1019 } finally { 1020 gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis); 1021 } 1022 } 1023 graceTimeOverrideMillis(long originalMillis)1024 private long graceTimeOverrideMillis(long originalMillis) { 1025 if (graceTimeInNanos == null) { 1026 return originalMillis; 1027 } 1028 if (graceTimeInNanos == MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE) { 1029 // netty treats -1 as "no timeout" 1030 return -1L; 1031 } 1032 return TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos); 1033 } 
1034 } 1035 1036 // Use a frame writer so that we know when frames are through flow control and actually being 1037 // written. 1038 private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter { 1039 private final KeepAliveEnforcer keepAliveEnforcer; 1040 WriteMonitoringFrameWriter(Http2FrameWriter delegate, KeepAliveEnforcer keepAliveEnforcer)1041 public WriteMonitoringFrameWriter(Http2FrameWriter delegate, 1042 KeepAliveEnforcer keepAliveEnforcer) { 1043 super(delegate); 1044 this.keepAliveEnforcer = keepAliveEnforcer; 1045 } 1046 1047 @Override writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)1048 public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, 1049 int padding, boolean endStream, ChannelPromise promise) { 1050 keepAliveEnforcer.resetCounters(); 1051 return super.writeData(ctx, streamId, data, padding, endStream, promise); 1052 } 1053 1054 @Override writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)1055 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, 1056 int padding, boolean endStream, ChannelPromise promise) { 1057 keepAliveEnforcer.resetCounters(); 1058 return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); 1059 } 1060 1061 @Override writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)1062 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, 1063 int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, 1064 ChannelPromise promise) { 1065 keepAliveEnforcer.resetCounters(); 1066 return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, 1067 
padding, endStream, promise); 1068 } 1069 } 1070 1071 private static class ServerChannelLogger extends ChannelLogger { 1072 private static final Logger log = Logger.getLogger(ChannelLogger.class.getName()); 1073 1074 @Override log(ChannelLogLevel level, String message)1075 public void log(ChannelLogLevel level, String message) { 1076 log.log(toJavaLogLevel(level), message); 1077 } 1078 1079 @Override log(ChannelLogLevel level, String messageFormat, Object... args)1080 public void log(ChannelLogLevel level, String messageFormat, Object... args) { 1081 log(level, MessageFormat.format(messageFormat, args)); 1082 } 1083 } 1084 toJavaLogLevel(ChannelLogLevel level)1085 private static Level toJavaLogLevel(ChannelLogLevel level) { 1086 switch (level) { 1087 case ERROR: 1088 return Level.FINE; 1089 case WARNING: 1090 return Level.FINER; 1091 default: 1092 return Level.FINEST; 1093 } 1094 } 1095 } 1096