1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import io.grpc.Status; 22 import io.netty.buffer.ByteBuf; 23 import io.netty.buffer.ByteBufUtil; 24 import io.netty.channel.ChannelDuplexHandler; 25 import io.netty.channel.ChannelFuture; 26 import io.netty.channel.ChannelFutureListener; 27 import io.netty.channel.ChannelHandler; 28 import io.netty.channel.ChannelHandlerContext; 29 import io.netty.channel.ChannelPromise; 30 import io.netty.util.ReferenceCountUtil; 31 import java.net.SocketAddress; 32 import java.util.ArrayDeque; 33 import java.util.Queue; 34 import java.util.logging.Level; 35 import java.util.logging.Logger; 36 37 /** 38 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or 39 * {@link #failWrites(Throwable)} is called. This handler allows us to 40 * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially 41 * i.e. before it's active or the TLS Handshake is complete. 42 */ 43 final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler { 44 private static final Logger logger = 45 Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName()); 46 47 private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>(); 48 private final ChannelHandler next; 49 private boolean writing; 50 private boolean flushRequested; 51 private Throwable failCause; 52 WriteBufferingAndExceptionHandler(ChannelHandler next)53 WriteBufferingAndExceptionHandler(ChannelHandler next) { 54 this.next = checkNotNull(next, "next"); 55 } 56 57 @Override handlerAdded(ChannelHandlerContext ctx)58 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 59 ctx.pipeline().addBefore(ctx.name(), null, next); 60 super.handlerAdded(ctx); 61 // kick off protocol negotiation. 62 ctx.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT); 63 } 64 65 @Override handlerRemoved(ChannelHandlerContext ctx)66 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 67 if (!bufferedWrites.isEmpty()) { 68 Status status = Status.INTERNAL.withDescription("Buffer removed before draining writes"); 69 failWrites(status.asRuntimeException()); 70 } 71 super.handlerRemoved(ctx); 72 } 73 74 /** 75 * If this channel becomes inactive, then notify all buffered writes that we failed. 76 */ 77 @Override channelInactive(ChannelHandlerContext ctx)78 public void channelInactive(ChannelHandlerContext ctx) { 79 Status status = Status.UNAVAILABLE.withDescription( 80 "Connection closed while performing protocol negotiation for " + ctx.pipeline().names()); 81 failWrites(status.asRuntimeException()); 82 } 83 84 @Override exceptionCaught(ChannelHandlerContext ctx, Throwable cause)85 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 86 assert cause != null; 87 Throwable previousFailure = failCause; 88 Status status = Utils.statusFromThrowable(cause) 89 .augmentDescription("Channel Pipeline: " + ctx.pipeline().names()); 90 failWrites(status.asRuntimeException()); 91 // Check to see if the channel is active and this is the first failure. If a downstream 92 // handler triggers an exception in close(), avoid being reentrant. This is not obviously 93 // correct, so here are the cases and how they are correctly handled: 94 // 1. !active, prev==null: the channel is inactive, no-op 95 // 2. !active, prev!=null: the channel is inactive, no-op 96 // 3. active, prev==null: this is the first error, close 97 // 4a. active, prev!=null[channelInactive]: impossible, no-op 98 // 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close(). 99 // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write. 100 // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect. 101 if (ctx.channel().isActive() && previousFailure == null) { 102 final class LogOnFailure implements ChannelFutureListener { 103 @Override 104 public void operationComplete(ChannelFuture future) { 105 if (!future.isSuccess()) { 106 logger.log(Level.FINE, "Failed closing channel", future.cause()); 107 } 108 } 109 } 110 111 ctx.close().addListener(new LogOnFailure()); 112 } 113 } 114 115 /** 116 * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is 117 * called, or we have somehow failed. If we have already failed in the past, then the write 118 * will fail immediately. 119 */ 120 @Override 121 @SuppressWarnings("FutureReturnValueIgnored") write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)122 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 123 if (failCause != null) { 124 promise.setFailure(failCause); 125 ReferenceCountUtil.release(msg); 126 } else { 127 // Do not special case GracefulServerCloseCommand, as we don't want to cause handshake 128 // failures. 129 if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) { 130 // No point in continuing negotiation 131 ctx.close(); 132 // Still enqueue the command in case the HTTP/2 handler is already on the pipeline 133 } 134 bufferedWrites.add(new ChannelWrite(msg, promise)); 135 } 136 } 137 138 /** 139 * Connect failures do not show up as {@link #channelInactive} or {@link #exceptionCaught}, so 140 * it needs to be watched. 141 */ 142 @Override connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)143 public void connect( 144 ChannelHandlerContext ctx, 145 SocketAddress remoteAddress, 146 SocketAddress localAddress, 147 ChannelPromise promise) throws Exception { 148 final class ConnectListener implements ChannelFutureListener { 149 @Override 150 public void operationComplete(ChannelFuture future) { 151 if (!future.isSuccess()) { 152 failWrites(future.cause()); 153 } 154 } 155 } 156 157 super.connect(ctx, remoteAddress, localAddress, promise); 158 promise.addListener(new ConnectListener()); 159 } 160 161 @Override channelRead(ChannelHandlerContext ctx, Object msg)162 public void channelRead(ChannelHandlerContext ctx, Object msg) { 163 try { 164 if (logger.isLoggable(Level.FINE)) { 165 Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg; 166 logger.log( 167 Level.FINE, 168 "Unexpected channelRead()->{0} reached end of pipeline {1}", 169 new Object[] {loggedMsg, ctx.pipeline().names()}); 170 } 171 exceptionCaught( 172 ctx, 173 Status.INTERNAL.withDescription( 174 "channelRead() missed by ProtocolNegotiator handler: " + msg) 175 .asRuntimeException()); 176 } finally { 177 ReferenceCountUtil.safeRelease(msg); 178 } 179 } 180 181 /** 182 * Calls to this method will not trigger an immediate flush. The flush will be deferred until 183 * {@link #writeBufferedAndRemove(ChannelHandlerContext)}. 184 */ 185 @Override flush(ChannelHandlerContext ctx)186 public void flush(ChannelHandlerContext ctx) { 187 /* 188 * Swallowing any flushes is not only an optimization but also required 189 * for the SslHandler to work correctly. If the SslHandler receives multiple 190 * flushes while the handshake is still ongoing, then the handshake "randomly" 191 * times out. Not sure at this point why this is happening. Doing a single flush 192 * seems to work but multiple flushes don't ... 193 */ 194 flushRequested = true; 195 } 196 197 /** 198 * If we are still performing protocol negotiation, then this will propagate failures to all 199 * buffered writes. 200 */ 201 @Override close(ChannelHandlerContext ctx, ChannelPromise future)202 public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { 203 Status status = Status.UNAVAILABLE.withDescription( 204 "Connection closing while performing protocol negotiation for " + ctx.pipeline().names()); 205 failWrites(status.asRuntimeException()); 206 super.close(ctx, future); 207 } 208 209 @SuppressWarnings("FutureReturnValueIgnored") writeBufferedAndRemove(ChannelHandlerContext ctx)210 final void writeBufferedAndRemove(ChannelHandlerContext ctx) { 211 // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready. 212 if (!ctx.channel().isActive() || writing) { 213 return; 214 } 215 // Make sure that method can't be reentered, so that the ordering 216 // in the queue can't be messed up. 217 writing = true; 218 while (!bufferedWrites.isEmpty()) { 219 ChannelWrite write = bufferedWrites.poll(); 220 ctx.write(write.msg, write.promise); 221 } 222 if (flushRequested) { 223 ctx.flush(); 224 } 225 // Removal has to happen last as the above writes will likely trigger 226 // new writes that have to be added to the end of queue in order to not 227 // mess up the ordering. 228 ctx.pipeline().remove(this); 229 } 230 231 /** 232 * Propagate failures to all buffered writes. 233 */ 234 @SuppressWarnings("FutureReturnValueIgnored") failWrites(Throwable cause)235 private void failWrites(Throwable cause) { 236 if (failCause == null) { 237 failCause = cause; 238 } else { 239 logger.log(Level.FINE, "Ignoring duplicate failure", cause); 240 } 241 while (!bufferedWrites.isEmpty()) { 242 ChannelWrite write = bufferedWrites.poll(); 243 write.promise.setFailure(cause); 244 ReferenceCountUtil.release(write.msg); 245 } 246 } 247 248 private static final class ChannelWrite { 249 final Object msg; 250 final ChannelPromise promise; 251 ChannelWrite(Object msg, ChannelPromise promise)252 ChannelWrite(Object msg, ChannelPromise promise) { 253 this.msg = msg; 254 this.promise = promise; 255 } 256 } 257 } 258