• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.netty;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import io.grpc.Attributes;
25 import io.grpc.CallCredentials;
26 import io.grpc.Grpc;
27 import io.grpc.Internal;
28 import io.grpc.InternalChannelz;
29 import io.grpc.SecurityLevel;
30 import io.grpc.Status;
31 import io.grpc.internal.GrpcUtil;
32 import io.netty.channel.ChannelDuplexHandler;
33 import io.netty.channel.ChannelFuture;
34 import io.netty.channel.ChannelFutureListener;
35 import io.netty.channel.ChannelHandler;
36 import io.netty.channel.ChannelHandlerAdapter;
37 import io.netty.channel.ChannelHandlerContext;
38 import io.netty.channel.ChannelInboundHandler;
39 import io.netty.channel.ChannelInboundHandlerAdapter;
40 import io.netty.channel.ChannelPipeline;
41 import io.netty.channel.ChannelPromise;
42 import io.netty.handler.codec.http.DefaultHttpRequest;
43 import io.netty.handler.codec.http.HttpClientCodec;
44 import io.netty.handler.codec.http.HttpClientUpgradeHandler;
45 import io.netty.handler.codec.http.HttpMethod;
46 import io.netty.handler.codec.http.HttpVersion;
47 import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
48 import io.netty.handler.proxy.HttpProxyHandler;
49 import io.netty.handler.proxy.ProxyConnectionEvent;
50 import io.netty.handler.proxy.ProxyHandler;
51 import io.netty.handler.ssl.OpenSsl;
52 import io.netty.handler.ssl.OpenSslEngine;
53 import io.netty.handler.ssl.SslContext;
54 import io.netty.handler.ssl.SslHandler;
55 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
56 import io.netty.util.AsciiString;
57 import io.netty.util.ReferenceCountUtil;
58 import java.net.SocketAddress;
59 import java.net.URI;
60 import java.util.ArrayDeque;
61 import java.util.Arrays;
62 import java.util.Queue;
63 import java.util.logging.Level;
64 import java.util.logging.Logger;
65 import javax.annotation.Nullable;
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLParameters;
68 import javax.net.ssl.SSLSession;
69 
70 /**
71  * Common {@link ProtocolNegotiator}s used by gRPC.
72  */
73 @Internal
74 public final class ProtocolNegotiators {
75   private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName());
76 
ProtocolNegotiators()77   private ProtocolNegotiators() {
78   }
79 
80   /**
81    * Create a server plaintext handler for gRPC.
82    */
serverPlaintext()83   public static ProtocolNegotiator serverPlaintext() {
84     return new ProtocolNegotiator() {
85       @Override
86       public Handler newHandler(final GrpcHttp2ConnectionHandler handler) {
87         class PlaintextHandler extends ChannelHandlerAdapter implements Handler {
88           @Override
89           public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
90             // Set sttributes before replace to be sure we pass it before accepting any requests.
91             handler.handleProtocolNegotiationCompleted(Attributes.newBuilder()
92                 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
93                 .build(),
94                 /*securityInfo=*/ null);
95             // Just replace this handler with the gRPC handler.
96             ctx.pipeline().replace(this, null, handler);
97           }
98 
99           @Override
100           public AsciiString scheme() {
101             return Utils.HTTP;
102           }
103         }
104 
105         return new PlaintextHandler();
106       }
107     };
108   }
109 
110   /**
111    * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN.
112    */
113   public static ProtocolNegotiator serverTls(final SslContext sslContext) {
114     Preconditions.checkNotNull(sslContext, "sslContext");
115     return new ProtocolNegotiator() {
116       @Override
117       public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
118         return new ServerTlsHandler(sslContext, handler);
119       }
120     };
121   }
122 
123   @VisibleForTesting
124   static final class ServerTlsHandler extends ChannelInboundHandlerAdapter
125       implements ProtocolNegotiator.Handler {
126     private final GrpcHttp2ConnectionHandler grpcHandler;
127     private final SslContext sslContext;
128 
129     ServerTlsHandler(SslContext sslContext, GrpcHttp2ConnectionHandler grpcHandler) {
130       this.sslContext = sslContext;
131       this.grpcHandler = grpcHandler;
132     }
133 
134     @Override
135     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
136       super.handlerAdded(ctx);
137 
138       SSLEngine sslEngine = sslContext.newEngine(ctx.alloc());
139       ctx.pipeline().addFirst(new SslHandler(sslEngine, false));
140     }
141 
142     @Override
143     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
144       fail(ctx, cause);
145     }
146 
147     @Override
148     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
149       if (evt instanceof SslHandshakeCompletionEvent) {
150         SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
151         if (handshakeEvent.isSuccess()) {
152           if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) {
153             SSLSession session = sslHandler(ctx.pipeline()).engine().getSession();
154             // Successfully negotiated the protocol.
155             // Notify about completion and pass down SSLSession in attributes.
156             grpcHandler.handleProtocolNegotiationCompleted(
157                 Attributes.newBuilder()
158                     .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
159                     .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
160                     .build(),
161                 new InternalChannelz.Security(new InternalChannelz.Tls(session)));
162             // Replace this handler with the GRPC handler.
163             ctx.pipeline().replace(this, null, grpcHandler);
164           } else {
165             fail(ctx, new Exception(
166                 "Failed protocol negotiation: Unable to find compatible protocol."));
167           }
168         } else {
169           fail(ctx, handshakeEvent.cause());
170         }
171       }
172       super.userEventTriggered(ctx, evt);
173     }
174 
175     private SslHandler sslHandler(ChannelPipeline pipeline) {
176       return pipeline.get(SslHandler.class);
177     }
178 
179     private void fail(ChannelHandlerContext ctx, Throwable exception) {
180       logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", exception);
181       ctx.close();
182     }
183 
184     @Override
185     public AsciiString scheme() {
186       return Utils.HTTPS;
187     }
188   }
189 
190   /**
191    * Returns a {@link ProtocolNegotiator} that does HTTP CONNECT proxy negotiation.
192    */
193   public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress,
194       final @Nullable String proxyUsername, final @Nullable String proxyPassword,
195       final ProtocolNegotiator negotiator) {
196     Preconditions.checkNotNull(proxyAddress, "proxyAddress");
197     Preconditions.checkNotNull(negotiator, "negotiator");
198     class ProxyNegotiator implements ProtocolNegotiator {
199       @Override
200       public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) {
201         HttpProxyHandler proxyHandler;
202         if (proxyUsername == null || proxyPassword == null) {
203           proxyHandler = new HttpProxyHandler(proxyAddress);
204         } else {
205           proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword);
206         }
207         return new BufferUntilProxyTunnelledHandler(
208             proxyHandler, negotiator.newHandler(http2Handler));
209       }
210     }
211 
212     return new ProxyNegotiator();
213   }
214 
215   /**
216    * Buffers all writes until the HTTP CONNECT tunnel is established.
217    */
218   static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler
219       implements ProtocolNegotiator.Handler {
220     private final ProtocolNegotiator.Handler originalHandler;
221 
222     public BufferUntilProxyTunnelledHandler(
223         ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) {
224       super(proxyHandler, handler);
225       this.originalHandler = handler;
226     }
227 
228 
229     @Override
230     public AsciiString scheme() {
231       return originalHandler.scheme();
232     }
233 
234     @Override
235     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
236       if (evt instanceof ProxyConnectionEvent) {
237         writeBufferedAndRemove(ctx);
238       }
239       super.userEventTriggered(ctx, evt);
240     }
241 
242     @Override
243     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
244       fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy"));
245       super.channelInactive(ctx);
246     }
247 
248     @Override
249     public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
250       if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
251         fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy"));
252       }
253       super.close(ctx, future);
254     }
255   }
256 
257   /**
258    * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
259    * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
260    * may happen immediately, even before the TLS Handshake is complete.
261    */
262   public static ProtocolNegotiator tls(SslContext sslContext) {
263     return new TlsNegotiator(sslContext);
264   }
265 
266   @VisibleForTesting
267   static final class TlsNegotiator implements ProtocolNegotiator {
268     private final SslContext sslContext;
269 
270     TlsNegotiator(SslContext sslContext) {
271       this.sslContext = checkNotNull(sslContext, "sslContext");
272     }
273 
274     @VisibleForTesting
275     HostPort parseAuthority(String authority) {
276       URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority"));
277       String host;
278       int port;
279       if (uri.getHost() != null) {
280         host = uri.getHost();
281         port = uri.getPort();
282       } else {
283         /*
284          * Implementation note: We pick -1 as the port here rather than deriving it from the
285          * original socket address.  The SSL engine doens't use this port number when contacting the
286          * remote server, but rather it is used for other things like SSL Session caching.  When an
287          * invalid authority is provided (like "bad_cert"), picking the original port and passing it
288          * in would mean that the port might used under the assumption that it was correct.   By
289          * using -1 here, it forces the SSL implementation to treat it as invalid.
290          */
291         host = authority;
292         port = -1;
293       }
294       return new HostPort(host, port);
295     }
296 
297     @Override
298     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
299       final HostPort hostPort = parseAuthority(handler.getAuthority());
300 
301       ChannelHandler sslBootstrap = new ChannelHandlerAdapter() {
302         @Override
303         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
304           SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), hostPort.host, hostPort.port);
305           SSLParameters sslParams = sslEngine.getSSLParameters();
306           sslParams.setEndpointIdentificationAlgorithm("HTTPS");
307           sslEngine.setSSLParameters(sslParams);
308           ctx.pipeline().replace(this, null, new SslHandler(sslEngine, false));
309         }
310       };
311       return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler);
312     }
313   }
314 
315   /** A tuple of (host, port). */
316   @VisibleForTesting
317   static final class HostPort {
318     final String host;
319     final int port;
320 
321     public HostPort(String host, int port) {
322       this.host = host;
323       this.port = port;
324     }
325   }
326 
327   /**
328    * Returns a {@link ProtocolNegotiator} used for upgrading to HTTP/2 from HTTP/1.x.
329    */
330   public static ProtocolNegotiator plaintextUpgrade() {
331     return new PlaintextUpgradeNegotiator();
332   }
333 
334   static final class PlaintextUpgradeNegotiator implements ProtocolNegotiator {
335     @Override
336     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
337       // Register the plaintext upgrader
338       Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
339       HttpClientCodec httpClientCodec = new HttpClientCodec();
340       final HttpClientUpgradeHandler upgrader =
341           new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
342       return new BufferingHttp2UpgradeHandler(upgrader, handler);
343     }
344   }
345 
346   /**
347    * Returns a {@link ChannelHandler} that ensures that the {@code handler} is added to the
348    * pipeline writes to the {@link io.netty.channel.Channel} may happen immediately, even before it
349    * is active.
350    */
351   public static ProtocolNegotiator plaintext() {
352     return new PlaintextNegotiator();
353   }
354 
355   static final class PlaintextNegotiator implements ProtocolNegotiator {
356     @Override
357     public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
358       return new BufferUntilChannelActiveHandler(handler);
359     }
360   }
361 
362   private static RuntimeException unavailableException(String msg) {
363     return Status.UNAVAILABLE.withDescription(msg).asRuntimeException();
364   }
365 
366   @VisibleForTesting
367   static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String msg,
368                                                 @Nullable Throwable t) {
369     if (!log.isLoggable(level)) {
370       return;
371     }
372 
373     SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
374     SSLEngine engine = sslHandler.engine();
375 
376     StringBuilder builder = new StringBuilder(msg);
377     builder.append("\nSSLEngine Details: [\n");
378     if (engine instanceof OpenSslEngine) {
379       builder.append("    OpenSSL, ");
380       builder.append("Version: 0x").append(Integer.toHexString(OpenSsl.version()));
381       builder.append(" (").append(OpenSsl.versionString()).append("), ");
382       builder.append("ALPN supported: ").append(OpenSsl.isAlpnSupported());
383     } else if (JettyTlsUtil.isJettyAlpnConfigured()) {
384       builder.append("    Jetty ALPN");
385     } else if (JettyTlsUtil.isJettyNpnConfigured()) {
386       builder.append("    Jetty NPN");
387     } else if (JettyTlsUtil.isJava9AlpnAvailable()) {
388       builder.append("    JDK9 ALPN");
389     }
390     builder.append("\n    TLS Protocol: ");
391     builder.append(engine.getSession().getProtocol());
392     builder.append("\n    Application Protocol: ");
393     builder.append(sslHandler.applicationProtocol());
394     builder.append("\n    Need Client Auth: " );
395     builder.append(engine.getNeedClientAuth());
396     builder.append("\n    Want Client Auth: ");
397     builder.append(engine.getWantClientAuth());
398     builder.append("\n    Supported protocols=");
399     builder.append(Arrays.toString(engine.getSupportedProtocols()));
400     builder.append("\n    Enabled protocols=");
401     builder.append(Arrays.toString(engine.getEnabledProtocols()));
402     builder.append("\n    Supported ciphers=");
403     builder.append(Arrays.toString(engine.getSupportedCipherSuites()));
404     builder.append("\n    Enabled ciphers=");
405     builder.append(Arrays.toString(engine.getEnabledCipherSuites()));
406     builder.append("\n]");
407 
408     log.log(level, builder.toString(), t);
409   }
410 
411   /**
412    * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
413    * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to
414    * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
415    * i.e.  before it's active or the TLS Handshake is complete.
416    */
417   public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler {
418 
419     private ChannelHandler[] handlers;
420     private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<ChannelWrite>();
421     private boolean writing;
422     private boolean flushRequested;
423     private Throwable failCause;
424 
425     /**
426      * @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and
427      *                 before this handler.
428      */
429     protected AbstractBufferingHandler(ChannelHandler... handlers) {
430       this.handlers = handlers;
431     }
432 
433     /**
434      * When this channel is registered, we will add all the ChannelHandlers passed into our
435      * constructor to the pipeline.
436      */
437     @Override
438     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
439       /**
440        * This check is necessary as a channel may be registered with different event loops during it
441        * lifetime and we only want to configure it once.
442        */
443       if (handlers != null) {
444         for (ChannelHandler handler : handlers) {
445           ctx.pipeline().addBefore(ctx.name(), null, handler);
446         }
447         ChannelHandler handler0 = handlers[0];
448         ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]);
449         handlers = null;
450         if (handler0Ctx != null) { // The handler may have removed itself immediately
451           if (handler0 instanceof ChannelInboundHandler) {
452             ((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx);
453           } else {
454             handler0Ctx.fireChannelRegistered();
455           }
456         }
457       } else {
458         super.channelRegistered(ctx);
459       }
460     }
461 
462     /**
463      * Do not rely on channel handlers to propagate exceptions to us.
464      * {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
465      * Add a listener to the connect future directly and do appropriate error handling.
466      */
467     @Override
468     public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
469         SocketAddress localAddress, ChannelPromise promise) throws Exception {
470       super.connect(ctx, remoteAddress, localAddress, promise);
471       promise.addListener(new ChannelFutureListener() {
472         @Override
473         public void operationComplete(ChannelFuture future) throws Exception {
474           if (!future.isSuccess()) {
475             fail(ctx, future.cause());
476           }
477         }
478       });
479     }
480 
481     /**
482      * If we encounter an exception, then notify all buffered writes that we failed.
483      */
484     @Override
485     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
486       fail(ctx, cause);
487     }
488 
489     /**
490      * If this channel becomes inactive, then notify all buffered writes that we failed.
491      */
492     @Override
493     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
494       fail(ctx, unavailableException("Connection broken while performing protocol negotiation"));
495       super.channelInactive(ctx);
496     }
497 
498     /**
499      * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
500      * called, or we have somehow failed. If we have already failed in the past, then the write
501      * will fail immediately.
502      */
503     @Override
504     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
505         throws Exception {
506       /**
507        * This check handles a race condition between Channel.write (in the calling thread) and the
508        * removal of this handler (in the event loop thread).
509        * The problem occurs in e.g. this sequence:
510        * 1) [caller thread] The write method identifies the context for this handler
511        * 2) [event loop] This handler removes itself from the pipeline
512        * 3) [caller thread] The write method delegates to the invoker to call the write method in
513        *    the event loop thread. When this happens, we identify that this handler has been
514        *    removed with "bufferedWrites == null".
515        */
516       if (failCause != null) {
517         promise.setFailure(failCause);
518         ReferenceCountUtil.release(msg);
519       } else if (bufferedWrites == null) {
520         super.write(ctx, msg, promise);
521       } else {
522         bufferedWrites.add(new ChannelWrite(msg, promise));
523       }
524     }
525 
526     /**
527      * Calls to this method will not trigger an immediate flush. The flush will be deferred until
528      * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
529      */
530     @Override
531     public void flush(ChannelHandlerContext ctx) {
532       /**
533        * Swallowing any flushes is not only an optimization but also required
534        * for the SslHandler to work correctly. If the SslHandler receives multiple
535        * flushes while the handshake is still ongoing, then the handshake "randomly"
536        * times out. Not sure at this point why this is happening. Doing a single flush
537        * seems to work but multiple flushes don't ...
538        */
539       if (bufferedWrites == null) {
540         ctx.flush();
541       } else {
542         flushRequested = true;
543       }
544     }
545 
546     /**
547      * If we are still performing protocol negotiation, then this will propagate failures to all
548      * buffered writes.
549      */
550     @Override
551     public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
552       if (ctx.channel().isActive()) { // This may be a notification that the socket was closed
553         fail(ctx, unavailableException("Channel closed while performing protocol negotiation"));
554       }
555       super.close(ctx, future);
556     }
557 
558     /**
559      * Propagate failures to all buffered writes.
560      */
561     protected final void fail(ChannelHandlerContext ctx, Throwable cause) {
562       if (failCause == null) {
563         failCause = cause;
564       }
565       if (bufferedWrites != null) {
566         while (!bufferedWrites.isEmpty()) {
567           ChannelWrite write = bufferedWrites.poll();
568           write.promise.setFailure(cause);
569           ReferenceCountUtil.release(write.msg);
570         }
571         bufferedWrites = null;
572       }
573 
574       /**
575        * In case something goes wrong ensure that the channel gets closed as the
576        * NettyClientTransport relies on the channel's close future to get completed.
577        */
578       ctx.close();
579     }
580 
581     protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
582       if (!ctx.channel().isActive() || writing) {
583         return;
584       }
585       // Make sure that method can't be reentered, so that the ordering
586       // in the queue can't be messed up.
587       writing = true;
588       while (!bufferedWrites.isEmpty()) {
589         ChannelWrite write = bufferedWrites.poll();
590         ctx.write(write.msg, write.promise);
591       }
592       assert bufferedWrites.isEmpty();
593       bufferedWrites = null;
594       if (flushRequested) {
595         ctx.flush();
596       }
597       // Removal has to happen last as the above writes will likely trigger
598       // new writes that have to be added to the end of queue in order to not
599       // mess up the ordering.
600       ctx.pipeline().remove(this);
601     }
602 
603     private static class ChannelWrite {
604       Object msg;
605       ChannelPromise promise;
606 
607       ChannelWrite(Object msg, ChannelPromise promise) {
608         this.msg = msg;
609         this.promise = promise;
610       }
611     }
612   }
613 
614   /**
615    * Buffers all writes until the TLS Handshake is complete.
616    */
617   private static class BufferUntilTlsNegotiatedHandler extends AbstractBufferingHandler
618       implements ProtocolNegotiator.Handler {
619 
620     private final GrpcHttp2ConnectionHandler grpcHandler;
621 
622     BufferUntilTlsNegotiatedHandler(
623         ChannelHandler bootstrapHandler, GrpcHttp2ConnectionHandler grpcHandler) {
624       super(bootstrapHandler);
625       this.grpcHandler = grpcHandler;
626     }
627 
628     @Override
629     public AsciiString scheme() {
630       return Utils.HTTPS;
631     }
632 
633     @Override
634     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
635       if (evt instanceof SslHandshakeCompletionEvent) {
636         SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
637         if (handshakeEvent.isSuccess()) {
638           SslHandler handler = ctx.pipeline().get(SslHandler.class);
639           if (NEXT_PROTOCOL_VERSIONS.contains(handler.applicationProtocol())) {
640             // Successfully negotiated the protocol.
641             logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null);
642 
643             // Wait until negotiation is complete to add gRPC.   If added too early, HTTP/2 writes
644             // will fail before we see the userEvent, and the channel is closed down prematurely.
645             ctx.pipeline().addBefore(ctx.name(), null, grpcHandler);
646 
647             SSLSession session = handler.engine().getSession();
648             // Successfully negotiated the protocol.
649             // Notify about completion and pass down SSLSession in attributes.
650             grpcHandler.handleProtocolNegotiationCompleted(
651                 Attributes.newBuilder()
652                     .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session)
653                     .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
654                     .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY)
655                     .build(),
656                 new InternalChannelz.Security(new InternalChannelz.Tls(session)));
657             writeBufferedAndRemove(ctx);
658           } else {
659             Exception ex = new Exception(
660                 "Failed ALPN negotiation: Unable to find compatible protocol.");
661             logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", ex);
662             fail(ctx, ex);
663           }
664         } else {
665           fail(ctx, handshakeEvent.cause());
666         }
667       }
668       super.userEventTriggered(ctx, evt);
669     }
670   }
671 
672   /**
673    * Buffers all writes until the {@link io.netty.channel.Channel} is active.
674    */
675   private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
676       implements ProtocolNegotiator.Handler {
677 
678     private final GrpcHttp2ConnectionHandler handler;
679 
680     BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
681       super(handler);
682       this.handler = handler;
683     }
684 
685     @Override
686     public AsciiString scheme() {
687       return Utils.HTTP;
688     }
689 
690     @Override
691     public void handlerAdded(ChannelHandlerContext ctx) {
692       writeBufferedAndRemove(ctx);
693     }
694 
695     @Override
696     public void channelActive(ChannelHandlerContext ctx) throws Exception {
697       writeBufferedAndRemove(ctx);
698       handler.handleProtocolNegotiationCompleted(
699           Attributes
700               .newBuilder()
701               .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
702               .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
703               .build(),
704           /*securityInfo=*/ null);
705       super.channelActive(ctx);
706     }
707   }
708 
709   /**
710    * Buffers all writes until the HTTP to HTTP/2 upgrade is complete.
711    */
712   private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
713       implements ProtocolNegotiator.Handler {
714 
715     private final GrpcHttp2ConnectionHandler grpcHandler;
716 
717     BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
718       super(handler);
719       this.grpcHandler = grpcHandler;
720     }
721 
722     @Override
723     public AsciiString scheme() {
724       return Utils.HTTP;
725     }
726 
727     @Override
728     public void channelActive(ChannelHandlerContext ctx) throws Exception {
729       // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request
730       // which causes the upgrade headers to be added
731       DefaultHttpRequest upgradeTrigger =
732           new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
733       ctx.writeAndFlush(upgradeTrigger);
734       super.channelActive(ctx);
735     }
736 
737     @Override
738     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
739       if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
740         writeBufferedAndRemove(ctx);
741         grpcHandler.handleProtocolNegotiationCompleted(
742             Attributes
743                 .newBuilder()
744                 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
745                 .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE)
746                 .build(),
747             /*securityInfo=*/ null);
748       } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
749         fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
750       }
751       super.userEventTriggered(ctx, evt);
752     }
753   }
754 }
755