1 /* 2 * Copyright 2014 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO; 20 import static io.netty.util.CharsetUtil.UTF_8; 21 import static io.netty.util.internal.ObjectUtil.checkNotNull; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.Preconditions; 25 import com.google.common.base.Stopwatch; 26 import com.google.common.base.Supplier; 27 import com.google.common.base.Ticker; 28 import io.grpc.Attributes; 29 import io.grpc.ChannelLogger; 30 import io.grpc.InternalChannelz; 31 import io.grpc.Metadata; 32 import io.grpc.Status; 33 import io.grpc.StatusException; 34 import io.grpc.internal.ClientStreamListener.RpcProgress; 35 import io.grpc.internal.ClientTransport.PingCallback; 36 import io.grpc.internal.GrpcAttributes; 37 import io.grpc.internal.GrpcUtil; 38 import io.grpc.internal.Http2Ping; 39 import io.grpc.internal.InUseStateAggregator; 40 import io.grpc.internal.KeepAliveManager; 41 import io.grpc.internal.TransportTracer; 42 import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder; 43 import io.netty.buffer.ByteBuf; 44 import io.netty.buffer.ByteBufUtil; 45 import io.netty.buffer.Unpooled; 46 import io.netty.channel.Channel; 47 import io.netty.channel.ChannelFuture; 48 import io.netty.channel.ChannelFutureListener; 49 import 
io.netty.channel.ChannelHandlerContext; 50 import io.netty.channel.ChannelPromise; 51 import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter; 52 import io.netty.handler.codec.http2.DefaultHttp2Connection; 53 import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder; 54 import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder; 55 import io.netty.handler.codec.http2.DefaultHttp2FrameReader; 56 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter; 57 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController; 58 import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController; 59 import io.netty.handler.codec.http2.Http2CodecUtil; 60 import io.netty.handler.codec.http2.Http2Connection; 61 import io.netty.handler.codec.http2.Http2ConnectionAdapter; 62 import io.netty.handler.codec.http2.Http2ConnectionDecoder; 63 import io.netty.handler.codec.http2.Http2ConnectionEncoder; 64 import io.netty.handler.codec.http2.Http2Error; 65 import io.netty.handler.codec.http2.Http2Exception; 66 import io.netty.handler.codec.http2.Http2FrameAdapter; 67 import io.netty.handler.codec.http2.Http2FrameLogger; 68 import io.netty.handler.codec.http2.Http2FrameReader; 69 import io.netty.handler.codec.http2.Http2FrameWriter; 70 import io.netty.handler.codec.http2.Http2Headers; 71 import io.netty.handler.codec.http2.Http2HeadersDecoder; 72 import io.netty.handler.codec.http2.Http2InboundFrameLogger; 73 import io.netty.handler.codec.http2.Http2OutboundFrameLogger; 74 import io.netty.handler.codec.http2.Http2Settings; 75 import io.netty.handler.codec.http2.Http2Stream; 76 import io.netty.handler.codec.http2.Http2StreamVisitor; 77 import io.netty.handler.codec.http2.StreamBufferingEncoder; 78 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor; 79 import io.netty.handler.logging.LogLevel; 80 import io.perfmark.PerfMark; 81 import io.perfmark.Tag; 82 import io.perfmark.TaskCloseable; 83 import java.nio.channels.ClosedChannelException; 84 
import java.util.concurrent.Executor; 85 import java.util.logging.Level; 86 import java.util.logging.Logger; 87 import javax.annotation.Nullable; 88 89 /** 90 * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within 91 * the context of the Netty Channel thread. 92 */ 93 class NettyClientHandler extends AbstractNettyHandler { 94 private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName()); 95 96 /** 97 * A message that simply passes through the channel without any real processing. It is useful to 98 * check if buffers have been drained and test the health of the channel in a single operation. 99 */ 100 static final Object NOOP_MESSAGE = new Object(); 101 102 /** 103 * Status used when the transport has exhausted the number of streams. 104 */ 105 private static final Status EXHAUSTED_STREAMS_STATUS = 106 Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted"); 107 private static final long USER_PING_PAYLOAD = 1111; 108 109 private final Http2Connection.PropertyKey streamKey; 110 private final ClientTransportLifecycleManager lifecycleManager; 111 private final KeepAliveManager keepAliveManager; 112 // Returns new unstarted stopwatches 113 private final Supplier<Stopwatch> stopwatchFactory; 114 private final TransportTracer transportTracer; 115 private final Attributes eagAttributes; 116 private final String authority; 117 private final InUseStateAggregator<Http2Stream> inUseState = 118 new InUseStateAggregator<Http2Stream>() { 119 @Override 120 protected void handleInUse() { 121 lifecycleManager.notifyInUse(true); 122 } 123 124 @Override 125 protected void handleNotInUse() { 126 lifecycleManager.notifyInUse(false); 127 } 128 }; 129 130 private WriteQueue clientWriteQueue; 131 private Http2Ping ping; 132 private Attributes attributes; 133 private InternalChannelz.Security securityInfo; 134 private Status abruptGoAwayStatus; 135 private Status channelInactiveReason; 136 newHandler( 
ClientTransportLifecycleManager lifecycleManager, @Nullable KeepAliveManager keepAliveManager, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, Ticker ticker)137 static NettyClientHandler newHandler( 138 ClientTransportLifecycleManager lifecycleManager, 139 @Nullable KeepAliveManager keepAliveManager, 140 boolean autoFlowControl, 141 int flowControlWindow, 142 int maxHeaderListSize, 143 Supplier<Stopwatch> stopwatchFactory, 144 Runnable tooManyPingsRunnable, 145 TransportTracer transportTracer, 146 Attributes eagAttributes, 147 String authority, 148 ChannelLogger negotiationLogger, 149 Ticker ticker) { 150 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); 151 Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize); 152 Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder); 153 Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter(); 154 Http2Connection connection = new DefaultHttp2Connection(false); 155 WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); 156 dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. 
157 DefaultHttp2RemoteFlowController controller = 158 new DefaultHttp2RemoteFlowController(connection, dist); 159 connection.remote().flowController(controller); 160 161 return newHandler( 162 connection, 163 frameReader, 164 frameWriter, 165 lifecycleManager, 166 keepAliveManager, 167 autoFlowControl, 168 flowControlWindow, 169 maxHeaderListSize, 170 stopwatchFactory, 171 tooManyPingsRunnable, 172 transportTracer, 173 eagAttributes, 174 authority, 175 negotiationLogger, 176 ticker); 177 } 178 179 @VisibleForTesting newHandler( final Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, Ticker ticker)180 static NettyClientHandler newHandler( 181 final Http2Connection connection, 182 Http2FrameReader frameReader, 183 Http2FrameWriter frameWriter, 184 ClientTransportLifecycleManager lifecycleManager, 185 KeepAliveManager keepAliveManager, 186 boolean autoFlowControl, 187 int flowControlWindow, 188 int maxHeaderListSize, 189 Supplier<Stopwatch> stopwatchFactory, 190 Runnable tooManyPingsRunnable, 191 TransportTracer transportTracer, 192 Attributes eagAttributes, 193 String authority, 194 ChannelLogger negotiationLogger, 195 Ticker ticker) { 196 Preconditions.checkNotNull(connection, "connection"); 197 Preconditions.checkNotNull(frameReader, "frameReader"); 198 Preconditions.checkNotNull(lifecycleManager, "lifecycleManager"); 199 Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive"); 200 Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive"); 201 Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory"); 202 
Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable"); 203 Preconditions.checkNotNull(eagAttributes, "eagAttributes"); 204 Preconditions.checkNotNull(authority, "authority"); 205 206 Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class); 207 frameReader = new Http2InboundFrameLogger(frameReader, frameLogger); 208 frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger); 209 210 PingCountingFrameWriter pingCounter; 211 frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter); 212 213 StreamBufferingEncoder encoder = 214 new StreamBufferingEncoder( 215 new DefaultHttp2ConnectionEncoder(connection, frameWriter)); 216 217 // Create the local flow controller configured to auto-refill the connection window. 218 connection.local().flowController( 219 new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); 220 221 Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, 222 frameReader); 223 224 transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection)); 225 226 Http2Settings settings = new Http2Settings(); 227 settings.pushEnabled(false); 228 settings.initialWindowSize(flowControlWindow); 229 settings.maxConcurrentStreams(0); 230 settings.maxHeaderListSize(maxHeaderListSize); 231 232 return new NettyClientHandler( 233 decoder, 234 encoder, 235 settings, 236 negotiationLogger, 237 lifecycleManager, 238 keepAliveManager, 239 stopwatchFactory, 240 tooManyPingsRunnable, 241 transportTracer, 242 eagAttributes, 243 authority, 244 autoFlowControl, 245 pingCounter, 246 ticker); 247 } 248 NettyClientHandler( Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, ChannelLogger negotiationLogger, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, Supplier<Stopwatch> stopwatchFactory, final Runnable tooManyPingsRunnable, TransportTracer transportTracer, 
Attributes eagAttributes, String authority, boolean autoFlowControl, PingLimiter pingLimiter, Ticker ticker)249 private NettyClientHandler( 250 Http2ConnectionDecoder decoder, 251 Http2ConnectionEncoder encoder, 252 Http2Settings settings, 253 ChannelLogger negotiationLogger, 254 ClientTransportLifecycleManager lifecycleManager, 255 KeepAliveManager keepAliveManager, 256 Supplier<Stopwatch> stopwatchFactory, 257 final Runnable tooManyPingsRunnable, 258 TransportTracer transportTracer, 259 Attributes eagAttributes, 260 String authority, 261 boolean autoFlowControl, 262 PingLimiter pingLimiter, 263 Ticker ticker) { 264 super(/* channelUnused= */ null, decoder, encoder, settings, 265 negotiationLogger, autoFlowControl, pingLimiter, ticker); 266 this.lifecycleManager = lifecycleManager; 267 this.keepAliveManager = keepAliveManager; 268 this.stopwatchFactory = stopwatchFactory; 269 this.transportTracer = Preconditions.checkNotNull(transportTracer); 270 this.eagAttributes = eagAttributes; 271 this.authority = authority; 272 this.attributes = Attributes.newBuilder() 273 .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build(); 274 275 // Set the frame listener on the decoder. 276 decoder().frameListener(new FrameListener()); 277 278 Http2Connection connection = encoder.connection(); 279 streamKey = connection.newKey(); 280 281 connection.addListener(new Http2ConnectionAdapter() { 282 @Override 283 public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) { 284 byte[] debugDataBytes = ByteBufUtil.getBytes(debugData); 285 goingAway(errorCode, debugDataBytes); 286 if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) { 287 String data = new String(debugDataBytes, UTF_8); 288 logger.log( 289 Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. 
Debug data: {0}", data); 290 if ("too_many_pings".equals(data)) { 291 tooManyPingsRunnable.run(); 292 } 293 } 294 } 295 296 @Override 297 public void onStreamActive(Http2Stream stream) { 298 if (connection().numActiveStreams() == 1 299 && NettyClientHandler.this.keepAliveManager != null) { 300 NettyClientHandler.this.keepAliveManager.onTransportActive(); 301 } 302 } 303 304 @Override 305 public void onStreamClosed(Http2Stream stream) { 306 // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in 307 // the first place, we don't propagate that option here, and it's safe to reset the in-use 308 // state for them, which will be a cheap no-op. 309 inUseState.updateObjectInUse(stream, false); 310 if (connection().numActiveStreams() == 0 311 && NettyClientHandler.this.keepAliveManager != null) { 312 NettyClientHandler.this.keepAliveManager.onTransportIdle(); 313 } 314 } 315 }); 316 } 317 318 /** 319 * The protocol negotiation attributes, available once the protocol negotiation completes; 320 * otherwise returns {@code Attributes.EMPTY}. 321 */ getAttributes()322 Attributes getAttributes() { 323 return attributes; 324 } 325 326 /** 327 * Handler for commands sent from the stream. 
328 */ 329 @Override write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)330 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 331 throws Exception { 332 if (msg instanceof CreateStreamCommand) { 333 createStream((CreateStreamCommand) msg, promise); 334 } else if (msg instanceof SendGrpcFrameCommand) { 335 sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise); 336 } else if (msg instanceof CancelClientStreamCommand) { 337 cancelStream(ctx, (CancelClientStreamCommand) msg, promise); 338 } else if (msg instanceof SendPingCommand) { 339 sendPingFrame(ctx, (SendPingCommand) msg, promise); 340 } else if (msg instanceof GracefulCloseCommand) { 341 gracefulClose(ctx, (GracefulCloseCommand) msg, promise); 342 } else if (msg instanceof ForcefulCloseCommand) { 343 forcefulClose(ctx, (ForcefulCloseCommand) msg, promise); 344 } else if (msg == NOOP_MESSAGE) { 345 ctx.write(Unpooled.EMPTY_BUFFER, promise); 346 } else { 347 throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName()); 348 } 349 } 350 startWriteQueue(Channel channel)351 void startWriteQueue(Channel channel) { 352 clientWriteQueue = new WriteQueue(channel); 353 } 354 getWriteQueue()355 WriteQueue getWriteQueue() { 356 return clientWriteQueue; 357 } 358 getLifecycleManager()359 ClientTransportLifecycleManager getLifecycleManager() { 360 return lifecycleManager; 361 } 362 363 /** 364 * Returns the given processed bytes back to inbound flow control. 
365 */ returnProcessedBytes(Http2Stream stream, int bytes)366 void returnProcessedBytes(Http2Stream stream, int bytes) { 367 try { 368 decoder().flowController().consumeBytes(stream, bytes); 369 } catch (Http2Exception e) { 370 throw new RuntimeException(e); 371 } 372 } 373 onHeadersRead(int streamId, Http2Headers headers, boolean endStream)374 private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) { 375 // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here: 376 if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) { 377 NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); 378 PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag()); 379 stream.transportHeadersReceived(headers, endStream); 380 } 381 382 if (keepAliveManager != null) { 383 keepAliveManager.onDataReceived(); 384 } 385 } 386 387 /** 388 * Handler for an inbound HTTP/2 DATA frame. 389 */ onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)390 private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) { 391 flowControlPing().onDataRead(data.readableBytes(), padding); 392 NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId)); 393 PerfMark.event("NettyClientHandler.onDataRead", stream.tag()); 394 stream.transportDataReceived(data, endOfStream); 395 if (keepAliveManager != null) { 396 keepAliveManager.onDataReceived(); 397 } 398 } 399 400 /** 401 * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream. 
402 */ onRstStreamRead(int streamId, long errorCode)403 private void onRstStreamRead(int streamId, long errorCode) { 404 NettyClientStream.TransportState stream = clientStream(connection().stream(streamId)); 405 if (stream != null) { 406 PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag()); 407 Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null); 408 stream.transportReportStatus( 409 status, 410 errorCode == Http2Error.REFUSED_STREAM.code() 411 ? RpcProgress.REFUSED : RpcProgress.PROCESSED, 412 false /*stop delivery*/, 413 new Metadata()); 414 if (keepAliveManager != null) { 415 keepAliveManager.onDataReceived(); 416 } 417 } 418 } 419 420 @Override close(ChannelHandlerContext ctx, ChannelPromise promise)421 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 422 logger.fine("Network channel being closed by the application."); 423 if (ctx.channel().isActive()) { // Ignore notification that the socket was closed 424 lifecycleManager.notifyShutdown( 425 Status.UNAVAILABLE.withDescription("Transport closed for unknown reason")); 426 } 427 super.close(ctx, promise); 428 } 429 430 /** 431 * Handler for the Channel shutting down. 
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  try {
    logger.fine("Network channel is closed");
    Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
    lifecycleManager.notifyShutdown(status);
    // Streams get the GOAWAY-derived reason when one was recorded, otherwise whatever shutdown
    // status the lifecycle manager holds.
    final Status streamStatus;
    if (channelInactiveReason != null) {
      streamStatus = channelInactiveReason;
    } else {
      streamStatus = lifecycleManager.getShutdownStatus();
    }
    try {
      cancelPing(lifecycleManager.getShutdownThrowable());
      // Report status to the application layer for any open streams
      connection().forEachActiveStream(new Http2StreamVisitor() {
        @Override
        public boolean visit(Http2Stream stream) throws Http2Exception {
          NettyClientStream.TransportState clientStream = clientStream(stream);
          if (clientStream != null) {
            clientStream.transportReportStatus(streamStatus, false, new Metadata());
          }
          // Returning true keeps the iteration going over the remaining streams.
          return true;
        }
      });
    } finally {
      // Termination is reported even if cancelling the ping or visiting the streams throws.
      lifecycleManager.notifyTerminated(status);
    }
  } finally {
    // Close any open streams
    super.channelInactive(ctx);
    if (keepAliveManager != null) {
      keepAliveManager.onTransportTermination();
    }
  }
}

