• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2019 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import io.grpc.Status;
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufUtil;
24 import io.netty.channel.ChannelDuplexHandler;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelHandler;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.util.ReferenceCountUtil;
31 import java.net.SocketAddress;
32 import java.util.ArrayDeque;
33 import java.util.Queue;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 
37 /**
38  * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
39  * {@link #failWrites(Throwable)} is called. This handler allows us to
40  * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
41  * i.e.  before it's active or the TLS Handshake is complete.
42  */
43 final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
44   private static final Logger logger =
45       Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName());
46 
47   private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
48   private final ChannelHandler next;
49   private boolean writing;
50   private boolean flushRequested;
51   private Throwable failCause;
52 
WriteBufferingAndExceptionHandler(ChannelHandler next)53   WriteBufferingAndExceptionHandler(ChannelHandler next) {
54     this.next = checkNotNull(next, "next");
55   }
56 
57   @Override
handlerAdded(ChannelHandlerContext ctx)58   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
59     ctx.pipeline().addBefore(ctx.name(), null, next);
60     super.handlerAdded(ctx);
61     // kick off protocol negotiation.
62     ctx.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
63   }
64 
65   @Override
handlerRemoved(ChannelHandlerContext ctx)66   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
67     if (!bufferedWrites.isEmpty()) {
68       Status status = Status.INTERNAL.withDescription("Buffer removed before draining writes");
69       failWrites(status.asRuntimeException());
70     }
71     super.handlerRemoved(ctx);
72   }
73 
74   /**
75    * If this channel becomes inactive, then notify all buffered writes that we failed.
76    */
77   @Override
channelInactive(ChannelHandlerContext ctx)78   public void channelInactive(ChannelHandlerContext ctx) {
79     Status status = Status.UNAVAILABLE.withDescription(
80         "Connection closed while performing protocol negotiation for " + ctx.pipeline().names());
81     failWrites(status.asRuntimeException());
82   }
83 
84   @Override
exceptionCaught(ChannelHandlerContext ctx, Throwable cause)85   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
86     assert cause != null;
87     Throwable previousFailure = failCause;
88     Status status = Utils.statusFromThrowable(cause)
89         .augmentDescription("Channel Pipeline: " + ctx.pipeline().names());
90     failWrites(status.asRuntimeException());
91     // Check to see if the channel is active and this is the first failure.  If a downstream
92     // handler triggers an exception in close(), avoid being reentrant.  This is not obviously
93     // correct, so here are the cases and how they are correctly handled:
94     // 1. !active, prev==null: the channel is inactive, no-op
95     // 2. !active, prev!=null: the channel is inactive, no-op
96     // 3.  active, prev==null: this is the first error, close
97     // 4a. active, prev!=null[channelInactive]: impossible, no-op
98     // 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close().
99     // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
100     // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
101     if (ctx.channel().isActive() && previousFailure == null) {
102       final class LogOnFailure implements ChannelFutureListener {
103         @Override
104         public void operationComplete(ChannelFuture future) {
105           if (!future.isSuccess()) {
106             logger.log(Level.FINE, "Failed closing channel", future.cause());
107           }
108         }
109       }
110 
111       ctx.close().addListener(new LogOnFailure());
112     }
113   }
114 
115   /**
116    * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
117    * called, or we have somehow failed. If we have already failed in the past, then the write
118    * will fail immediately.
119    */
120   @Override
121   @SuppressWarnings("FutureReturnValueIgnored")
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)122   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
123     if (failCause != null) {
124       promise.setFailure(failCause);
125       ReferenceCountUtil.release(msg);
126     } else {
127       // Do not special case GracefulServerCloseCommand, as we don't want to cause handshake
128       // failures.
129       if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) {
130         // No point in continuing negotiation
131         ctx.close();
132         // Still enqueue the command in case the HTTP/2 handler is already on the pipeline
133       }
134       bufferedWrites.add(new ChannelWrite(msg, promise));
135     }
136   }
137 
138   /**
139    * Connect failures do not show up as {@link #channelInactive} or {@link #exceptionCaught}, so
140    * it needs to be watched.
141    */
142   @Override
connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)143   public void connect(
144       ChannelHandlerContext ctx,
145       SocketAddress remoteAddress,
146       SocketAddress localAddress,
147       ChannelPromise promise) throws Exception {
148     final class ConnectListener implements ChannelFutureListener {
149       @Override
150       public void operationComplete(ChannelFuture future) {
151         if (!future.isSuccess()) {
152           failWrites(future.cause());
153         }
154       }
155     }
156 
157     super.connect(ctx, remoteAddress, localAddress, promise);
158     promise.addListener(new ConnectListener());
159   }
160 
161   @Override
channelRead(ChannelHandlerContext ctx, Object msg)162   public void channelRead(ChannelHandlerContext ctx, Object msg) {
163     try {
164       if (logger.isLoggable(Level.FINE)) {
165         Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
166         logger.log(
167             Level.FINE,
168             "Unexpected channelRead()->{0} reached end of pipeline {1}",
169             new Object[] {loggedMsg, ctx.pipeline().names()});
170       }
171       exceptionCaught(
172           ctx,
173           Status.INTERNAL.withDescription(
174               "channelRead() missed by ProtocolNegotiator handler: " + msg)
175               .asRuntimeException());
176     } finally {
177       ReferenceCountUtil.safeRelease(msg);
178     }
179   }
180 
181   /**
182    * Calls to this method will not trigger an immediate flush. The flush will be deferred until
183    * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
184    */
185   @Override
flush(ChannelHandlerContext ctx)186   public void flush(ChannelHandlerContext ctx) {
187     /*
188      * Swallowing any flushes is not only an optimization but also required
189      * for the SslHandler to work correctly. If the SslHandler receives multiple
190      * flushes while the handshake is still ongoing, then the handshake "randomly"
191      * times out. Not sure at this point why this is happening. Doing a single flush
192      * seems to work but multiple flushes don't ...
193      */
194     flushRequested = true;
195   }
196 
197   /**
198    * If we are still performing protocol negotiation, then this will propagate failures to all
199    * buffered writes.
200    */
201   @Override
close(ChannelHandlerContext ctx, ChannelPromise future)202   public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
203     Status status = Status.UNAVAILABLE.withDescription(
204         "Connection closing while performing protocol negotiation for " + ctx.pipeline().names());
205     failWrites(status.asRuntimeException());
206     super.close(ctx, future);
207   }
208 
209   @SuppressWarnings("FutureReturnValueIgnored")
writeBufferedAndRemove(ChannelHandlerContext ctx)210   final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
211     // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready.
212     if (!ctx.channel().isActive() || writing) {
213       return;
214     }
215     // Make sure that method can't be reentered, so that the ordering
216     // in the queue can't be messed up.
217     writing = true;
218     while (!bufferedWrites.isEmpty()) {
219       ChannelWrite write = bufferedWrites.poll();
220       ctx.write(write.msg, write.promise);
221     }
222     if (flushRequested) {
223       ctx.flush();
224     }
225     // Removal has to happen last as the above writes will likely trigger
226     // new writes that have to be added to the end of queue in order to not
227     // mess up the ordering.
228     ctx.pipeline().remove(this);
229   }
230 
231   /**
232    * Propagate failures to all buffered writes.
233    */
234   @SuppressWarnings("FutureReturnValueIgnored")
failWrites(Throwable cause)235   private void failWrites(Throwable cause) {
236     if (failCause == null) {
237       failCause = cause;
238     } else {
239       logger.log(Level.FINE, "Ignoring duplicate failure", cause);
240     }
241     while (!bufferedWrites.isEmpty()) {
242       ChannelWrite write = bufferedWrites.poll();
243       write.promise.setFailure(cause);
244       ReferenceCountUtil.release(write.msg);
245     }
246   }
247 
248   private static final class ChannelWrite {
249     final Object msg;
250     final ChannelPromise promise;
251 
ChannelWrite(Object msg, ChannelPromise promise)252     ChannelWrite(Object msg, ChannelPromise promise) {
253       this.msg = msg;
254       this.promise = promise;
255     }
256   }
257 }
258