• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import io.grpc.Attributes;
25 import io.grpc.Grpc;
26 import io.grpc.Internal;
27 import io.grpc.InternalChannelz;
28 import io.grpc.SecurityLevel;
29 import io.grpc.Status;
30 import io.grpc.internal.GrpcAttributes;
31 import io.grpc.internal.GrpcUtil;
32 import io.netty.channel.ChannelDuplexHandler;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelFutureListener;
35 import io.netty.channel.ChannelHandler;
36 import io.netty.channel.ChannelHandlerAdapter;
37 import io.netty.channel.ChannelHandlerContext;
38 import io.netty.channel.ChannelInboundHandler;
39 import io.netty.channel.ChannelInboundHandlerAdapter;
40 import io.netty.channel.ChannelPipeline;
41 import io.netty.channel.ChannelPromise;
42 import io.netty.handler.codec.http.DefaultHttpRequest;
43 import io.netty.handler.codec.http.HttpClientCodec;
44 import io.netty.handler.codec.http.HttpClientUpgradeHandler;
45 import io.netty.handler.codec.http.HttpMethod;
46 import io.netty.handler.codec.http.HttpVersion;
47 import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
48 import io.netty.handler.proxy.HttpProxyHandler;
49 import io.netty.handler.proxy.ProxyConnectionEvent;
50 import io.netty.handler.proxy.ProxyHandler;
51 import io.netty.handler.ssl.OpenSsl;
52 import io.netty.handler.ssl.OpenSslEngine;
53 import io.netty.handler.ssl.SslContext;
54 import io.netty.handler.ssl.SslHandler;
55 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
56 import io.netty.util.AsciiString;
57 import io.netty.util.ReferenceCountUtil;
58 import java.net.SocketAddress;
59 import java.net.URI;
60 import java.util.ArrayDeque;
61 import java.util.Arrays;
62 import java.util.Queue;
63 import java.util.logging.Level;
64 import java.util.logging.Logger;
65 import javax.annotation.Nullable;
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLParameters;
68 import javax.net.ssl.SSLSession;
69 
70 /**
71  * Common {@link ProtocolNegotiator}s used by gRPC.
72  */
73 @Internal
74 public final class ProtocolNegotiators {
75   private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName());
76 
ProtocolNegotiators()77   private ProtocolNegotiators() {
78   }
79 
80   /**
81    * Create a server plaintext handler for gRPC.
82    */
serverPlaintext()83   public static ProtocolNegotiator serverPlaintext() {
84     return new ProtocolNegotiator() {
85       @Override
86       public Handler newHandler(final GrpcHttp2ConnectionHandler handler) {
87         class PlaintextHandler extends ChannelHandlerAdapter implements Handler {
88           @Override
89           public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
90             // Set sttributes before replace to be sure we pass it before accepting any requests.
91             handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
92                 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
93                 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
94                 .build(),
95                 /*securityInfo=*/ null);
96             // Just replace this handler with the gRPC handler.
97             ctx.pipeline().replace(this, null, handler);
98           }
99 
100           @Override
101           public AsciiString scheme() {
102             return Utils.HTTP;
103           }
104         }
105 
106         return new PlaintextHandler();
107       }
108 
109       @Override
110       public void close() {}
111     };
112   }
113 
114   /**
115    * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
116    */
117   public static ProtocolNegotiator serverTls(final SslContext sslContext) {
118     Preconditions.checkNotNull(sslContext, "sslContext");
119     return new ProtocolNegotiator() {
120       @Override
121       public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
122         return new ServerTlsHandler(sslContext, handler);
123       }
124 
125       @Override
126       public void close() {}
127     };
128   }
129 
130   @VisibleForTesting
131   static final class ServerTlsHandler extends ChannelInboundHandlerAdapter
132       implements ProtocolNegotiator.Handler {
133     private final GrpcHttp2ConnectionHandler grpcHandler;
134     private final SslContext sslContext;
135 
136     ServerTlsHandler(SslContext sslContext, GrpcHttp2ConnectionHandler grpcHandler) {
137       this.sslContext = sslContext;
138       this.grpcHandler = grpcHandler;
139     }
140 
141     @Override
142     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
143       super.handlerAdded(ctx);
144 
145       SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
146       ctx.pipeline().addFirst(new SslHandler(sslEngine, false));
147     }
148 
149     @Override
150     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
151       fail(ctx, cause);
152     }
153 
154     @Override
155     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
156       if (evt instanceof SslHandshakeCompletionEvent) {
157         SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
158         if (handshakeEvent.isSuccess()) {
159           if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) {
160             SSLSession session = sslHandler(ctx.pipeline()).engine().getSession();
161             // Successfully negotiated the protocol.
162             // Notify about completion and pass down SSLSession in attributes.
163             grpcHandler.handleProtocolNegotiationCompleted(
164                 Attributes.newBuilder()
165                     .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
166                     .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
167                     .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
168                     .build(),
169                 new InternalChannelz.Security(new InternalChannelz.Tls(session)));
170             // Replace this handler with the GRPC handler.
171             ctx.pipeline().replace(this, null, grpcHandler);
172           } else {
173             fail(ctx, new Exception(
174                 "Failed protocol negotiation: Unable to find compatible protocol."));
175           }
176         } else {
177           fail(ctx, handshakeEvent.cause());
178         }
179       }
180       super.userEventTriggered(ctx, evt);
181     }
182 
183     private SslHandler sslHandler(ChannelPipeline pipeline) {
184       return pipeline.get(SslHandler.class);
185     }
186 
187     private void fail(ChannelHandlerContext ctx, Throwable exception) {
188       logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", exception);
189       ctx.close();
190     }
191 
192     @Override
193     public AsciiString scheme() {
194       return Utils.HTTPS;
195     }
196   }
197 
198   /**
199    * Returns a {@link ProtocolNegotiator} that does HTTP CONNECT proxy negotiation.
200    */
201   public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress,
202       final @Nullable String proxyUsername, final @Nullable String proxyPassword,
203       final ProtocolNegotiator negotiator) {
204     Preconditions.checkNotNull(proxyAddress, "proxyAddress");
205     Preconditions.checkNotNull(negotiator, "negotiator");
206     class ProxyNegotiator implements ProtocolNegotiator {
207       @Override
208       public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
209         HttpProxyHandler proxyHandler;
210         if (proxyUsername == null || proxyPassword == null) {
211           proxyHandler = new HttpProxyHandler(proxyAddress);
212         } else {
213           proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword);
214         }
215         return new BufferUntilProxyTunnelledHandler(
216             proxyHandler, negotiator.newHandler(http2Handler));
217       }
218 
219       // This method is not normally called, because we use httpProxy on a per-connection basis in
220       // NettyChannelBuilder. Instead, we expect `negotiator' to be closed by NettyTransportFactory.
221       @Override
222       public void close() {
223         negotiator.close();
224       }
225     }
226 
227     return new ProxyNegotiator();
228   }
229 
230   /**
231    * Buffers all writes until the HTTP CONNECT tunnel is established.
232    */
233   static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler
234       implements ProtocolNegotiator.Handler {
235     private final ProtocolNegotiator.Handler originalHandler;
236 
237     public BufferUntilProxyTunnelledHandler(
238         ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) {
239       super(proxyHandler, handler);
240       this.originalHandler = handler;
241     }
242 
243 
244     @Override
245     public AsciiString scheme() {
246       return originalHandler.scheme();
247     }
248 
249     @Override
250     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
251       if (evt instanceof ProxyConnectionEvent) {
252         writeBufferedAndRemove(ctx);
253       }
254       super.userEventTriggered(ctx, evt);
255     }
256 
257     @Override
258     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
259       fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy"));
260       super.channelInactive(ctx);
261     }
262 
263     @Override
264     public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
265       if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
266         fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy"));
267       }
268       super.close(ctx, future);
269     }
270   }
271 
272   /**
273    * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
274    * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
275    * may happen immediately, even before the TLS Handshake is complete.
276    */
277   public static ProtocolNegotiator tls(SslContext sslContext) {
278     return new TlsNegotiator(sslContext);
279   }
280 
281   @VisibleForTesting
282   static final class TlsNegotiator implements ProtocolNegotiator {
283     private final SslContext sslContext;
284 
285     TlsNegotiator(SslContext sslContext) {
286       this.sslContext = checkNotNull(sslContext, "sslContext");
287     }
288 
289     @VisibleForTesting
290     HostPort parseAuthority(String authority) {
291       URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority"));
292       String host;
293       int port;
294       if (uri.getHost() != null) {
295         host = uri.getHost();
296         port = uri.getPort();
297       } else {
298         /*
299          * Implementation note: We pick -1 as the port here rather than deriving it from the
300          * original socket address.  The SSL engine doens't use this port number when contacting the
301          * remote server, but rather it is used for other things like SSL Session caching.  When an
302          * invalid authority is provided (like "bad_cert"), picking the original port and passing it
303          * in would mean that the port might used under the assumption that it was correct.   By
304          * using -1 here, it forces the SSL implementation to treat it as invalid.
305          */
306         host = authority;
307         port = -1;
308       }
309       return new HostPort(host, port);
310     }
311 
312     @Override
313     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
314       final HostPort hostPort = parseAuthority(handler.getAuthority());
315 
316       ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
317         @Override
318         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
319           SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), hostPort.host, hostPort.port);
320           SSLParameters sslParams = sslEngine.getSSLParameters();
321           sslParams.setEndpointIdentificationAlgorithm("HTTPS");
322           sslEngine.setSSLParameters(sslParams);
323           ctx.pipeline().replace(this, null, new SslHandler(sslEngine, false));
324         }
325       };
326       return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
327     }
328 
329     @Override
330     public void close() {}
331   }
332 
333   /** A tuple of (host, port). */
334   @VisibleForTesting
335   static final class HostPort {
336     final String host;
337     final int port;
338 
339     public HostPort(String host, int port) {
340       this.host = host;
341       this.port = port;
342     }
343   }
344 
345   /**
346    * Returns a {@link ProtocolNegotiator} used for upgrading to HTTP/2 from HTTP/1.x.
347    */
348   public static ProtocolNegotiator plaintextUpgrade() {
349     return new PlaintextUpgradeNegotiator();
350   }
351 
352   static final class PlaintextUpgradeNegotiator implements ProtocolNegotiator {
353     @Override
354     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
355       // Register the plaintext upgrader
356       Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
357       HttpClientCodec httpClientCodec = new HttpClientCodec();
358       final HttpClientUpgradeHandler upgrader =
359           new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
360       return new BufferingHttp2UpgradeHandler(upgrader, handler);
361     }
362 
363     @Override
364     public void close() {}
365   }
366 
367   /**
368    * Returns a {@link ChannelHandler} that ensures that the {@code handler} is added to the
369    * pipeline writes to the {@link io.netty.channel.Channel} may happen immediately, even before it
370    * is active.
371    */
372   public static ProtocolNegotiator plaintext() {
373     return new PlaintextNegotiator();
374   }
375 
376   static final class PlaintextNegotiator implements ProtocolNegotiator {
377     @Override
378     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
379       return new BufferUntilChannelActiveHandler(handler);
380     }
381 
382     @Override
383     public void close() {}
384   }
385 
386   private static RuntimeException unavailableException(String msg) {
387     return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
388   }
389 
390   @VisibleForTesting
391   static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String msg,
392                                                 @Nullable Throwable t) {
393     if (!log.isLoggable(level)) {
394       return;
395     }
396 
397     SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
398     SSLEngine engine = sslHandler.engine();
399 
400     StringBuilder builder = new StringBuilder(msg);
401     builder.append("\nSSLEngine Details: [\n");
402     if (engine instanceof OpenSslEngine) {
403       builder.append("    OpenSSL, ");
404       builder.append("Version: 0x").append(Integer.toHexString(OpenSsl.version()));
405       builder.append(" (").append(OpenSsl.versionString()).append("), ");
406       builder.append("ALPN supported: ").append(OpenSsl.isAlpnSupported());
407     } else if (JettyTlsUtil.isJettyAlpnConfigured()) {
408       builder.append("    Jetty ALPN");
409     } else if (JettyTlsUtil.isJettyNpnConfigured()) {
410       builder.append("    Jetty NPN");
411     } else if (JettyTlsUtil.isJava9AlpnAvailable()) {
412       builder.append("    JDK9 ALPN");
413     }
414     builder.append("\n    TLS Protocol: ");
415     builder.append(engine.getSession().getProtocol());
416     builder.append("\n    Application Protocol: ");
417     builder.append(sslHandler.applicationProtocol());
418     builder.append("\n    Need Client Auth: " );
419     builder.append(engine.getNeedClientAuth());
420     builder.append("\n    Want Client Auth: ");
421     builder.append(engine.getWantClientAuth());
422     builder.append("\n    Supported protocols=");
423     builder.append(Arrays.toString(engine.getSupportedProtocols()));
424     builder.append("\n    Enabled protocols=");
425     builder.append(Arrays.toString(engine.getEnabledProtocols()));
426     builder.append("\n    Supported ciphers=");
427     builder.append(Arrays.toString(engine.getSupportedCipherSuites()));
428     builder.append("\n    Enabled ciphers=");
429     builder.append(Arrays.toString(engine.getEnabledCipherSuites()));
430     builder.append("\n]");
431 
432     log.log(level, builder.toString(), t);
433   }
434 
435   /**
436    * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
437    * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
438    * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
439    * i.e.  before it's active or the TLS Handshake is complete.
440    */
441   public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler {
442 
443     private ChannelHandler[] handlers;
444     private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<ChannelWrite>();
445     private boolean writing;
446     private boolean flushRequested;
447     private Throwable failCause;
448 
449     /**
450      * @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and
451      *                 before this handler.
452      */
453     protected AbstractBufferingHandler(ChannelHandler... handlers) {
454       this.handlers = handlers;
455     }
456 
457     /**
458      * When this channel is registered, we will add all the ChannelHandlers passed into our
459      * constructor to the pipeline.
460      */
461     @Override
462     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
463       /**
464        * This check is necessary as a channel may be registered with different event loops during it
465        * lifetime and we only want to configure it once.
466        */
467       if (handlers != null) {
468         for (ChannelHandler handler : handlers) {
469           ctx.pipeline().addBefore(ctx.name(), null, handler);
470         }
471         ChannelHandler handler0 = handlers[0];
472         ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]);
473         handlers = null;
474         if (handler0Ctx != null) { // The handler may have removed itself immediately
475           if (handler0 instanceof ChannelInboundHandler) {
476             ((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx);
477           } else {
478             handler0Ctx.fireChannelRegistered();
479           }
480         }
481       } else {
482         super.channelRegistered(ctx);
483       }
484     }
485 
486     /**
487      * Do not rely on channel handlers to propagate exceptions to us.
488      * {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
489      * Add a listener to the connect future directly and do appropriate error handling.
490      */
491     @Override
492     public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
493         SocketAddress localAddress, ChannelPromise promise) throws Exception {
494       super.connect(ctx, remoteAddress, localAddress, promise);
495       promise.addListener(new ChannelFutureListener() {
496         @Override
497         public void operationComplete(ChannelFuture future) throws Exception {
498           if (!future.isSuccess()) {
499             fail(ctx, future.cause());
500           }
501         }
502       });
503     }
504 
505     /**
506      * If we encounter an exception, then notify all buffered writes that we failed.
507      */
508     @Override
509     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
510       fail(ctx, cause);
511     }
512 
513     /**
514      * If this channel becomes inactive, then notify all buffered writes that we failed.
515      */
516     @Override
517     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
518       fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
519       super.channelInactive(ctx);
520     }
521 
522     /**
523      * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
524      * called, or we have somehow failed. If we have already failed in the past, then the write
525      * will fail immediately.
526      */
527     @Override
528     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
529         throws Exception {
530       /**
531        * This check handles a race condition between Channel.write (in the calling thread) and the
532        * removal of this handler (in the event loop thread).
533        * The problem occurs in e.g. this sequence:
534        * 1) [caller thread] The write method identifies the context for this handler
535        * 2) [event loop] This handler removes itself from the pipeline
536        * 3) [caller thread] The write method delegates to the invoker to call the write method in
537        *    the event loop thread. When this happens, we identify that this handler has been
538        *    removed with "bufferedWrites == null".
539        */
540       if (failCause != null) {
541         promise.setFailure(failCause);
542         ReferenceCountUtil.release(msg);
543       } else if (bufferedWrites == null) {
544         super.write(ctx, msg, promise);
545       } else {
546         bufferedWrites.add(new ChannelWrite(msg, promise));
547       }
548     }
549 
550     /**
551      * Calls to this method will not trigger an immediate flush. The flush will be deferred until
552      * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
553      */
554     @Override
555     public void flush(ChannelHandlerContext ctx) {
556       /**
557        * Swallowing any flushes is not only an optimization but also required
558        * for the SslHandler to work correctly. If the SslHandler receives multiple
559        * flushes while the handshake is still ongoing, then the handshake "randomly"
560        * times out. Not sure at this point why this is happening. Doing a single flush
561        * seems to work but multiple flushes don't ...
562        */
563       if (bufferedWrites == null) {
564         ctx.flush();
565       } else {
566         flushRequested = true;
567       }
568     }
569 
570     /**
571      * If we are still performing protocol negotiation, then this will propagate failures to all
572      * buffered writes.
573      */
574     @Override
575     public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
576       if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
577         fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
578       }
579       super.close(ctx, future);
580     }
581 
582     /**
583      * Propagate failures to all buffered writes.
584      */
585     protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
586       if (failCause == null) {
587         failCause = cause;
588       }
589       if (bufferedWrites != null) {
590         while (!bufferedWrites.isEmpty()) {
591           ChannelWrite write = bufferedWrites.poll();
592           write.promise.setFailure(cause);
593           ReferenceCountUtil.release(write.msg);
594         }
595         bufferedWrites = null;
596       }
597 
598       /**
599        * In case something goes wrong ensure that the channel gets closed as the
600        * NettyClientTransport relies on the channel's close future to get completed.
601        */
602       ctx.close();
603     }
604 
605     protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
606       if (!ctx.channel().isActive() || writing) {
607         return;
608       }
609       // Make sure that method can't be reentered, so that the ordering
610       // in the queue can't be messed up.
611       writing = true;
612       while (!bufferedWrites.isEmpty()) {
613         ChannelWrite write = bufferedWrites.poll();
614         ctx.write(write.msg, write.promise);
615       }
616       assert bufferedWrites.isEmpty();
617       bufferedWrites = null;
618       if (flushRequested) {
619         ctx.flush();
620       }
621       // Removal has to happen last as the above writes will likely trigger
622       // new writes that have to be added to the end of queue in order to not
623       // mess up the ordering.
624       ctx.pipeline().remove(this);
625     }
626 
627     private static class ChannelWrite {
628       Object msg;
629       ChannelPromise promise;
630 
631       ChannelWrite(Object msg, ChannelPromise promise) {
632         this.msg = msg;
633         this.promise = promise;
634       }
635     }
636   }
637 
638   /**
639    * Buffers all writes until the TLS Handshake is complete.
640    */
641   private static class BufferUntilTlsNegotiatedHandler extends AbstractBufferingHandler
642       implements ProtocolNegotiator.Handler {
643 
644     private final GrpcHttp2ConnectionHandler grpcHandler;
645 
646     BufferUntilTlsNegotiatedHandler(
647         ChannelHandler bootstrapHandler, GrpcHttp2ConnectionHandler grpcHandler) {
648       super(bootstrapHandler);
649       this.grpcHandler = grpcHandler;
650     }
651 
652     @Override
653     public AsciiString scheme() {
654       return Utils.HTTPS;
655     }
656 
657     @Override
658     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
659       if (evt instanceof SslHandshakeCompletionEvent) {
660         SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
661         if (handshakeEvent.isSuccess()) {
662           SslHandler handler = ctx.pipeline().get(SslHandler.class);
663           if (NEXT_PROTOCOL_VERSIONS.contains(handler.applicationProtocol())) {
664             // Successfully negotiated the protocol.
665             logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null);
666 
667             // Wait until negotiation is complete to add gRPC.   If added too early, HTTP/2 writes
668             // will fail before we see the userEvent, and the channel is closed down prematurely.
669             ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);
670 
671             SSLSession session = handler.engine().getSession();
672             // Successfully negotiated the protocol.
673             // Notify about completion and pass down SSLSession in attributes.
674             grpcHandler.handleProtocolNegotiationCompleted(
675                 Attributes.newBuilder()
676                     .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
677                     .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
678                     .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
679                     .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
680                     .build(),
681                 new InternalChannelz.Security(new InternalChannelz.Tls(session)));
682             writeBufferedAndRemove(ctx);
683           } else {
684             Exception ex = new Exception(
685                 "Failed ALPN negotiation: Unable to find compatible protocol.");
686             logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", ex);
687             fail(ctx, ex);
688           }
689         } else {
690           fail(ctx, handshakeEvent.cause());
691         }
692       }
693       super.userEventTriggered(ctx, evt);
694     }
695   }
696 
697   /**
698    * Buffers all writes until the {@link io.netty.channel.Channel} is active.
699    */
700   private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
701       implements ProtocolNegotiator.Handler {
702 
703     private final GrpcHttp2ConnectionHandler handler;
704 
705     BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
706       super(handler);
707       this.handler = handler;
708     }
709 
710     @Override
711     public AsciiString scheme() {
712       return Utils.HTTP;
713     }
714 
715     @Override
716     public void handlerAdded(ChannelHandlerContext ctx) {
717       writeBufferedAndRemove(ctx);
718     }
719 
720     @Override
721     public void channelActive(ChannelHandlerContext ctx) throws Exception {
722       writeBufferedAndRemove(ctx);
723       handler.handleProtocolNegotiationCompleted(
724           Attributes
725               .newBuilder()
726               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
727               .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
728               .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
729               .build(),
730           /*securityInfo=*/ null);
731       super.channelActive(ctx);
732     }
733   }
734 
735   /**
736    * Buffers all writes until the HTTP to HTTP/2 upgrade is complete.
737    */
738   private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
739       implements ProtocolNegotiator.Handler {
740 
741     private final GrpcHttp2ConnectionHandler grpcHandler;
742 
743     BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
744       super(handler);
745       this.grpcHandler = grpcHandler;
746     }
747 
748     @Override
749     public AsciiString scheme() {
750       return Utils.HTTP;
751     }
752 
753     @Override
754     public void channelActive(ChannelHandlerContext ctx) throws Exception {
755       // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
756       // which causes the upgrade headers to be added
757       DefaultHttpRequest upgradeTrigger =
758           new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
759       ctx.writeAndFlush(upgradeTrigger);
760       super.channelActive(ctx);
761     }
762 
763     @Override
764     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
765       if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
766         writeBufferedAndRemove(ctx);
767         grpcHandler.handleProtocolNegotiationCompleted(
768             Attributes
769                 .newBuilder()
770                 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
771                 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress())
772                 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
773                 .build(),
774             /*securityInfo=*/ null);
775       } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
776         fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
777       }
778       super.userEventTriggered(ctx, evt);
779     }
780   }
781 }
782