/**
 * Records the outcome of protocol negotiation: merges the negotiated attributes into the
 * existing ones, stores the security info, delegates to the superclass and finally releases the
 * writes held by the {@code WriteBufferingAndExceptionHandler}.
 *
 * @param attributes attributes produced by the negotiation; added on top of the current ones
 * @param securityInfo security details later returned by {@link #getSecurityInfo()}
 */
@Override
public void handleProtocolNegotiationCompleted(
    Attributes attributes, InternalChannelz.Security securityInfo) {
  this.attributes = this.attributes.toBuilder().setAll(attributes).build();
  this.securityInfo = securityInfo;
  super.handleProtocolNegotiationCompleted(attributes, securityInfo);
  writeBufferingAndRemove(ctx().channel());
}

/**
 * Looks up the {@code WriteBufferingAndExceptionHandler} in the channel's pipeline and, when
 * present, asks it to write its buffered messages and remove itself. Does nothing when the
 * pipeline holds no such handler.
 *
 * @param channel the channel whose pipeline is inspected; must not be {@code null}
 */
static void writeBufferingAndRemove(Channel channel) {
  checkNotNull(channel, "channel");
  ChannelHandlerContext handlerCtx =
      channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
  if (handlerCtx == null) {
    return;
  }
  ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
}

