1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED; 22 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; 23 import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; 24 import static io.grpc.netty.Utils.CONTENT_TYPE_HEADER; 25 import static io.grpc.netty.Utils.HTTP_METHOD; 26 import static io.grpc.netty.Utils.TE_HEADER; 27 import static io.grpc.netty.Utils.TE_TRAILERS; 28 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO; 29 30 import com.google.common.annotations.VisibleForTesting; 31 import com.google.common.base.Preconditions; 32 import com.google.common.base.Strings; 33 import io.grpc.Attributes; 34 import io.grpc.InternalChannelz; 35 import io.grpc.InternalMetadata; 36 import io.grpc.InternalStatus; 37 import io.grpc.Metadata; 38 import io.grpc.ServerStreamTracer; 39 import io.grpc.Status; 40 import io.grpc.internal.GrpcUtil; 41 import io.grpc.internal.KeepAliveManager; 42 import io.grpc.internal.LogExceptionRunnable; 43 import io.grpc.internal.ServerTransportListener; 44 import io.grpc.internal.StatsTraceContext; 45 import io.grpc.internal.TransportTracer; 46 import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder; 47 import io.netty.buffer.ByteBuf; 48 import io.netty.buffer.ByteBufUtil; 49 import io.netty.channel.ChannelFuture; 50 import io.netty.channel.ChannelFutureListener; 51 import io.netty.channel.ChannelHandlerContext; 52 import io.netty.channel.ChannelPromise; 53 import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; 54 import io.netty.handler.codec.http2.DefaultHttp2Connection; 55 import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; 56 import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; 57 import io.netty.handler.codec.http2.DefaultHttp2FrameReader; 58 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; 59 import io.netty.handler.codec.http2.DefaultHttp2Headers; 60 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; 61 import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController; 62 import io.netty.handler.codec.http2.Http2Connection; 63 import io.netty.handler.codec.http2.Http2ConnectionAdapter; 64 import io.netty.handler.codec.http2.Http2ConnectionDecoder; 65 import io.netty.handler.codec.http2.Http2ConnectionEncoder; 66 import io.netty.handler.codec.http2.Http2Error; 67 import io.netty.handler.codec.http2.Http2Exception; 68 import io.netty.handler.codec.http2.Http2Exception.StreamException; 69 import io.netty.handler.codec.http2.Http2FlowController; 70 import io.netty.handler.codec.http2.Http2FrameAdapter; 71 import io.netty.handler.codec.http2.Http2FrameLogger; 72 import io.netty.handler.codec.http2.Http2FrameReader; 73 import io.netty.handler.codec.http2.Http2FrameWriter; 74 import io.netty.handler.codec.http2.Http2Headers; 75 import io.netty.handler.codec.http2.Http2HeadersDecoder; 76 import io.netty.handler.codec.http2.Http2InboundFrameLogger; 77 import io.netty.handler.codec.http2.Http2OutboundFrameLogger; 78 import io.netty.handler.codec.http2.Http2Settings; 79 import io.netty.handler.codec.http2.Http2Stream; 80 import io.netty.handler.codec.http2.Http2StreamVisitor; 81 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; 82 import io.netty.handler.logging.LogLevel; 83 import io.netty.util.AsciiString; 84 import io.netty.util.ReferenceCountUtil; 85 import java.util.List; 86 import java.util.concurrent.Future; 87 import java.util.concurrent.ScheduledFuture; 88 import java.util.concurrent.TimeUnit; 89 import java.util.logging.Level; 90 import java.util.logging.Logger; 91 import javax.annotation.CheckForNull; 92 import javax.annotation.Nullable; 93 94 /** 95 * Server-side Netty handler for GRPC processing. All event handlers are executed entirely within 96 * the context of the Netty Channel thread. 97 */ 98 class NettyServerHandler extends AbstractNettyHandler { 99 private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); 100 private static final long KEEPALIVE_PING = 0xDEADL; 101 private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L; 102 private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); 103 104 private final Http2Connection.PropertyKey streamKey; 105 private final ServerTransportListener transportListener; 106 private final int maxMessageSize; 107 private final long keepAliveTimeInNanos; 108 private final long keepAliveTimeoutInNanos; 109 private final long maxConnectionAgeInNanos; 110 private final long maxConnectionAgeGraceInNanos; 111 private final List<ServerStreamTracer.Factory> streamTracerFactories; 112 private final TransportTracer transportTracer; 113 private final KeepAliveEnforcer keepAliveEnforcer; 114 /** Incomplete attributes produced by negotiator. */ 115 private Attributes negotiationAttributes; 116 private InternalChannelz.Security securityInfo; 117 /** Completed attributes produced by transportReady. */ 118 private Attributes attributes; 119 private Throwable connectionError; 120 private boolean teWarningLogged; 121 private WriteQueue serverWriteQueue; 122 private AsciiString lastKnownAuthority; 123 @CheckForNull 124 private KeepAliveManager keepAliveManager; 125 @CheckForNull 126 private MaxConnectionIdleManager maxConnectionIdleManager; 127 @CheckForNull 128 private ScheduledFuture<?> maxConnectionAgeMonitor; 129 @CheckForNull 130 private GracefulShutdown gracefulShutdown; 131 newHandler( ServerTransportListener transportListener, ChannelPromise channelUnused, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)132 static NettyServerHandler newHandler( 133 ServerTransportListener transportListener, 134 ChannelPromise channelUnused, 135 List<ServerStreamTracer.Factory> streamTracerFactories, 136 TransportTracer transportTracer, 137 int maxStreams, 138 int flowControlWindow, 139 int maxHeaderListSize, 140 int maxMessageSize, 141 long keepAliveTimeInNanos, 142 long keepAliveTimeoutInNanos, 143 long maxConnectionIdleInNanos, 144 long maxConnectionAgeInNanos, 145 long maxConnectionAgeGraceInNanos, 146 boolean permitKeepAliveWithoutCalls, 147 long permitKeepAliveTimeInNanos) { 148 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); 149 Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyServerHandler.class); 150 Http2HeadersDecoder headersDecoder = new GrpcHttp2ServerHeadersDecoder(maxHeaderListSize); 151 Http2FrameReader frameReader = new Http2InboundFrameLogger( 152 new DefaultHttp2FrameReader(headersDecoder), frameLogger); 153 Http2FrameWriter frameWriter = 154 new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(), frameLogger); 155 return newHandler( 156 channelUnused, 157 frameReader, 158 frameWriter, 159 transportListener, 160 streamTracerFactories, 161 transportTracer, 162 maxStreams, 163 flowControlWindow, 164 maxHeaderListSize, 165 maxMessageSize, 166 keepAliveTimeInNanos, 167 keepAliveTimeoutInNanos, 168 maxConnectionIdleInNanos, 169 maxConnectionAgeInNanos, 170 maxConnectionAgeGraceInNanos, 171 permitKeepAliveWithoutCalls, 172 permitKeepAliveTimeInNanos); 173 } 174 175 @VisibleForTesting newHandler( ChannelPromise channelUnused, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, int maxStreams, int flowControlWindow, int maxHeaderListSize, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos)176 static NettyServerHandler newHandler( 177 ChannelPromise channelUnused, 178 Http2FrameReader frameReader, 179 Http2FrameWriter frameWriter, 180 ServerTransportListener transportListener, 181 List<ServerStreamTracer.Factory> streamTracerFactories, 182 TransportTracer transportTracer, 183 int maxStreams, 184 int flowControlWindow, 185 int maxHeaderListSize, 186 int maxMessageSize, 187 long keepAliveTimeInNanos, 188 long keepAliveTimeoutInNanos, 189 long maxConnectionIdleInNanos, 190 long maxConnectionAgeInNanos, 191 long maxConnectionAgeGraceInNanos, 192 boolean permitKeepAliveWithoutCalls, 193 long permitKeepAliveTimeInNanos) { 194 Preconditions.checkArgument(maxStreams > 0, "maxStreams must be positive"); 195 Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); 196 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); 197 Preconditions.checkArgument(maxMessageSize > 0, "maxMessageSize must be positive"); 198 199 final Http2Connection connection = new DefaultHttp2Connection(true); 200 WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); 201 dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. 202 DefaultHttp2RemoteFlowController controller = 203 new DefaultHttp2RemoteFlowController(connection, dist); 204 connection.remote().flowController(controller); 205 final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer( 206 permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS); 207 208 // Create the local flow controller configured to auto-refill the connection window. 209 connection.local().flowController( 210 new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); 211 frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); 212 Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); 213 Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, 214 frameReader); 215 216 Http2Settings settings = new Http2Settings(); 217 settings.initialWindowSize(flowControlWindow); 218 settings.maxConcurrentStreams(maxStreams); 219 settings.maxHeaderListSize(maxHeaderListSize); 220 221 return new NettyServerHandler( 222 channelUnused, 223 connection, 224 transportListener, 225 streamTracerFactories, 226 transportTracer, 227 decoder, encoder, settings, 228 maxMessageSize, 229 keepAliveTimeInNanos, keepAliveTimeoutInNanos, 230 maxConnectionIdleInNanos, 231 maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, 232 keepAliveEnforcer); 233 } 234 NettyServerHandler( ChannelPromise channelUnused, final Http2Connection connection, ServerTransportListener transportListener, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer transportTracer, Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, int maxMessageSize, long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, final KeepAliveEnforcer keepAliveEnforcer)235 private NettyServerHandler( 236 ChannelPromise channelUnused, 237 final Http2Connection connection, 238 ServerTransportListener transportListener, 239 List<ServerStreamTracer.Factory> streamTracerFactories, 240 TransportTracer transportTracer, 241 Http2ConnectionDecoder decoder, 242 Http2ConnectionEncoder encoder, 243 Http2Settings settings, 244 int maxMessageSize, 245 long keepAliveTimeInNanos, 246 long keepAliveTimeoutInNanos, 247 long maxConnectionIdleInNanos, 248 long maxConnectionAgeInNanos, 249 long maxConnectionAgeGraceInNanos, 250 final KeepAliveEnforcer keepAliveEnforcer) { 251 super(channelUnused, decoder, encoder, settings); 252 253 final MaxConnectionIdleManager maxConnectionIdleManager; 254 if (maxConnectionIdleInNanos == MAX_CONNECTION_IDLE_NANOS_DISABLED) { 255 maxConnectionIdleManager = null; 256 } else { 257 maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { 258 @Override 259 void close(ChannelHandlerContext ctx) { 260 if (gracefulShutdown == null) { 261 gracefulShutdown = new GracefulShutdown("max_idle", null); 262 gracefulShutdown.start(ctx); 263 ctx.flush(); 264 } 265 } 266 }; 267 } 268 269 connection.addListener(new Http2ConnectionAdapter() { 270 @Override 271 public void onStreamActive(Http2Stream stream) { 272 if (connection.numActiveStreams() == 1) { 273 keepAliveEnforcer.onTransportActive(); 274 if (maxConnectionIdleManager != null) { 275 maxConnectionIdleManager.onTransportActive(); 276 } 277 } 278 } 279 280 @Override 281 public void onStreamClosed(Http2Stream stream) { 282 if (connection.numActiveStreams() == 0) { 283 keepAliveEnforcer.onTransportIdle(); 284 if (maxConnectionIdleManager != null) { 285 maxConnectionIdleManager.onTransportIdle(); 286 } 287 } 288 } 289 }); 290 291 checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0"); 292 this.maxMessageSize = maxMessageSize; 293 this.keepAliveTimeInNanos = keepAliveTimeInNanos; 294 this.keepAliveTimeoutInNanos = keepAliveTimeoutInNanos; 295 this.maxConnectionIdleManager = maxConnectionIdleManager; 296 this.maxConnectionAgeInNanos = maxConnectionAgeInNanos; 297 this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; 298 this.keepAliveEnforcer = checkNotNull(keepAliveEnforcer, "keepAliveEnforcer"); 299 300 streamKey = encoder.connection().newKey(); 301 this.transportListener = checkNotNull(transportListener, "transportListener"); 302 this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); 303 this.transportTracer = checkNotNull(transportTracer, "transportTracer"); 304 305 // Set the frame listener on the decoder. 306 decoder().frameListener(new FrameListener()); 307 } 308 309 @Nullable connectionError()310 Throwable connectionError() { 311 return connectionError; 312 } 313 314 @Override handlerAdded(final ChannelHandlerContext ctx)315 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { 316 serverWriteQueue = new WriteQueue(ctx.channel()); 317 318 // init max connection age monitor 319 if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { 320 maxConnectionAgeMonitor = ctx.executor().schedule( 321 new LogExceptionRunnable(new Runnable() { 322 @Override 323 public void run() { 324 if (gracefulShutdown == null) { 325 gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos); 326 gracefulShutdown.start(ctx); 327 ctx.flush(); 328 } 329 } 330 }), 331 maxConnectionAgeInNanos, 332 TimeUnit.NANOSECONDS); 333 } 334 335 if (maxConnectionIdleManager != null) { 336 maxConnectionIdleManager.start(ctx); 337 } 338 339 if (keepAliveTimeInNanos != SERVER_KEEPALIVE_TIME_NANOS_DISABLED) { 340 keepAliveManager = new KeepAliveManager(new KeepAlivePinger(ctx), ctx.executor(), 341 keepAliveTimeInNanos, keepAliveTimeoutInNanos, true /* keepAliveDuringTransportIdle */); 342 keepAliveManager.onTransportStarted(); 343 } 344 345 346 if (transportTracer != null) { 347 assert encoder().connection().equals(decoder().connection()); 348 final Http2Connection connection = encoder().connection(); 349 transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() { 350 private final Http2FlowController local = connection.local().flowController(); 351 private final Http2FlowController remote = connection.remote().flowController(); 352 353 @Override 354 public TransportTracer.FlowControlWindows read() { 355 assert ctx.executor().inEventLoop(); 356 return new TransportTracer.FlowControlWindows( 357 local.windowSize(connection.connectionStream()), 358 remote.windowSize(connection.connectionStream())); 359 } 360 }); 361 } 362 363 super.handlerAdded(ctx); 364 } 365 onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers)366 private void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers) 367 throws Http2Exception { 368 if (!teWarningLogged && !TE_TRAILERS.equals(headers.get(TE_HEADER))) { 369 logger.warning(String.format("Expected header TE: %s, but %s is received. This means " 370 + "some intermediate proxy may not support trailers", 371 TE_TRAILERS, headers.get(TE_HEADER))); 372 teWarningLogged = true; 373 } 374 375 try { 376 377 // Remove the leading slash of the path and get the fully qualified method name 378 CharSequence path = headers.path(); 379 380 if (path == null) { 381 respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, 382 "Expected path but is missing"); 383 return; 384 } 385 386 if (path.charAt(0) != '/') { 387 respondWithHttpError(ctx, streamId, 404, Status.Code.UNIMPLEMENTED, 388 String.format("Expected path to start with /: %s", path)); 389 return; 390 } 391 392 String method = path.subSequence(1, path.length()).toString(); 393 394 // Verify that the Content-Type is correct in the request. 395 CharSequence contentType = headers.get(CONTENT_TYPE_HEADER); 396 if (contentType == null) { 397 respondWithHttpError( 398 ctx, streamId, 415, Status.Code.INTERNAL, "Content-Type is missing from the request"); 399 return; 400 } 401 String contentTypeString = contentType.toString(); 402 if (!GrpcUtil.isGrpcContentType(contentTypeString)) { 403 respondWithHttpError(ctx, streamId, 415, Status.Code.INTERNAL, 404 String.format("Content-Type '%s' is not supported", contentTypeString)); 405 return; 406 } 407 408 if (!HTTP_METHOD.equals(headers.method())) { 409 respondWithHttpError(ctx, streamId, 405, Status.Code.INTERNAL, 410 String.format("Method '%s' is not supported", headers.method())); 411 return; 412 } 413 414 // The Http2Stream object was put by AbstractHttp2ConnectionHandler before calling this 415 // method. 416 Http2Stream http2Stream = requireHttp2Stream(streamId); 417 418 Metadata metadata = Utils.convertHeaders(headers); 419 StatsTraceContext statsTraceCtx = 420 StatsTraceContext.newServerContext(streamTracerFactories, method, metadata); 421 422 NettyServerStream.TransportState state = new NettyServerStream.TransportState( 423 this, 424 ctx.channel().eventLoop(), 425 http2Stream, 426 maxMessageSize, 427 statsTraceCtx, 428 transportTracer); 429 String authority = getOrUpdateAuthority((AsciiString) headers.authority()); 430 NettyServerStream stream = new NettyServerStream( 431 ctx.channel(), 432 state, 433 attributes, 434 authority, 435 statsTraceCtx, 436 transportTracer); 437 transportListener.streamCreated(stream, method, metadata); 438 state.onStreamAllocated(); 439 http2Stream.setProperty(streamKey, state); 440 } catch (Exception e) { 441 logger.log(Level.WARNING, "Exception in onHeadersRead()", e); 442 // Throw an exception that will get handled by onStreamError. 443 throw newStreamException(streamId, e); 444 } 445 } 446 getOrUpdateAuthority(AsciiString authority)447 private String getOrUpdateAuthority(AsciiString authority) { 448 if (authority == null) { 449 return null; 450 } else if (!authority.equals(lastKnownAuthority)) { 451 lastKnownAuthority = authority; 452 } 453 454 // AsciiString.toString() is internally cached, so subsequent calls will not 455 // result in recomputing the String representation of lastKnownAuthority. 456 return lastKnownAuthority.toString(); 457 } 458 onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)459 private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) 460 throws Http2Exception { 461 flowControlPing().onDataRead(data.readableBytes(), padding); 462 try { 463 NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); 464 stream.inboundDataReceived(data, endOfStream); 465 } catch (Throwable e) { 466 logger.log(Level.WARNING, "Exception in onDataRead()", e); 467 // Throw an exception that will get handled by onStreamError. 468 throw newStreamException(streamId, e); 469 } 470 } 471 onRstStreamRead(int streamId, long errorCode)472 private void onRstStreamRead(int streamId, long errorCode) throws Http2Exception { 473 try { 474 NettyServerStream.TransportState stream = serverStream(connection().stream(streamId)); 475 if (stream != null) { 476 stream.transportReportStatus( 477 Status.CANCELLED.withDescription("RST_STREAM received for code " + errorCode)); 478 } 479 } catch (Throwable e) { 480 logger.log(Level.WARNING, "Exception in onRstStreamRead()", e); 481 // Throw an exception that will get handled by onStreamError. 482 throw newStreamException(streamId, e); 483 } 484 } 485 486 @Override onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex)487 protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, 488 Http2Exception http2Ex) { 489 logger.log(Level.FINE, "Connection Error", cause); 490 connectionError = cause; 491 super.onConnectionError(ctx, outbound, cause, http2Ex); 492 } 493 494 @Override onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, StreamException http2Ex)495 protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, 496 StreamException http2Ex) { 497 logger.log(Level.WARNING, "Stream Error", cause); 498 NettyServerStream.TransportState serverStream = serverStream( 499 connection().stream(Http2Exception.streamId(http2Ex))); 500 if (serverStream != null) { 501 serverStream.transportReportStatus(Utils.statusFromThrowable(cause)); 502 } 503 // TODO(ejona): Abort the stream by sending headers to help the client with debugging. 504 // Delegate to the base class to send a RST_STREAM. 505 super.onStreamError(ctx, outbound, cause, http2Ex); 506 } 507 508 @Override handleProtocolNegotiationCompleted( Attributes attrs, InternalChannelz.Security securityInfo)509 public void handleProtocolNegotiationCompleted( 510 Attributes attrs, InternalChannelz.Security securityInfo) { 511 negotiationAttributes = attrs; 512 this.securityInfo = securityInfo; 513 } 514 getSecurityInfo()515 InternalChannelz.Security getSecurityInfo() { 516 return securityInfo; 517 } 518 519 @VisibleForTesting getKeepAliveManagerForTest()520 KeepAliveManager getKeepAliveManagerForTest() { 521 return keepAliveManager; 522 } 523 524 @VisibleForTesting setKeepAliveManagerForTest(KeepAliveManager keepAliveManager)525 void setKeepAliveManagerForTest(KeepAliveManager keepAliveManager) { 526 this.keepAliveManager = keepAliveManager; 527 } 528 529 /** 530 * Handler for the Channel shutting down. 531 */ 532 @Override channelInactive(ChannelHandlerContext ctx)533 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 534 try { 535 if (keepAliveManager != null) { 536 keepAliveManager.onTransportTermination(); 537 } 538 if (maxConnectionIdleManager != null) { 539 maxConnectionIdleManager.onTransportTermination(); 540 } 541 if (maxConnectionAgeMonitor != null) { 542 maxConnectionAgeMonitor.cancel(false); 543 } 544 final Status status = 545 Status.UNAVAILABLE.withDescription("connection terminated for unknown reason"); 546 // Any streams that are still active must be closed 547 connection().forEachActiveStream(new Http2StreamVisitor() { 548 @Override 549 public boolean visit(Http2Stream stream) throws Http2Exception { 550 NettyServerStream.TransportState serverStream = serverStream(stream); 551 if (serverStream != null) { 552 serverStream.transportReportStatus(status); 553 } 554 return true; 555 } 556 }); 557 } finally { 558 super.channelInactive(ctx); 559 } 560 } 561 getWriteQueue()562 WriteQueue getWriteQueue() { 563 return serverWriteQueue; 564 } 565 566 /** 567 * Handler for commands sent from the stream. 568 */ 569 @Override write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)570 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 571 throws Exception { 572 if (msg instanceof SendGrpcFrameCommand) { 573 sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise); 574 } else if (msg instanceof SendResponseHeadersCommand) { 575 sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise); 576 } else if (msg instanceof CancelServerStreamCommand) { 577 cancelStream(ctx, (CancelServerStreamCommand) msg, promise); 578 } else if (msg instanceof ForcefulCloseCommand) { 579 forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); 580 } else { 581 AssertionError e = 582 new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); 583 ReferenceCountUtil.release(msg); 584 promise.setFailure(e); 585 throw e; 586 } 587 } 588 589 /** 590 * Returns the given processed bytes back to inbound flow control. 591 */ returnProcessedBytes(Http2Stream http2Stream, int bytes)592 void returnProcessedBytes(Http2Stream http2Stream, int bytes) { 593 try { 594 decoder().flowController().consumeBytes(http2Stream, bytes); 595 } catch (Http2Exception e) { 596 throw new RuntimeException(e); 597 } 598 } 599 closeStreamWhenDone(ChannelPromise promise, int streamId)600 private void closeStreamWhenDone(ChannelPromise promise, int streamId) throws Http2Exception { 601 final NettyServerStream.TransportState stream = serverStream(requireHttp2Stream(streamId)); 602 promise.addListener(new ChannelFutureListener() { 603 @Override 604 public void operationComplete(ChannelFuture future) { 605 stream.complete(); 606 } 607 }); 608 } 609 610 /** 611 * Sends the given gRPC frame to the client. 612 */ sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)613 private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, 614 ChannelPromise promise) throws Http2Exception { 615 if (cmd.endStream()) { 616 closeStreamWhenDone(promise, cmd.streamId()); 617 } 618 // Call the base class to write the HTTP/2 DATA frame. 619 encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise); 620 } 621 622 /** 623 * Sends the response headers to the client. 624 */ sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, ChannelPromise promise)625 private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersCommand cmd, 626 ChannelPromise promise) throws Http2Exception { 627 // TODO(carl-mastrangelo): remove this check once https://github.com/netty/netty/issues/6296 is 628 // fixed. 629 int streamId = cmd.stream().id(); 630 Http2Stream stream = connection().stream(streamId); 631 if (stream == null) { 632 resetStream(ctx, streamId, Http2Error.CANCEL.code(), promise); 633 return; 634 } 635 if (cmd.endOfStream()) { 636 closeStreamWhenDone(promise, streamId); 637 } 638 encoder().writeHeaders(ctx, streamId, cmd.headers(), 0, cmd.endOfStream(), promise); 639 } 640 cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, ChannelPromise promise)641 private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd, 642 ChannelPromise promise) { 643 // Notify the listener if we haven't already. 644 cmd.stream().transportReportStatus(cmd.reason()); 645 // Terminate the stream. 646 encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise); 647 } 648 forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise)649 private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, 650 ChannelPromise promise) throws Exception { 651 close(ctx, promise); 652 connection().forEachActiveStream(new Http2StreamVisitor() { 653 @Override 654 public boolean visit(Http2Stream stream) throws Http2Exception { 655 NettyServerStream.TransportState serverStream = serverStream(stream); 656 if (serverStream != null) { 657 serverStream.transportReportStatus(msg.getStatus()); 658 resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise()); 659 } 660 stream.close(); 661 return true; 662 } 663 }); 664 } 665 respondWithHttpError( ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg)666 private void respondWithHttpError( 667 ChannelHandlerContext ctx, int streamId, int code, Status.Code statusCode, String msg) { 668 Metadata metadata = new Metadata(); 669 metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus()); 670 metadata.put(InternalStatus.MESSAGE_KEY, msg); 671 byte[][] serialized = InternalMetadata.serialize(metadata); 672 673 Http2Headers headers = new DefaultHttp2Headers(true, serialized.length / 2) 674 .status("" + code) 675 .set(CONTENT_TYPE_HEADER, "text/plain; encoding=utf-8"); 676 for (int i = 0; i < serialized.length; i += 2) { 677 headers.add(new AsciiString(serialized[i], false), new AsciiString(serialized[i + 1], false)); 678 } 679 encoder().writeHeaders(ctx, streamId, headers, 0, false, ctx.newPromise()); 680 ByteBuf msgBuf = ByteBufUtil.writeUtf8(ctx.alloc(), msg); 681 encoder().writeData(ctx, streamId, msgBuf, 0, true, ctx.newPromise()); 682 } 683 requireHttp2Stream(int streamId)684 private Http2Stream requireHttp2Stream(int streamId) { 685 Http2Stream stream = connection().stream(streamId); 686 if (stream == null) { 687 // This should never happen. 688 throw new AssertionError("Stream does not exist: " + streamId); 689 } 690 return stream; 691 } 692 693 /** 694 * Returns the server stream associated to the given HTTP/2 stream object. 695 */ serverStream(Http2Stream stream)696 private NettyServerStream.TransportState serverStream(Http2Stream stream) { 697 return stream == null ? null : (NettyServerStream.TransportState) stream.getProperty(streamKey); 698 } 699 newStreamException(int streamId, Throwable cause)700 private Http2Exception newStreamException(int streamId, Throwable cause) { 701 return Http2Exception.streamError( 702 streamId, Http2Error.INTERNAL_ERROR, cause, Strings.nullToEmpty(cause.getMessage())); 703 } 704 705 private class FrameListener extends Http2FrameAdapter { 706 private boolean firstSettings = true; 707 708 @Override onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)709 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { 710 if (firstSettings) { 711 firstSettings = false; 712 // Delay transportReady until we see the client's HTTP handshake, for coverage with 713 // handshakeTimeout 714 attributes = transportListener.transportReady(negotiationAttributes); 715 } 716 } 717 718 @Override onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)719 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, 720 boolean endOfStream) throws Http2Exception { 721 if (keepAliveManager != null) { 722 keepAliveManager.onDataReceived(); 723 } 724 NettyServerHandler.this.onDataRead(streamId, data, padding, endOfStream); 725 return padding; 726 } 727 728 @Override onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)729 public void onHeadersRead(ChannelHandlerContext ctx, 730 int streamId, 731 Http2Headers headers, 732 int streamDependency, 733 short weight, 734 boolean exclusive, 735 int padding, 736 boolean endStream) throws Http2Exception { 737 if (keepAliveManager != null) { 738 keepAliveManager.onDataReceived(); 739 } 740 NettyServerHandler.this.onHeadersRead(ctx, streamId, headers); 741 } 742 743 @Override onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)744 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) 745 throws Http2Exception { 746 if (keepAliveManager != null) { 747 keepAliveManager.onDataReceived(); 748 } 749 NettyServerHandler.this.onRstStreamRead(streamId, errorCode); 750 } 751 752 @Override onPingRead(ChannelHandlerContext ctx, long data)753 public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception { 754 if (keepAliveManager != null) { 755 keepAliveManager.onDataReceived(); 756 } 757 if (!keepAliveEnforcer.pingAcceptable()) { 758 ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "too_many_pings"); 759 goAway(ctx, connection().remote().lastStreamCreated(), Http2Error.ENHANCE_YOUR_CALM.code(), 760 debugData, ctx.newPromise()); 761 Status status = Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"); 762 try { 763 forcefulClose(ctx, new ForcefulCloseCommand(status), ctx.newPromise()); 764 } catch (Exception ex) { 765 onError(ctx, /* outbound= */ true, ex); 766 } 767 } 768 } 769 770 @Override onPingAckRead(ChannelHandlerContext ctx, long data)771 public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception { 772 if (keepAliveManager != null) { 773 keepAliveManager.onDataReceived(); 774 } 775 if (data == flowControlPing().payload()) { 776 flowControlPing().updateWindow(); 777 if (logger.isLoggable(Level.FINE)) { 778 logger.log(Level.FINE, String.format("Window: %d", 779 decoder().flowController().initialWindowSize(connection().connectionStream()))); 780 } 781 } else if (data == GRACEFUL_SHUTDOWN_PING) { 782 if (gracefulShutdown == null) { 783 // this should never happen 784 logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null"); 785 } else { 786 gracefulShutdown.secondGoAwayAndClose(ctx); 787 } 788 } else if (data != KEEPALIVE_PING) { 789 logger.warning("Received unexpected ping ack. No ping outstanding"); 790 } 791 } 792 } 793 794 private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger { 795 final ChannelHandlerContext ctx; 796 KeepAlivePinger(ChannelHandlerContext ctx)797 KeepAlivePinger(ChannelHandlerContext ctx) { 798 this.ctx = ctx; 799 } 800 801 @Override ping()802 public void ping() { 803 ChannelFuture pingFuture = encoder().writePing( 804 ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise()); 805 ctx.flush(); 806 if (transportTracer != null) { 807 pingFuture.addListener(new ChannelFutureListener() { 808 @Override 809 public void operationComplete(ChannelFuture future) throws Exception { 810 if (future.isSuccess()) { 811 transportTracer.reportKeepAliveSent(); 812 } 813 } 814 }); 815 } 816 } 817 818 @Override onPingTimeout()819 public void onPingTimeout() { 820 try { 821 forcefulClose( 822 ctx, 823 new ForcefulCloseCommand(Status.UNAVAILABLE 824 .withDescription("Keepalive failed. The connection is likely gone")), 825 ctx.newPromise()); 826 } catch (Exception ex) { 827 try { 828 exceptionCaught(ctx, ex); 829 } catch (Exception ex2) { 830 logger.log(Level.WARNING, "Exception while propagating exception", ex2); 831 logger.log(Level.WARNING, "Original failure", ex); 832 } 833 } 834 } 835 } 836 837 private final class GracefulShutdown { 838 String goAwayMessage; 839 840 /** 841 * The grace time between starting graceful shutdown and closing the netty channel, 842 * {@code null} is unspecified. 843 */ 844 @CheckForNull 845 Long graceTimeInNanos; 846 847 /** 848 * True if ping is Acked or ping is timeout. 849 */ 850 boolean pingAckedOrTimeout; 851 852 Future<?> pingFuture; 853 GracefulShutdown(String goAwayMessage, @Nullable Long graceTimeInNanos)854 GracefulShutdown(String goAwayMessage, 855 @Nullable Long graceTimeInNanos) { 856 this.goAwayMessage = goAwayMessage; 857 this.graceTimeInNanos = graceTimeInNanos; 858 } 859 860 /** 861 * Sends out first GOAWAY and ping, and schedules second GOAWAY and close. 862 */ start(final ChannelHandlerContext ctx)863 void start(final ChannelHandlerContext ctx) { 864 goAway( 865 ctx, 866 Integer.MAX_VALUE, 867 Http2Error.NO_ERROR.code(), 868 ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), 869 ctx.newPromise()); 870 871 pingFuture = ctx.executor().schedule( 872 new Runnable() { 873 @Override 874 public void run() { 875 secondGoAwayAndClose(ctx); 876 } 877 }, 878 GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, 879 TimeUnit.NANOSECONDS); 880 881 encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise()); 882 } 883 secondGoAwayAndClose(ChannelHandlerContext ctx)884 void secondGoAwayAndClose(ChannelHandlerContext ctx) { 885 if (pingAckedOrTimeout) { 886 return; 887 } 888 pingAckedOrTimeout = true; 889 890 checkNotNull(pingFuture, "pingFuture"); 891 pingFuture.cancel(false); 892 893 // send the second GOAWAY with last stream id 894 goAway( 895 ctx, 896 connection().remote().lastStreamCreated(), 897 Http2Error.NO_ERROR.code(), 898 ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), 899 ctx.newPromise()); 900 901 // gracefully shutdown with specified grace time 902 long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); 903 long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis; 904 if (graceTimeInNanos != null) { 905 gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos); 906 } 907 try { 908 gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis); 909 close(ctx, ctx.newPromise()); 910 } catch (Exception e) { 911 onError(ctx, /* outbound= */ true, e); 912 } finally { 913 gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis); 914 } 915 } 916 } 917 918 // Use a frame writer so that we know when frames are through flow control and actually being 919 // written. 920 private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter { 921 private final KeepAliveEnforcer keepAliveEnforcer; 922 WriteMonitoringFrameWriter(Http2FrameWriter delegate, KeepAliveEnforcer keepAliveEnforcer)923 public WriteMonitoringFrameWriter(Http2FrameWriter delegate, 924 KeepAliveEnforcer keepAliveEnforcer) { 925 super(delegate); 926 this.keepAliveEnforcer = keepAliveEnforcer; 927 } 928 929 @Override writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)930 public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data, 931 int padding, boolean endStream, ChannelPromise promise) { 932 keepAliveEnforcer.resetCounters(); 933 return super.writeData(ctx, streamId, data, padding, endStream, promise); 934 } 935 936 @Override writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)937 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, 938 int padding, boolean endStream, ChannelPromise promise) { 939 keepAliveEnforcer.resetCounters(); 940 return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); 941 } 942 943 @Override writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)944 public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers, 945 int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, 946 ChannelPromise promise) { 947 keepAliveEnforcer.resetCounters(); 948 return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, 949 padding, endStream, promise); 950 } 951 } 952 } 953