• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static io.netty.handler.codec.http2.DefaultHttp2LocalFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
20 import static io.netty.util.CharsetUtil.UTF_8;
21 import static io.netty.util.internal.ObjectUtil.checkNotNull;
22 
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Stopwatch;
26 import com.google.common.base.Supplier;
27 import com.google.common.base.Ticker;
28 import io.grpc.Attributes;
29 import io.grpc.ChannelLogger;
30 import io.grpc.InternalChannelz;
31 import io.grpc.Metadata;
32 import io.grpc.Status;
33 import io.grpc.StatusException;
34 import io.grpc.internal.ClientStreamListener.RpcProgress;
35 import io.grpc.internal.ClientTransport.PingCallback;
36 import io.grpc.internal.GrpcAttributes;
37 import io.grpc.internal.GrpcUtil;
38 import io.grpc.internal.Http2Ping;
39 import io.grpc.internal.InUseStateAggregator;
40 import io.grpc.internal.KeepAliveManager;
41 import io.grpc.internal.TransportTracer;
42 import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ClientHeadersDecoder;
43 import io.netty.buffer.ByteBuf;
44 import io.netty.buffer.ByteBufUtil;
45 import io.netty.buffer.Unpooled;
46 import io.netty.channel.Channel;
47 import io.netty.channel.ChannelFuture;
48 import io.netty.channel.ChannelFutureListener;
49 import io.netty.channel.ChannelHandlerContext;
50 import io.netty.channel.ChannelPromise;
51 import io.netty.handler.codec.http2.DecoratingHttp2FrameWriter;
52 import io.netty.handler.codec.http2.DefaultHttp2Connection;
53 import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
54 import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
55 import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
56 import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
57 import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
58 import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
59 import io.netty.handler.codec.http2.Http2CodecUtil;
60 import io.netty.handler.codec.http2.Http2Connection;
61 import io.netty.handler.codec.http2.Http2ConnectionAdapter;
62 import io.netty.handler.codec.http2.Http2ConnectionDecoder;
63 import io.netty.handler.codec.http2.Http2ConnectionEncoder;
64 import io.netty.handler.codec.http2.Http2Error;
65 import io.netty.handler.codec.http2.Http2Exception;
66 import io.netty.handler.codec.http2.Http2FrameAdapter;
67 import io.netty.handler.codec.http2.Http2FrameLogger;
68 import io.netty.handler.codec.http2.Http2FrameReader;
69 import io.netty.handler.codec.http2.Http2FrameWriter;
70 import io.netty.handler.codec.http2.Http2Headers;
71 import io.netty.handler.codec.http2.Http2HeadersDecoder;
72 import io.netty.handler.codec.http2.Http2InboundFrameLogger;
73 import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
74 import io.netty.handler.codec.http2.Http2Settings;
75 import io.netty.handler.codec.http2.Http2Stream;
76 import io.netty.handler.codec.http2.Http2StreamVisitor;
77 import io.netty.handler.codec.http2.StreamBufferingEncoder;
78 import io.netty.handler.codec.http2.WeightedFairQueueByteDistributor;
79 import io.netty.handler.logging.LogLevel;
80 import io.perfmark.PerfMark;
81 import io.perfmark.Tag;
82 import io.perfmark.TaskCloseable;
83 import java.nio.channels.ClosedChannelException;
84 import java.util.concurrent.Executor;
85 import java.util.logging.Level;
86 import java.util.logging.Logger;
87 import javax.annotation.Nullable;
88 
89 /**
90  * Client-side Netty handler for GRPC processing. All event handlers are executed entirely within
91  * the context of the Netty Channel thread.
92  */
93 class NettyClientHandler extends AbstractNettyHandler {
94   private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
95 
96   /**
97    * A message that simply passes through the channel without any real processing. It is useful to
98    * check if buffers have been drained and test the health of the channel in a single operation.
99    */
100   static final Object NOOP_MESSAGE = new Object();
101 
102   /**
103    * Status used when the transport has exhausted the number of streams.
104    */
105   private static final Status EXHAUSTED_STREAMS_STATUS =
106           Status.UNAVAILABLE.withDescription("Stream IDs have been exhausted");
107   private static final long USER_PING_PAYLOAD = 1111;
108 
109   private final Http2Connection.PropertyKey streamKey;
110   private final ClientTransportLifecycleManager lifecycleManager;
111   private final KeepAliveManager keepAliveManager;
112   // Returns new unstarted stopwatches
113   private final Supplier<Stopwatch> stopwatchFactory;
114   private final TransportTracer transportTracer;
115   private final Attributes eagAttributes;
116   private final String authority;
117   private final InUseStateAggregator<Http2Stream> inUseState =
118       new InUseStateAggregator<Http2Stream>() {
119         @Override
120         protected void handleInUse() {
121           lifecycleManager.notifyInUse(true);
122         }
123 
124         @Override
125         protected void handleNotInUse() {
126           lifecycleManager.notifyInUse(false);
127         }
128       };
129 
130   private WriteQueue clientWriteQueue;
131   private Http2Ping ping;
132   private Attributes attributes;
133   private InternalChannelz.Security securityInfo;
134   private Status abruptGoAwayStatus;
135   private Status channelInactiveReason;
136 
newHandler( ClientTransportLifecycleManager lifecycleManager, @Nullable KeepAliveManager keepAliveManager, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, Ticker ticker)137   static NettyClientHandler newHandler(
138       ClientTransportLifecycleManager lifecycleManager,
139       @Nullable KeepAliveManager keepAliveManager,
140       boolean autoFlowControl,
141       int flowControlWindow,
142       int maxHeaderListSize,
143       Supplier<Stopwatch> stopwatchFactory,
144       Runnable tooManyPingsRunnable,
145       TransportTracer transportTracer,
146       Attributes eagAttributes,
147       String authority,
148       ChannelLogger negotiationLogger,
149       Ticker ticker) {
150     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
151     Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
152     Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
153     Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
154     Http2Connection connection = new DefaultHttp2Connection(false);
155     WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);
156     dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.
157     DefaultHttp2RemoteFlowController controller =
158         new DefaultHttp2RemoteFlowController(connection, dist);
159     connection.remote().flowController(controller);
160 
161     return newHandler(
162         connection,
163         frameReader,
164         frameWriter,
165         lifecycleManager,
166         keepAliveManager,
167         autoFlowControl,
168         flowControlWindow,
169         maxHeaderListSize,
170         stopwatchFactory,
171         tooManyPingsRunnable,
172         transportTracer,
173         eagAttributes,
174         authority,
175         negotiationLogger,
176         ticker);
177   }
178 
179   @VisibleForTesting
newHandler( final Http2Connection connection, Http2FrameReader frameReader, Http2FrameWriter frameWriter, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, boolean autoFlowControl, int flowControlWindow, int maxHeaderListSize, Supplier<Stopwatch> stopwatchFactory, Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, String authority, ChannelLogger negotiationLogger, Ticker ticker)180   static NettyClientHandler newHandler(
181       final Http2Connection connection,
182       Http2FrameReader frameReader,
183       Http2FrameWriter frameWriter,
184       ClientTransportLifecycleManager lifecycleManager,
185       KeepAliveManager keepAliveManager,
186       boolean autoFlowControl,
187       int flowControlWindow,
188       int maxHeaderListSize,
189       Supplier<Stopwatch> stopwatchFactory,
190       Runnable tooManyPingsRunnable,
191       TransportTracer transportTracer,
192       Attributes eagAttributes,
193       String authority,
194       ChannelLogger negotiationLogger,
195       Ticker ticker) {
196     Preconditions.checkNotNull(connection, "connection");
197     Preconditions.checkNotNull(frameReader, "frameReader");
198     Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
199     Preconditions.checkArgument(flowControlWindow > 0, "flowControlWindow must be positive");
200     Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
201     Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
202     Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
203     Preconditions.checkNotNull(eagAttributes, "eagAttributes");
204     Preconditions.checkNotNull(authority, "authority");
205 
206     Http2FrameLogger frameLogger = new Http2FrameLogger(LogLevel.DEBUG, NettyClientHandler.class);
207     frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
208     frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
209 
210     PingCountingFrameWriter pingCounter;
211     frameWriter = pingCounter = new PingCountingFrameWriter(frameWriter);
212 
213     StreamBufferingEncoder encoder =
214         new StreamBufferingEncoder(
215             new DefaultHttp2ConnectionEncoder(connection, frameWriter));
216 
217     // Create the local flow controller configured to auto-refill the connection window.
218     connection.local().flowController(
219         new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));
220 
221     Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,
222         frameReader);
223 
224     transportTracer.setFlowControlWindowReader(new Utils.FlowControlReader(connection));
225 
226     Http2Settings settings = new Http2Settings();
227     settings.pushEnabled(false);
228     settings.initialWindowSize(flowControlWindow);
229     settings.maxConcurrentStreams(0);
230     settings.maxHeaderListSize(maxHeaderListSize);
231 
232     return new NettyClientHandler(
233         decoder,
234         encoder,
235         settings,
236         negotiationLogger,
237         lifecycleManager,
238         keepAliveManager,
239         stopwatchFactory,
240         tooManyPingsRunnable,
241         transportTracer,
242         eagAttributes,
243         authority,
244         autoFlowControl,
245         pingCounter,
246         ticker);
247   }
248 
NettyClientHandler( Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings settings, ChannelLogger negotiationLogger, ClientTransportLifecycleManager lifecycleManager, KeepAliveManager keepAliveManager, Supplier<Stopwatch> stopwatchFactory, final Runnable tooManyPingsRunnable, TransportTracer transportTracer, Attributes eagAttributes, String authority, boolean autoFlowControl, PingLimiter pingLimiter, Ticker ticker)249   private NettyClientHandler(
250       Http2ConnectionDecoder decoder,
251       Http2ConnectionEncoder encoder,
252       Http2Settings settings,
253       ChannelLogger negotiationLogger,
254       ClientTransportLifecycleManager lifecycleManager,
255       KeepAliveManager keepAliveManager,
256       Supplier<Stopwatch> stopwatchFactory,
257       final Runnable tooManyPingsRunnable,
258       TransportTracer transportTracer,
259       Attributes eagAttributes,
260       String authority,
261       boolean autoFlowControl,
262       PingLimiter pingLimiter,
263       Ticker ticker) {
264     super(/* channelUnused= */ null, decoder, encoder, settings,
265         negotiationLogger, autoFlowControl, pingLimiter, ticker);
266     this.lifecycleManager = lifecycleManager;
267     this.keepAliveManager = keepAliveManager;
268     this.stopwatchFactory = stopwatchFactory;
269     this.transportTracer = Preconditions.checkNotNull(transportTracer);
270     this.eagAttributes = eagAttributes;
271     this.authority = authority;
272     this.attributes = Attributes.newBuilder()
273         .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttributes).build();
274 
275     // Set the frame listener on the decoder.
276     decoder().frameListener(new FrameListener());
277 
278     Http2Connection connection = encoder.connection();
279     streamKey = connection.newKey();
280 
281     connection.addListener(new Http2ConnectionAdapter() {
282       @Override
283       public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
284         byte[] debugDataBytes = ByteBufUtil.getBytes(debugData);
285         goingAway(errorCode, debugDataBytes);
286         if (errorCode == Http2Error.ENHANCE_YOUR_CALM.code()) {
287           String data = new String(debugDataBytes, UTF_8);
288           logger.log(
289               Level.WARNING, "Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: {0}", data);
290           if ("too_many_pings".equals(data)) {
291             tooManyPingsRunnable.run();
292           }
293         }
294       }
295 
296       @Override
297       public void onStreamActive(Http2Stream stream) {
298         if (connection().numActiveStreams() == 1
299             && NettyClientHandler.this.keepAliveManager != null) {
300           NettyClientHandler.this.keepAliveManager.onTransportActive();
301         }
302       }
303 
304       @Override
305       public void onStreamClosed(Http2Stream stream) {
306         // Although streams with CALL_OPTIONS_RPC_OWNED_BY_BALANCER are not marked as "in-use" in
307         // the first place, we don't propagate that option here, and it's safe to reset the in-use
308         // state for them, which will be a cheap no-op.
309         inUseState.updateObjectInUse(stream, false);
310         if (connection().numActiveStreams() == 0
311             && NettyClientHandler.this.keepAliveManager != null) {
312           NettyClientHandler.this.keepAliveManager.onTransportIdle();
313         }
314       }
315     });
316   }
317 
318   /**
319    * The protocol negotiation attributes, available once the protocol negotiation completes;
320    * otherwise returns {@code Attributes.EMPTY}.
321    */
getAttributes()322   Attributes getAttributes() {
323     return attributes;
324   }
325 
326   /**
327    * Handler for commands sent from the stream.
328    */
329   @Override
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)330   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
331           throws Exception {
332     if (msg instanceof CreateStreamCommand) {
333       createStream((CreateStreamCommand) msg, promise);
334     } else if (msg instanceof SendGrpcFrameCommand) {
335       sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
336     } else if (msg instanceof CancelClientStreamCommand) {
337       cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
338     } else if (msg instanceof SendPingCommand) {
339       sendPingFrame(ctx, (SendPingCommand) msg, promise);
340     } else if (msg instanceof GracefulCloseCommand) {
341       gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
342     } else if (msg instanceof ForcefulCloseCommand) {
343       forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
344     } else if (msg == NOOP_MESSAGE) {
345       ctx.write(Unpooled.EMPTY_BUFFER, promise);
346     } else {
347       throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
348     }
349   }
350 
startWriteQueue(Channel channel)351   void startWriteQueue(Channel channel) {
352     clientWriteQueue = new WriteQueue(channel);
353   }
354 
getWriteQueue()355   WriteQueue getWriteQueue() {
356     return clientWriteQueue;
357   }
358 
getLifecycleManager()359   ClientTransportLifecycleManager getLifecycleManager() {
360     return lifecycleManager;
361   }
362 
363   /**
364    * Returns the given processed bytes back to inbound flow control.
365    */
returnProcessedBytes(Http2Stream stream, int bytes)366   void returnProcessedBytes(Http2Stream stream, int bytes) {
367     try {
368       decoder().flowController().consumeBytes(stream, bytes);
369     } catch (Http2Exception e) {
370       throw new RuntimeException(e);
371     }
372   }
373 
onHeadersRead(int streamId, Http2Headers headers, boolean endStream)374   private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
375     // Stream 1 is reserved for the Upgrade response, so we should ignore its headers here:
376     if (streamId != Http2CodecUtil.HTTP_UPGRADE_STREAM_ID) {
377       NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
378       PerfMark.event("NettyClientHandler.onHeadersRead", stream.tag());
379       stream.transportHeadersReceived(headers, endStream);
380     }
381 
382     if (keepAliveManager != null) {
383       keepAliveManager.onDataReceived();
384     }
385   }
386 
387   /**
388    * Handler for an inbound HTTP/2 DATA frame.
389    */
onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream)390   private void onDataRead(int streamId, ByteBuf data, int padding, boolean endOfStream) {
391     flowControlPing().onDataRead(data.readableBytes(), padding);
392     NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
393     PerfMark.event("NettyClientHandler.onDataRead", stream.tag());
394     stream.transportDataReceived(data, endOfStream);
395     if (keepAliveManager != null) {
396       keepAliveManager.onDataReceived();
397     }
398   }
399 
400   /**
401    * Handler for an inbound HTTP/2 RST_STREAM frame, terminating a stream.
402    */
onRstStreamRead(int streamId, long errorCode)403   private void onRstStreamRead(int streamId, long errorCode) {
404     NettyClientStream.TransportState stream = clientStream(connection().stream(streamId));
405     if (stream != null) {
406       PerfMark.event("NettyClientHandler.onRstStreamRead", stream.tag());
407       Status status = statusFromH2Error(null, "RST_STREAM closed stream", errorCode, null);
408       stream.transportReportStatus(
409           status,
410           errorCode == Http2Error.REFUSED_STREAM.code()
411               ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
412           false /*stop delivery*/,
413           new Metadata());
414       if (keepAliveManager != null) {
415         keepAliveManager.onDataReceived();
416       }
417     }
418   }
419 
420   @Override
close(ChannelHandlerContext ctx, ChannelPromise promise)421   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
422     logger.fine("Network channel being closed by the application.");
423     if (ctx.channel().isActive()) { // Ignore notification that the socket was closed
424       lifecycleManager.notifyShutdown(
425           Status.UNAVAILABLE.withDescription("Transport closed for unknown reason"));
426     }
427     super.close(ctx, promise);
428   }
429 
430   /**
431    * Handler for the Channel shutting down.
432    */
433   @Override
channelInactive(ChannelHandlerContext ctx)434   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
435     try {
436       logger.fine("Network channel is closed");
437       Status status = Status.UNAVAILABLE.withDescription("Network closed for unknown reason");
438       lifecycleManager.notifyShutdown(status);
439       final Status streamStatus;
440       if (channelInactiveReason != null) {
441         streamStatus = channelInactiveReason;
442       } else {
443         streamStatus = lifecycleManager.getShutdownStatus();
444       }
445       try {
446         cancelPing(lifecycleManager.getShutdownThrowable());
447         // Report status to the application layer for any open streams
448         connection().forEachActiveStream(new Http2StreamVisitor() {
449           @Override
450           public boolean visit(Http2Stream stream) throws Http2Exception {
451             NettyClientStream.TransportState clientStream = clientStream(stream);
452             if (clientStream != null) {
453               clientStream.transportReportStatus(streamStatus, false, new Metadata());
454             }
455             return true;
456           }
457         });
458       } finally {
459         lifecycleManager.notifyTerminated(status);
460       }
461     } finally {
462       // Close any open streams
463       super.channelInactive(ctx);
464       if (keepAliveManager != null) {
465         keepAliveManager.onTransportTermination();
466       }
467     }
468   }
469 
470   @Override
handleProtocolNegotiationCompleted( Attributes attributes, InternalChannelz.Security securityInfo)471   public void handleProtocolNegotiationCompleted(
472       Attributes attributes, InternalChannelz.Security securityInfo) {
473     this.attributes = this.attributes.toBuilder().setAll(attributes).build();
474     this.securityInfo = securityInfo;
475     super.handleProtocolNegotiationCompleted(attributes, securityInfo);
476     writeBufferingAndRemove(ctx().channel());
477   }
478 
writeBufferingAndRemove(Channel channel)479   static void writeBufferingAndRemove(Channel channel) {
480     checkNotNull(channel, "channel");
481     ChannelHandlerContext handlerCtx =
482         channel.pipeline().context(WriteBufferingAndExceptionHandler.class);
483     if (handlerCtx == null) {
484       return;
485     }
486     ((WriteBufferingAndExceptionHandler) handlerCtx.handler()).writeBufferedAndRemove(handlerCtx);
487   }
488 
489   @Override
getEagAttributes()490   public Attributes getEagAttributes() {
491     return eagAttributes;
492   }
493 
494   @Override
getAuthority()495   public String getAuthority() {
496     return authority;
497   }
498 
getSecurityInfo()499   InternalChannelz.Security getSecurityInfo() {
500     return securityInfo;
501   }
502 
503   @Override
onConnectionError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception http2Ex)504   protected void onConnectionError(ChannelHandlerContext ctx,  boolean outbound, Throwable cause,
505       Http2Exception http2Ex) {
506     logger.log(Level.FINE, "Caught a connection error", cause);
507     lifecycleManager.notifyShutdown(Utils.statusFromThrowable(cause));
508     // Parent class will shut down the Channel
509     super.onConnectionError(ctx, outbound, cause, http2Ex);
510   }
511 
512   @Override
onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause, Http2Exception.StreamException http2Ex)513   protected void onStreamError(ChannelHandlerContext ctx, boolean outbound, Throwable cause,
514       Http2Exception.StreamException http2Ex) {
515     // Close the stream with a status that contains the cause.
516     NettyClientStream.TransportState stream = clientStream(connection().stream(http2Ex.streamId()));
517     if (stream != null) {
518       stream.transportReportStatus(Utils.statusFromThrowable(cause), false, new Metadata());
519     } else {
520       logger.log(Level.FINE, "Stream error for unknown stream " + http2Ex.streamId(), cause);
521     }
522 
523     // Delegate to the base class to send a RST_STREAM.
524     super.onStreamError(ctx, outbound, cause, http2Ex);
525   }
526 
527   @Override
isGracefulShutdownComplete()528   protected boolean isGracefulShutdownComplete() {
529     // Only allow graceful shutdown to complete after all pending streams have completed.
530     return super.isGracefulShutdownComplete()
531         && ((StreamBufferingEncoder) encoder()).numBufferedStreams() == 0;
532   }
533 
534   /**
535    * Attempts to create a new stream from the given command. If there are too many active streams,
536    * the creation request is queued.
537    */
createStream(CreateStreamCommand command, ChannelPromise promise)538   private void createStream(CreateStreamCommand command, ChannelPromise promise)
539           throws Exception {
540     if (lifecycleManager.getShutdownThrowable() != null) {
541       command.stream().setNonExistent();
542       // The connection is going away (it is really the GOAWAY case),
543       // just terminate the stream now.
544       command.stream().transportReportStatus(
545           lifecycleManager.getShutdownStatus(), RpcProgress.MISCARRIED, true, new Metadata());
546       promise.setFailure(lifecycleManager.getShutdownThrowable());
547       return;
548     }
549 
550     // Get the stream ID for the new stream.
551     int streamId;
552     try {
553       streamId = incrementAndGetNextStreamId();
554     } catch (StatusException e) {
555       command.stream().setNonExistent();
556       // Stream IDs have been exhausted for this connection. Fail the promise immediately.
557       promise.setFailure(e);
558 
559       // Initiate a graceful shutdown if we haven't already.
560       if (!connection().goAwaySent()) {
561         logger.fine("Stream IDs have been exhausted for this connection. "
562                 + "Initiating graceful shutdown of the connection.");
563         lifecycleManager.notifyShutdown(e.getStatus());
564         close(ctx(), ctx().newPromise());
565       }
566       return;
567     }
568     if (connection().goAwayReceived()) {
569       Status s = abruptGoAwayStatus;
570       int maxActiveStreams = connection().local().maxActiveStreams();
571       int lastStreamId = connection().local().lastStreamKnownByPeer();
572       if (s == null) {
573         // Should be impossible, but handle pseudo-gracefully
574         s = Status.INTERNAL.withDescription(
575             "Failed due to abrupt GOAWAY, but can't find GOAWAY details");
576       } else if (streamId > lastStreamId) {
577         s = s.augmentDescription(
578             "stream id: " + streamId + ", GOAWAY Last-Stream-ID:" + lastStreamId);
579       } else if (connection().local().numActiveStreams() == maxActiveStreams) {
580         s = s.augmentDescription("At MAX_CONCURRENT_STREAMS limit. limit: " + maxActiveStreams);
581       }
582       if (streamId > lastStreamId || connection().local().numActiveStreams() == maxActiveStreams) {
583         // This should only be reachable during onGoAwayReceived, as otherwise
584         // getShutdownThrowable() != null
585         command.stream().setNonExistent();
586         command.stream().transportReportStatus(s, RpcProgress.MISCARRIED, true, new Metadata());
587         promise.setFailure(s.asRuntimeException());
588         return;
589       }
590     }
591 
592     NettyClientStream.TransportState stream = command.stream();
593     Http2Headers headers = command.headers();
594     stream.setId(streamId);
595 
596     try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.createStream")) {
597       PerfMark.linkIn(command.getLink());
598       PerfMark.attachTag(stream.tag());
599       createStreamTraced(
600           streamId, stream, headers, command.isGet(), command.shouldBeCountedForInUse(), promise);
601     }
602   }
603 
createStreamTraced( final int streamId, final NettyClientStream.TransportState stream, final Http2Headers headers, boolean isGet, final boolean shouldBeCountedForInUse, final ChannelPromise promise)604   private void createStreamTraced(
605       final int streamId,
606       final NettyClientStream.TransportState stream,
607       final Http2Headers headers,
608       boolean isGet,
609       final boolean shouldBeCountedForInUse,
610       final ChannelPromise promise) {
611     // Create an intermediate promise so that we can intercept the failure reported back to the
612     // application.
613     ChannelPromise tempPromise = ctx().newPromise();
614     encoder().writeHeaders(ctx(), streamId, headers, 0, isGet, tempPromise)
615         .addListener(new ChannelFutureListener() {
616           @Override
617           public void operationComplete(ChannelFuture future) throws Exception {
618             if (future.isSuccess()) {
619               // The http2Stream will be null in case a stream buffered in the encoder
620               // was canceled via RST_STREAM.
621               Http2Stream http2Stream = connection().stream(streamId);
622               if (http2Stream != null) {
623                 stream.getStatsTraceContext().clientOutboundHeaders();
624                 http2Stream.setProperty(streamKey, stream);
625 
626                 // This delays the in-use state until the I/O completes, which technically may
627                 // be later than we would like.
628                 if (shouldBeCountedForInUse) {
629                   inUseState.updateObjectInUse(http2Stream, true);
630                 }
631 
632                 // Attach the client stream to the HTTP/2 stream object as user data.
633                 stream.setHttp2Stream(http2Stream);
634               }
635               // Otherwise, the stream has been cancelled and Netty is sending a
636               // RST_STREAM frame which causes it to purge pending writes from the
637               // flow-controller and delete the http2Stream. The stream listener has already
638               // been notified of cancellation so there is nothing to do.
639 
640               // Just forward on the success status to the original promise.
641               promise.setSuccess();
642             } else {
643               Throwable cause = future.cause();
644               if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
645                 StreamBufferingEncoder.Http2GoAwayException e =
646                     (StreamBufferingEncoder.Http2GoAwayException) cause;
647                 Status status = statusFromH2Error(
648                     Status.Code.UNAVAILABLE, "GOAWAY closed buffered stream",
649                     e.errorCode(), e.debugData());
650                 cause = status.asRuntimeException();
651                 stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
652               } else if (cause instanceof StreamBufferingEncoder.Http2ChannelClosedException) {
653                 Status status = lifecycleManager.getShutdownStatus();
654                 if (status == null) {
655                   status = Status.UNAVAILABLE.withCause(cause)
656                       .withDescription("Connection closed while stream is buffered");
657                 }
658                 stream.transportReportStatus(status, RpcProgress.MISCARRIED, true, new Metadata());
659               }
660               promise.setFailure(cause);
661             }
662           }
663         });
664   }
665 
666   /**
667    * Cancels this stream.
668    */
cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise)669   private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
670       ChannelPromise promise) {
671     NettyClientStream.TransportState stream = cmd.stream();
672     try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.cancelStream")) {
673       PerfMark.attachTag(stream.tag());
674       PerfMark.linkIn(cmd.getLink());
675       Status reason = cmd.reason();
676       if (reason != null) {
677         stream.transportReportStatus(reason, true, new Metadata());
678       }
679       if (!cmd.stream().isNonExistent()) {
680         encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
681       } else {
682         promise.setSuccess();
683       }
684     }
685   }
686 
687   /**
688    * Sends the given GRPC frame for the stream.
689    */
sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd, ChannelPromise promise)690   private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
691       ChannelPromise promise) {
692     try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendGrpcFrame")) {
693       PerfMark.attachTag(cmd.stream().tag());
694       PerfMark.linkIn(cmd.getLink());
695       // Call the base class to write the HTTP/2 DATA frame.
696       // Note: no need to flush since this is handled by the outbound flow controller.
697       encoder().writeData(ctx, cmd.stream().id(), cmd.content(), 0, cmd.endStream(), promise);
698     }
699   }
700 
sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg, ChannelPromise promise)701   private void sendPingFrame(ChannelHandlerContext ctx, SendPingCommand msg,
702       ChannelPromise promise) {
703     try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.sendPingFrame")) {
704       PerfMark.linkIn(msg.getLink());
705       sendPingFrameTraced(ctx, msg, promise);
706     }
707   }
708 
709   /**
710    * Sends a PING frame. If a ping operation is already outstanding, the callback in the message is
711    * registered to be called when the existing operation completes, and no new frame is sent.
712    */
sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg, ChannelPromise promise)713   private void sendPingFrameTraced(ChannelHandlerContext ctx, SendPingCommand msg,
714       ChannelPromise promise) {
715     // Don't check lifecycleManager.getShutdownStatus() since we want to allow pings after shutdown
716     // but before termination. After termination, messages will no longer arrive because the
717     // pipeline clears all handlers on channel close.
718 
719     PingCallback callback = msg.callback();
720     Executor executor = msg.executor();
721     // we only allow one outstanding ping at a time, so just add the callback to
722     // any outstanding operation
723     if (ping != null) {
724       promise.setSuccess();
725       ping.addCallback(callback, executor);
726       return;
727     }
728 
729     // Use a new promise to prevent calling the callback twice on write failure: here and in
730     // NettyClientTransport.ping(). It may appear strange, but it will behave the same as if
731     // ping != null above.
732     promise.setSuccess();
733     promise = ctx().newPromise();
734     // set outstanding operation
735     long data = USER_PING_PAYLOAD;
736     Stopwatch stopwatch = stopwatchFactory.get();
737     stopwatch.start();
738     ping = new Http2Ping(data, stopwatch);
739     ping.addCallback(callback, executor);
740     // and then write the ping
741     encoder().writePing(ctx, false, USER_PING_PAYLOAD, promise);
742     ctx.flush();
743     final Http2Ping finalPing = ping;
744     promise.addListener(new ChannelFutureListener() {
745       @Override
746       public void operationComplete(ChannelFuture future) throws Exception {
747         if (future.isSuccess()) {
748           transportTracer.reportKeepAliveSent();
749         } else {
750           Throwable cause = future.cause();
751           if (cause instanceof ClosedChannelException) {
752             cause = lifecycleManager.getShutdownThrowable();
753             if (cause == null) {
754               cause = Status.UNKNOWN.withDescription("Ping failed but for unknown reason.")
755                   .withCause(future.cause()).asException();
756             }
757           }
758           finalPing.failed(cause);
759           if (ping == finalPing) {
760             ping = null;
761           }
762         }
763       }
764     });
765   }
766 
gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg, ChannelPromise promise)767   private void gracefulClose(ChannelHandlerContext ctx, GracefulCloseCommand msg,
768       ChannelPromise promise) throws Exception {
769     lifecycleManager.notifyShutdown(msg.getStatus());
770     // Explicitly flush to create any buffered streams before sending GOAWAY.
771     // TODO(ejona): determine if the need to flush is a bug in Netty
772     flush(ctx);
773     close(ctx, promise);
774   }
775 
forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg, ChannelPromise promise)776   private void forcefulClose(final ChannelHandlerContext ctx, final ForcefulCloseCommand msg,
777       ChannelPromise promise) throws Exception {
778     connection().forEachActiveStream(new Http2StreamVisitor() {
779       @Override
780       public boolean visit(Http2Stream stream) throws Http2Exception {
781         NettyClientStream.TransportState clientStream = clientStream(stream);
782         Tag tag = clientStream != null ? clientStream.tag() : PerfMark.createTag();
783         try (TaskCloseable ignore = PerfMark.traceTask("NettyClientHandler.forcefulClose")) {
784           PerfMark.linkIn(msg.getLink());
785           PerfMark.attachTag(tag);
786           if (clientStream != null) {
787             clientStream.transportReportStatus(msg.getStatus(), true, new Metadata());
788             resetStream(ctx, stream.id(), Http2Error.CANCEL.code(), ctx.newPromise());
789           }
790           stream.close();
791           return true;
792         }
793       }
794     });
795     close(ctx, promise);
796   }
797 
798   /**
799    * Handler for a GOAWAY being received. Fails any streams created after the
800    * last known stream. May only be called during a read.
801    */
goingAway(long errorCode, byte[] debugData)802   private void goingAway(long errorCode, byte[] debugData) {
803     Status finalStatus = statusFromH2Error(
804         Status.Code.UNAVAILABLE, "GOAWAY shut down transport", errorCode, debugData);
805     lifecycleManager.notifyGracefulShutdown(finalStatus);
806     abruptGoAwayStatus = statusFromH2Error(
807         Status.Code.UNAVAILABLE, "Abrupt GOAWAY closed unsent stream", errorCode, debugData);
808     // While this _should_ be UNAVAILABLE, Netty uses the wrong stream id in the GOAWAY when it
809     // fails streams due to HPACK failures (e.g., header list too large). To be more conservative,
810     // we assume any sent streams may be related to the GOAWAY. This should rarely impact users
811     // since the main time servers should use abrupt GOAWAYs is if there is a protocol error, and if
812     // there wasn't a protocol error the error code was probably NO_ERROR which is mapped to
813     // UNAVAILABLE. https://github.com/netty/netty/issues/10670
814     final Status abruptGoAwayStatusConservative = statusFromH2Error(
815         null, "Abrupt GOAWAY closed sent stream", errorCode, debugData);
816     final boolean mayBeHittingNettyBug = errorCode != Http2Error.NO_ERROR.code();
817     // Try to allocate as many in-flight streams as possible, to reduce race window of
818     // https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
819     // gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
820     // after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
821     // processed and thus this processing must be in-line before processing additional reads.
822 
823     // This can cause reentrancy, but should be minor since it is normal to handle writes in
824     // response to a read. Also, the call stack is rather shallow at this point
825     clientWriteQueue.drainNow();
826     if (lifecycleManager.notifyShutdown(finalStatus)) {
827       // This is for the only RPCs that are actually covered by the GOAWAY error code. All other
828       // RPCs were not observed by the remote and so should be UNAVAILABLE.
829       channelInactiveReason = statusFromH2Error(
830           null, "Connection closed after GOAWAY", errorCode, debugData);
831     }
832 
833     final int lastKnownStream = connection().local().lastStreamKnownByPeer();
834     try {
835       connection().forEachActiveStream(new Http2StreamVisitor() {
836         @Override
837         public boolean visit(Http2Stream stream) throws Http2Exception {
838           if (stream.id() > lastKnownStream) {
839             NettyClientStream.TransportState clientStream = clientStream(stream);
840             if (clientStream != null) {
841               // RpcProgress _should_ be REFUSED, but are being conservative. See comment for
842               // abruptGoAwayStatusConservative. This does reduce our ability to perform transparent
843               // retries, but only if something else caused a connection failure.
844               RpcProgress progress = mayBeHittingNettyBug
845                   ? RpcProgress.PROCESSED
846                   : RpcProgress.REFUSED;
847               clientStream.transportReportStatus(
848                   abruptGoAwayStatusConservative, progress, false, new Metadata());
849             }
850             stream.close();
851           }
852           return true;
853         }
854       });
855     } catch (Http2Exception e) {
856       throw new RuntimeException(e);
857     }
858   }
859 
cancelPing(Throwable t)860   private void cancelPing(Throwable t) {
861     if (ping != null) {
862       ping.failed(t);
863       ping = null;
864     }
865   }
866 
867   /** If {@code statusCode} is non-null, it will be used instead of the http2 error code mapping. */
statusFromH2Error( Status.Code statusCode, String context, long errorCode, byte[] debugData)868   private Status statusFromH2Error(
869       Status.Code statusCode, String context, long errorCode, byte[] debugData) {
870     Status status = GrpcUtil.Http2Error.statusForCode(errorCode);
871     if (statusCode == null) {
872       statusCode = status.getCode();
873     }
874     String debugString = "";
875     if (debugData != null && debugData.length > 0) {
876       // If a debug message was provided, use it.
877       debugString = ", debug data: " + new String(debugData, UTF_8);
878     }
879     return statusCode.toStatus()
880         .withDescription(context + ". " + status.getDescription() + debugString);
881   }
882 
883   /**
884    * Gets the client stream associated to the given HTTP/2 stream object.
885    */
clientStream(Http2Stream stream)886   private NettyClientStream.TransportState clientStream(Http2Stream stream) {
887     return stream == null ? null : (NettyClientStream.TransportState) stream.getProperty(streamKey);
888   }
889 
incrementAndGetNextStreamId()890   private int incrementAndGetNextStreamId() throws StatusException {
891     int nextStreamId = connection().local().incrementAndGetNextStreamId();
892     if (nextStreamId < 0) {
893       logger.fine("Stream IDs have been exhausted for this connection. "
894               + "Initiating graceful shutdown of the connection.");
895       throw EXHAUSTED_STREAMS_STATUS.asException();
896     }
897     return nextStreamId;
898   }
899 
requireHttp2Stream(int streamId)900   private Http2Stream requireHttp2Stream(int streamId) {
901     Http2Stream stream = connection().stream(streamId);
902     if (stream == null) {
903       // This should never happen.
904       throw new AssertionError("Stream does not exist: " + streamId);
905     }
906     return stream;
907   }
908 
909   private class FrameListener extends Http2FrameAdapter {
910     private boolean firstSettings = true;
911 
912     @Override
onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)913     public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
914       if (firstSettings) {
915         firstSettings = false;
916         lifecycleManager.notifyReady();
917       }
918     }
919 
920     @Override
onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)921     public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
922         boolean endOfStream) throws Http2Exception {
923       NettyClientHandler.this.onDataRead(streamId, data, padding, endOfStream);
924       return padding;
925     }
926 
927     @Override
onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream)928     public void onHeadersRead(ChannelHandlerContext ctx,
929         int streamId,
930         Http2Headers headers,
931         int streamDependency,
932         short weight,
933         boolean exclusive,
934         int padding,
935         boolean endStream) throws Http2Exception {
936       NettyClientHandler.this.onHeadersRead(streamId, headers, endStream);
937     }
938 
939     @Override
onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)940     public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
941         throws Http2Exception {
942       NettyClientHandler.this.onRstStreamRead(streamId, errorCode);
943     }
944 
945     @Override
onPingAckRead(ChannelHandlerContext ctx, long ackPayload)946     public void onPingAckRead(ChannelHandlerContext ctx, long ackPayload) throws Http2Exception {
947       Http2Ping p = ping;
948       if (ackPayload == flowControlPing().payload()) {
949         flowControlPing().updateWindow();
950         logger.log(Level.FINE, "Window: {0}",
951             decoder().flowController().initialWindowSize(connection().connectionStream()));
952       } else if (p != null) {
953         if (p.payload() == ackPayload) {
954           p.complete();
955           ping = null;
956         } else {
957           logger.log(Level.WARNING,
958               "Received unexpected ping ack. Expecting {0}, got {1}",
959               new Object[] {p.payload(), ackPayload});
960         }
961       } else {
962         logger.warning("Received unexpected ping ack. No ping outstanding");
963       }
964       if (keepAliveManager != null) {
965         keepAliveManager.onDataReceived();
966       }
967     }
968 
969     @Override
onPingRead(ChannelHandlerContext ctx, long data)970     public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
971       if (keepAliveManager != null) {
972         keepAliveManager.onDataReceived();
973       }
974     }
975   }
976 
977   private static class PingCountingFrameWriter extends DecoratingHttp2FrameWriter
978       implements AbstractNettyHandler.PingLimiter {
979     private int pingCount;
980 
PingCountingFrameWriter(Http2FrameWriter delegate)981     public PingCountingFrameWriter(Http2FrameWriter delegate) {
982       super(delegate);
983     }
984 
985     @Override
isPingAllowed()986     public boolean isPingAllowed() {
987       // "3 strikes" may cause the server to complain, so we limit ourselves to 2 or below.
988       return pingCount < 2;
989     }
990 
991     @Override
writeHeaders( ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endStream, ChannelPromise promise)992     public ChannelFuture writeHeaders(
993         ChannelHandlerContext ctx, int streamId, Http2Headers headers,
994         int padding, boolean endStream, ChannelPromise promise) {
995       pingCount = 0;
996       return super.writeHeaders(ctx, streamId, headers, padding, endStream, promise);
997     }
998 
999     @Override
writeHeaders( ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endStream, ChannelPromise promise)1000     public ChannelFuture writeHeaders(
1001         ChannelHandlerContext ctx, int streamId, Http2Headers headers,
1002         int streamDependency, short weight, boolean exclusive,
1003         int padding, boolean endStream, ChannelPromise promise) {
1004       pingCount = 0;
1005       return super.writeHeaders(ctx, streamId, headers, streamDependency, weight, exclusive,
1006           padding, endStream, promise);
1007     }
1008 
1009     @Override
writeWindowUpdate( ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise)1010     public ChannelFuture writeWindowUpdate(
1011         ChannelHandlerContext ctx, int streamId, int windowSizeIncrement, ChannelPromise promise) {
1012       pingCount = 0;
1013       return super.writeWindowUpdate(ctx, streamId, windowSizeIncrement, promise);
1014     }
1015 
1016     @Override
writePing( ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise)1017     public ChannelFuture writePing(
1018         ChannelHandlerContext ctx, boolean ack, long data, ChannelPromise promise) {
1019       if (!ack) {
1020         pingCount++;
1021       }
1022       return super.writePing(ctx, ack, data, promise);
1023     }
1024 
1025     @Override
writeData( ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream, ChannelPromise promise)1026     public ChannelFuture writeData(
1027         ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endStream,
1028         ChannelPromise promise) {
1029       if (data.isReadable()) {
1030         pingCount = 0;
1031       }
1032       return super.writeData(ctx, streamId, data, padding, endStream, promise);
1033     }
1034   }
1035 }
1036