/**
 * Returns the EAG attributes supplied when the handler was created.
 */
@Override
public Attributes getEagAttributes() {
  return eagAttributes;
}

/**
 * Returns the authority supplied when the handler was created.
 */
@Override
public String getAuthority() {
  return authority;
}

/**
 * Returns the security info recorded by {@link #handleProtocolNegotiationCompleted}; the field
 * is unset until that callback has run.
 */
InternalChannelz.Security getSecurityInfo() {
  return securityInfo;
}

/**
 * Logs the connection error, notifies the lifecycle manager of the shutdown with a status
 * derived from {@code cause}, then lets the superclass handle the error.
 */
@Override
protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
    Http2Exception http2Ex) {
  logger.log(Level.FINE, "Caught a connection error", cause);
  lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
  // Parent class will shut down the Channel
  super.onConnectionError(ctx, outbound, cause, http2Ex);
}

/**
 * Reports a stream-level error to the affected client stream (when one is attached to the
 * stream id carried by {@code http2Ex}) and then delegates to the superclass.
 */
@Override
protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
    Http2Exception.StreamException http2Ex) {
  // Close the stream with a status that contains the cause.
  NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
  if (stream != null) {
    stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
  } else {
    logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
  }

  // Delegate to the base class to send a RST_STREAM.
  super.onStreamError(ctx, outbound, cause, http2Ex);
}

/**
 * Reports graceful shutdown as complete only when the superclass agrees and the
 * stream-buffering encoder holds no buffered streams.
 */
@Override
protected boolean isGracefulShutdownComplete() {
  // Only allow graceful shutdown to complete after all pending streams have completed.
  return super.isGracefulShutdownComplete()
      && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
}

/**
 * Attempts to create a new stream from the given command. If there are too many active streams,
 * the creation request is queued.
 *
 * <p>The stream is failed immediately, and marked non-existent, when the transport is already
 * shutting down, when stream ids are exhausted, or when a received GOAWAY rules the new stream
 * out (its id lies beyond the last stream known by the peer, or the active-stream limit has
 * been reached).
 */
private void createStream(CreateStreamCommand command, ChannelPromise promise)
    throws Exception {
  if (lifecycleManager.getShutdownThrowable() != null) {
    command.stream().setNonExistent();
    // The connection is going away (it is really the GOAWAY case),
    // just terminate the stream now.
    command.stream().transportReportStatus(
        lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
    promise.setFailure(lifecycleManager.getShutdownThrowable());
    return;
  }

  // Get the stream ID for the new stream.
  int streamId;
  try {
    streamId = incrementAndGetNextStreamId();
  } catch (StatusException e) {
    command.stream().setNonExistent();
    // Stream IDs have been exhausted for this connection. Fail the promise immediately.
    promise.setFailure(e);

    // Initiate a graceful shutdown if we haven't already.
    if (!connection().goAwaySent()) {
      logger.fine("Stream IDs have been exhausted for this connection. "
          + "Initiating graceful shutdown of the connection.");
      lifecycleManager.notifyShutdown(e.getStatus());
      close(ctx(), ctx().newPromise());
    }
    return;
  }
  if (connection().goAwayReceived()) {
    Status s = abruptGoAwayStatus;
    int maxActiveStreams = connection().local().maxActiveStreams();
    int lastStreamId = connection().local().lastStreamKnownByPeer();
    // First pick the most specific description for the failure status...
    if (s == null) {
      // Should be impossible, but handle pseudo-gracefully
      s = Status.INTERNAL.withDescription(
          "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
    } else if (streamId > lastStreamId) {
      s = s.augmentDescription(
          "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
    } else if (connection().local().numActiveStreams() == maxActiveStreams) {
      s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
    }
    // ...then decide whether the stream actually has to be failed.
    if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
      // This should only be reachable during onGoAwayReceived, as otherwise
      // getShutdownThrowable() != null
      command.stream().setNonExistent();
      command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
      promise.setFailure(s.asRuntimeException());
      return;
    }
  }

  NettyClientStream.TransportState stream = command.stream();
  Http2Headers headers = command.headers();
  stream.setId(streamId);

  try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
    PerfMark.linkIn(command.getLink());
    PerfMark.attachTag(stream.tag());
    createStreamTraced(
        streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
  }
}

/**
 * Writes the HEADERS frame for a new stream and completes {@code promise} once the write
 * finishes.
 *
 * <p>On success the client stream is attached to the HTTP/2 stream, if that stream still
 * exists. On failure caused by a GOAWAY or by the channel closing while the stream was
 * buffered, the stream is told of the failure before the promise is failed.
 *
 * @param streamId id assigned to the new stream
 * @param stream client-side transport state for the stream
 * @param headers request headers to send
 * @param isGet passed to the encoder as the end-of-stream flag of the HEADERS frame
 * @param shouldBeCountedForInUse whether the stream should mark the transport as in-use
 * @param promise completed with the outcome of the header write
 */
private void createStreamTraced(
    final int streamId,
    final NettyClientStream.TransportState stream,
    final Http2Headers headers,
    boolean isGet,
    final boolean shouldBeCountedForInUse,
    final ChannelPromise promise) {
  // Create an intermediate promise so that we can intercept the failure reported back to the
  // application.
  ChannelPromise tempPromise = ctx().newPromise();
  encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
      .addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
          if (future.isSuccess()) {
            // The http2Stream will be null in case a stream buffered in the encoder
            // was canceled via RST_STREAM.
            Http2Stream http2Stream = connection().stream(streamId);
            if (http2Stream != null) {
              stream.getStatsTraceContext().clientOutboundHeaders();
              http2Stream.setProperty(streamKey, stream);

              // This delays the in-use state until the I/O completes, which technically may
              // be later than we would like.
              if (shouldBeCountedForInUse) {
                inUseState.updateObjectInUse(http2Stream, true);
              }

              // Attach the client stream to the HTTP/2 stream object as user data.
              stream.setHttp2Stream(http2Stream);
            }
            // Otherwise, the stream has been cancelled and Netty is sending a
            // RST_STREAM frame which causes it to purge pending writes from the
            // flow-controller and delete the http2Stream. The stream listener has already
            // been notified of cancellation so there is nothing to do.

            // Just forward on the success status to the original promise.
            promise.setSuccess();
          } else {
            Throwable cause = future.cause();
            if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
              // Replace the Netty exception with a gRPC status exception built from the GOAWAY.
              StreamBufferingEncoder.Http2GoAwayException e =
                  (StreamBufferingEncoder.Http2GoAwayException) cause;
              Status status = statusFromH2Error(
                  Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
                  e.errorCode(), e.debugData());
              cause = status.asRuntimeException();
              stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
            } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
              Status status = lifecycleManager.getShutdownStatus();
              if (status == null) {
                status = Status.UNAVAILABLE.withCause(cause)
                    .withDescription("Connection closed while stream is buffered");
              }
              stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
            }
            // Any other failure is forwarded unchanged.
            promise.setFailure(cause);
          }
        }
      });
}

/**
 * Cancels this stream.
 *
 * <p>Reports the command's reason to the stream when one is given. A RST_STREAM with
 * {@code CANCEL} is written unless the stream is marked non-existent, in which case the promise
 * is simply completed.
 */
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
    ChannelPromise promise) {
  NettyClientStream.TransportState stream = cmd.stream();
  try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
    PerfMark.attachTag(stream.tag());
    PerfMark.linkIn(cmd.getLink());
    Status reason = cmd.reason();
    if (reason != null) {
      stream.transportReportStatus(reason, true, new Metadata());
    }
    if (!cmd.stream().isNonExistent()) {
      encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
    } else {
      promise.setSuccess();
    }
  }
}

/**
 * Sends the given GRPC frame for the stream.
*/
private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
    ChannelPromise promise) {
  try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
    PerfMark.attachTag(cmd.stream().tag());
    PerfMark.linkIn(cmd.getLink());
    // Call the base class to write the HTTP/2 DATA frame.
    // Note: no need to flush since this is handled by the outbound flow controller.
    encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
  }
}

/**
 * Runs {@link #sendPingFrameTraced} inside a PerfMark task linked to the command.
 */
private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
    ChannelPromise promise) {
  try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
    PerfMark.linkIn(msg.getLink());
    sendPingFrameTraced(ctx, msg, promise);
  }
}

/**
 * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
 * registered to be called when the existing operation completes, and no new frame is sent.
 *
 * <p>The supplied {@code promise} is always completed successfully right away; the outcome of
 * the actual write is tracked on a separate promise and reported through the ping callbacks.
 */
private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
    ChannelPromise promise) {
  // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
  // but before termination. After termination, messages will no longer arrive because the
  // pipeline clears all handlers on channel close.

  PingCallback callback = msg.callback();
  Executor executor = msg.executor();
  // we only allow one outstanding ping at a time, so just add the callback to
  // any outstanding operation
  if (ping != null) {
    promise.setSuccess();
    ping.addCallback(callback, executor);
    return;
  }

  // Use a new promise to prevent calling the callback twice on write failure: here and in
  // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
  // ping != null above.
  promise.setSuccess();
  promise = ctx().newPromise();
  // set outstanding operation
  long data = USER_PING_PAYLOAD;
  Stopwatch stopwatch = stopwatchFactory.get();
  stopwatch.start();
  ping = new Http2Ping(data, stopwatch);
  ping.addCallback(callback, executor);
  // and then write the ping
  encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
  ctx.flush();
  // Capture the ping created above so the listener can tell whether it is still the current one.
  final Http2Ping finalPing = ping;
  promise.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (future.isSuccess()) {
        transportTracer.reportKeepAliveSent();
      } else {
        Throwable cause = future.cause();
        if (cause instanceof ClosedChannelException) {
          // Prefer the recorded shutdown reason over the bare closed-channel exception.
          cause = lifecycleManager.getShutdownThrowable();
          if (cause == null) {
            cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
                .withCause(future.cause()).asException();
          }
        }
        finalPing.failed(cause);
        // Only clear the outstanding ping if it has not been replaced in the meantime.
        if (ping == finalPing) {
          ping = null;
        }
      }
    }
  });
}

/**
 * Starts a graceful close: notifies the lifecycle manager of the shutdown with the command's
 * status, flushes, and then closes through {@link #close}.
 */
private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  lifecycleManager.notifyShutdown(msg.getStatus());
  // Explicitly flush to create any buffered streams before sending GOAWAY.
  // TODO(ejona): determine if the need to flush is a bug in Netty
  flush(ctx);
  close(ctx, promise);
}

/**
 * Closes the transport immediately. Every active stream with an attached client stream is told
 * the command's status and reset with {@code CANCEL}; each active HTTP/2 stream is closed, and
 * finally the channel is closed through {@link #close}.
 */
private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
    ChannelPromise promise) throws Exception {
  connection().forEachActiveStream(new Http2StreamVisitor() {
    @Override
    public boolean visit(Http2Stream stream) throws Http2Exception {
      NettyClientStream.TransportState clientStream = clientStream(stream);
      // Streams without an attached client stream are traced under a freshly created tag.
      Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
      try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
        PerfMark.linkIn(msg.getLink());
        PerfMark.attachTag(tag);
        if (clientStream != null) {
          clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
          resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
        }
        stream.close();
        return true;
      }
    }
  });
  close(ctx, promise);
}

/**
 * Handler for a GOAWAY being received. Fails any streams created after the
 * last known stream. May only be called during a read.
 */
private void goingAway(long errorCode, byte[] debugData) {
  Status finalStatus = statusFromH2Error(
      Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
  lifecycleManager.notifyGracefulShutdown(finalStatus);
  abruptGoAwayStatus = statusFromH2Error(
      Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
  // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
  // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
  // we assume any sent streams may be related to the GOAWAY.
This should rarely impact users 811 // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if 812 // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to 813 // UNAVAILABLE. https://github.com/netty/netty/issues/10670 814 final Status abruptGoAwayStatusConservative = statusFromH2Error( 815 null, "Abrupt GOAWAY closed sent stream", errorCode, debugData); 816 final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code(); 817 // Try to allocate as many in-flight streams as possible, to reduce race window of 818 // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to 819 // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING 820 // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been 821 // processed and thus this processing must be in-line before processing additional reads. 822 823 // This can cause reentrancy, but should be minor since it is normal to handle writes in 824 // response to a read. Also, the call stack is rather shallow at this point 825 clientWriteQueue.drainNow(); 826 if (lifecycleManager.notifyShutdown(finalStatus)) { 827 // This is for the only RPCs that are actually covered by the GOAWAY error code. All other 828 // RPCs were not observed by the remote and so should be UNAVAILABLE. 829 channelInactiveReason = statusFromH2Error( 830 null, "Connection closed after GOAWAY", errorCode, debugData); 831 } 832 833 final int lastKnownStream = connection().local().lastStreamKnownByPeer(); 834 try { 835 connection().forEachActiveStream(new Http2StreamVisitor() { 836 @Override 837 public boolean visit(Http2Stream stream) throws Http2Exception { 838 if (stream.id() > lastKnownStream) { 839 NettyClientStream.TransportState clientStream = clientStream(stream); 840 if (clientStream != null) { 841 // RpcProgress _should_ be REFUSED, but are being conservative. 
See comment for 842 // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent 843 // retries, but only if something else caused a connection failure. 844 RpcProgress progress = mayBeHittingNettyBug 845 ? RpcProgress.PROCESSED 846 : RpcProgress.REFUSED; 847 clientStream.transportReportStatus( 848 abruptGoAwayStatusConservative, progress, false, new Metadata()); 849 } 850 stream.close(); 851 } 852 return true; 853 } 854 }); 855 } catch (Http2Exception e) { 856 throw new RuntimeException(e); 857 } 858 } 859 cancelPing(Throwable t)860 private void cancelPing(Throwable t) { 861 if (ping != null) { 862 ping.failed(t); 863 ping = null; 864 } 865 } 866 867 /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */ statusFromH2Error( Status.Code statusCode, String context, long errorCode, byte[] debugData)868 private Status statusFromH2Error( 869 Status.Code statusCode, String context, long errorCode, byte[] debugData) { 870 Status status = GrpcUtil.Http2Error.statusForCode(errorCode); 871 if (statusCode == null) { 872 statusCode = status.getCode(); 873 } 874 String debugString = ""; 875 if (debugData != null && debugData.length > 0) { 876 // If a debug message was provided, use it. 877 debugString = ", debug data: " + new String(debugData, UTF_8); 878 } 879 return statusCode.toStatus() 880 .withDescription(context + ". " + status.getDescription() + debugString); 881 } 882 883 /** 884 * Gets the client stream associated to the given HTTP/2 stream object. 885 */ clientStream(Http2Stream stream)886 private NettyClientStream.TransportState clientStream(Http2Stream stream) { 887 return stream == null ? 
null : (NettyClientStream.TransportState) stream.getProperty(streamKey); 888 } 889 incrementAndGetNextStreamId()890 private int incrementAndGetNextStreamId() throws StatusException { 891 int nextStreamId = connection().local().incrementAndGetNextStreamId(); 892 if (nextStreamId < 0) { 893 logger.fine("Stream IDs have been exhausted for this connection. " 894 + "Initiating graceful shutdown of the connection."); 895 throw EXHAUSTED_STREAMS_STATUS.asException(); 896 } 897 return nextStreamId; 898 } 899 requireHttp2Stream(int streamId)900 private Http2Stream requireHttp2Stream(int streamId) { 901 Http2Stream stream = connection().stream(streamId); 902 if (stream == null) { 903 // This should never happen. 904 throw new AssertionError("Stream does not exist: " + streamId); 905 } 906 return stream; 907 } 908 909 private class FrameListener extends Http2FrameAdapter { 910 private boolean firstSettings = true; 911 912 @Override onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)913 public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { 914 if (firstSettings) { 915 firstSettings = false; 916 lifecycleManager.notifyReady(); 917 } 918 } 919 920 @Override onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)921 public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, 922 boolean endOfStream) throws Http2Exception { 923 NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream); 924 return padding; 925 } 926 927 @Override onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)928 public void onHeadersRead(ChannelHandlerContext ctx, 929 int streamId, 930 Http2Headers headers, 931 int streamDependency, 932 short weight, 933 boolean exclusive, 934 int padding, 935 boolean endStream) throws Http2Exception { 936 
NettyClientHandler.this.onHeadersRead(streamId, headers, endStream); 937 } 938 939 @Override onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)940 public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) 941 throws Http2Exception { 942 NettyClientHandler.this.onRstStreamRead(streamId, errorCode); 943 } 944 945 @Override onPingAckRead(ChannelHandlerContext ctx, long ackPayload)946 public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception { 947 Http2Ping p = ping; 948 if (ackPayload == flowControlPing().payload()) { 949 flowControlPing().updateWindow(); 950 logger.log(Level.FINE, "Window: {0}", 951 decoder().flowController().initialWindowSize(connection().connectionStream())); 952 } else if (p != null) { 953 if (p.payload() == ackPayload) { 954 p.complete(); 955 ping = null; 956 } else { 957 logger.log(Level.WARNING, 958 "Received unexpected ping ack. Expecting {0}, got {1}", 959 new Object[] {p.payload(), ackPayload}); 960 } 961 } else { 962 logger.warning("Received unexpected ping ack. No ping outstanding"); 963 } 964 if (keepAliveManager != null) { 965 keepAliveManager.onDataReceived(); 966 } 967 } 968 969 @Override onPingRead(ChannelHandlerContext ctx, long data)970 public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception { 971 if (keepAliveManager != null) { 972 keepAliveManager.onDataReceived(); 973 } 974 } 975 } 976 977 private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter 978 implements AbstractNettyHandler.PingLimiter { 979 private int pingCount; 980 PingCountingFrameWriter(Http2FrameWriter delegate)981 public PingCountingFrameWriter(Http2FrameWriter delegate) { 982 super(delegate); 983 } 984 985 @Override isPingAllowed()986 public boolean isPingAllowed() { 987 // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below. 
988 return pingCount < 2; 989 } 990 991 @Override writeHeaders( ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)992 public ChannelFuture writeHeaders( 993 ChannelHandlerContext ctx, int streamId, Http2Headers headers, 994 int padding, boolean endStream, ChannelPromise promise) { 995 pingCount = 0; 996 return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise); 997 } 998 999 @Override writeHeaders( ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)1000 public ChannelFuture writeHeaders( 1001 ChannelHandlerContext ctx, int streamId, Http2Headers headers, 1002 int streamDependency, short weight, boolean exclusive, 1003 int padding, boolean endStream, ChannelPromise promise) { 1004 pingCount = 0; 1005 return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive, 1006 padding, endStream, promise); 1007 } 1008 1009 @Override writeWindowUpdate( ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise)1010 public ChannelFuture writeWindowUpdate( 1011 ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) { 1012 pingCount = 0; 1013 return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise); 1014 } 1015 1016 @Override writePing( ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise)1017 public ChannelFuture writePing( 1018 ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) { 1019 if (!ack) { 1020 pingCount++; 1021 } 1022 return super.writePing(ctx, ack, data, promise); 1023 } 1024 1025 @Override writeData( ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)1026 public ChannelFuture writeData( 1027 ChannelHandlerContext ctx, int streamId, ByteBuf data, int 
padding, boolean endStream, 1028 ChannelPromise promise) { 1029 if (data.isReadable()) { 1030 pingCount = 0; 1031 } 1032 return super.writeData(ctx, streamId, data, padding, endStream, promise); 1033 } 1034 } 1035 } 1036