1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Preconditions; 24 import io.grpc.Attributes; 25 import io.grpc.CallCredentials; 26 import io.grpc.Grpc; 27 import io.grpc.Internal; 28 import io.grpc.InternalChannelz; 29 import io.grpc.SecurityLevel; 30 import io.grpc.Status; 31 import io.grpc.internal.GrpcUtil; 32 import io.netty.channel.ChannelDuplexHandler; 33 import io.netty.channel.ChannelFuture; 34 import io.netty.channel.ChannelFutureListener; 35 import io.netty.channel.ChannelHandler; 36 import io.netty.channel.ChannelHandlerAdapter; 37 import io.netty.channel.ChannelHandlerContext; 38 import io.netty.channel.ChannelInboundHandler; 39 import io.netty.channel.ChannelInboundHandlerAdapter; 40 import io.netty.channel.ChannelPipeline; 41 import io.netty.channel.ChannelPromise; 42 import io.netty.handler.codec.http.DefaultHttpRequest; 43 import io.netty.handler.codec.http.HttpClientCodec; 44 import io.netty.handler.codec.http.HttpClientUpgradeHandler; 45 import io.netty.handler.codec.http.HttpMethod; 46 import io.netty.handler.codec.http.HttpVersion; 47 import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; 48 import io.netty.handler.proxy.HttpProxyHandler; 49 import io.netty.handler.proxy.ProxyConnectionEvent; 50 import io.netty.handler.proxy.ProxyHandler; 51 import io.netty.handler.ssl.OpenSsl; 52 import io.netty.handler.ssl.OpenSslEngine; 53 import io.netty.handler.ssl.SslContext; 54 import io.netty.handler.ssl.SslHandler; 55 import io.netty.handler.ssl.SslHandshakeCompletionEvent; 56 import io.netty.util.AsciiString; 57 import io.netty.util.ReferenceCountUtil; 58 import java.net.SocketAddress; 59 import java.net.URI; 60 import java.util.ArrayDeque; 61 import java.util.Arrays; 62 import java.util.Queue; 63 import java.util.logging.Level; 64 import java.util.logging.Logger; 65 import javax.annotation.Nullable; 66 import javax.net.ssl.SSLEngine; 67 import javax.net.ssl.SSLParameters; 68 import javax.net.ssl.SSLSession; 69 70 /** 71 * Common {@link ProtocolNegotiator}s used by gRPC. 72 */ 73 @Internal 74 public final class ProtocolNegotiators { 75 private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName()); 76 ProtocolNegotiators()77 private ProtocolNegotiators() { 78 } 79 80 /** 81 * Create a server plaintext handler for gRPC. 82 */ serverPlaintext()83 public static ProtocolNegotiator serverPlaintext() { 84 return new ProtocolNegotiator() { 85 @Override 86 public Handler newHandler(final GrpcHttp2ConnectionHandler handler) { 87 class PlaintextHandler extends ChannelHandlerAdapter implements Handler { 88 @Override 89 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 90 // Set sttributes before replace to be sure we pass it before accepting any requests. 91 handler.handleProtocolNegotiationCompleted(Attributes.newBuilder() 92 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 93 .build(), 94 /*securityInfo=*/ null); 95 // Just replace this handler with the gRPC handler. 96 ctx.pipeline().replace(this, null, handler); 97 } 98 99 @Override 100 public AsciiString scheme() { 101 return Utils.HTTP; 102 } 103 } 104 105 return new PlaintextHandler(); 106 } 107 }; 108 } 109 110 /** 111 * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN. 112 */ 113 public static ProtocolNegotiator serverTls(final SslContext sslContext) { 114 Preconditions.checkNotNull(sslContext, "sslContext"); 115 return new ProtocolNegotiator() { 116 @Override 117 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 118 return new ServerTlsHandler(sslContext, handler); 119 } 120 }; 121 } 122 123 @VisibleForTesting 124 static final class ServerTlsHandler extends ChannelInboundHandlerAdapter 125 implements ProtocolNegotiator.Handler { 126 private final GrpcHttp2ConnectionHandler grpcHandler; 127 private final SslContext sslContext; 128 129 ServerTlsHandler(SslContext sslContext, GrpcHttp2ConnectionHandler grpcHandler) { 130 this.sslContext = sslContext; 131 this.grpcHandler = grpcHandler; 132 } 133 134 @Override 135 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 136 super.handlerAdded(ctx); 137 138 SSLEngine sslEngine = sslContext.newEngine(ctx.alloc()); 139 ctx.pipeline().addFirst(new SslHandler(sslEngine, false)); 140 } 141 142 @Override 143 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 144 fail(ctx, cause); 145 } 146 147 @Override 148 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 149 if (evt instanceof SslHandshakeCompletionEvent) { 150 SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt; 151 if (handshakeEvent.isSuccess()) { 152 if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) { 153 SSLSession session = sslHandler(ctx.pipeline()).engine().getSession(); 154 // Successfully negotiated the protocol. 155 // Notify about completion and pass down SSLSession in attributes. 156 grpcHandler.handleProtocolNegotiationCompleted( 157 Attributes.newBuilder() 158 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) 159 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 160 .build(), 161 new InternalChannelz.Security(new InternalChannelz.Tls(session))); 162 // Replace this handler with the GRPC handler. 163 ctx.pipeline().replace(this, null, grpcHandler); 164 } else { 165 fail(ctx, new Exception( 166 "Failed protocol negotiation: Unable to find compatible protocol.")); 167 } 168 } else { 169 fail(ctx, handshakeEvent.cause()); 170 } 171 } 172 super.userEventTriggered(ctx, evt); 173 } 174 175 private SslHandler sslHandler(ChannelPipeline pipeline) { 176 return pipeline.get(SslHandler.class); 177 } 178 179 private void fail(ChannelHandlerContext ctx, Throwable exception) { 180 logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", exception); 181 ctx.close(); 182 } 183 184 @Override 185 public AsciiString scheme() { 186 return Utils.HTTPS; 187 } 188 } 189 190 /** 191 * Returns a {@link ProtocolNegotiator} that does HTTP CONNECT proxy negotiation. 192 */ 193 public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress, 194 final @Nullable String proxyUsername, final @Nullable String proxyPassword, 195 final ProtocolNegotiator negotiator) { 196 Preconditions.checkNotNull(proxyAddress, "proxyAddress"); 197 Preconditions.checkNotNull(negotiator, "negotiator"); 198 class ProxyNegotiator implements ProtocolNegotiator { 199 @Override 200 public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { 201 HttpProxyHandler proxyHandler; 202 if (proxyUsername == null || proxyPassword == null) { 203 proxyHandler = new HttpProxyHandler(proxyAddress); 204 } else { 205 proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword); 206 } 207 return new BufferUntilProxyTunnelledHandler( 208 proxyHandler, negotiator.newHandler(http2Handler)); 209 } 210 } 211 212 return new ProxyNegotiator(); 213 } 214 215 /** 216 * Buffers all writes until the HTTP CONNECT tunnel is established. 217 */ 218 static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler 219 implements ProtocolNegotiator.Handler { 220 private final ProtocolNegotiator.Handler originalHandler; 221 222 public BufferUntilProxyTunnelledHandler( 223 ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) { 224 super(proxyHandler, handler); 225 this.originalHandler = handler; 226 } 227 228 229 @Override 230 public AsciiString scheme() { 231 return originalHandler.scheme(); 232 } 233 234 @Override 235 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 236 if (evt instanceof ProxyConnectionEvent) { 237 writeBufferedAndRemove(ctx); 238 } 239 super.userEventTriggered(ctx, evt); 240 } 241 242 @Override 243 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 244 fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy")); 245 super.channelInactive(ctx); 246 } 247 248 @Override 249 public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { 250 if (ctx.channel().isActive()) { // This may be a notification that the socket was closed 251 fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy")); 252 } 253 super.close(ctx, future); 254 } 255 } 256 257 /** 258 * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will 259 * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel} 260 * may happen immediately, even before the TLS Handshake is complete. 261 */ 262 public static ProtocolNegotiator tls(SslContext sslContext) { 263 return new TlsNegotiator(sslContext); 264 } 265 266 @VisibleForTesting 267 static final class TlsNegotiator implements ProtocolNegotiator { 268 private final SslContext sslContext; 269 270 TlsNegotiator(SslContext sslContext) { 271 this.sslContext = checkNotNull(sslContext, "sslContext"); 272 } 273 274 @VisibleForTesting 275 HostPort parseAuthority(String authority) { 276 URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority")); 277 String host; 278 int port; 279 if (uri.getHost() != null) { 280 host = uri.getHost(); 281 port = uri.getPort(); 282 } else { 283 /* 284 * Implementation note: We pick -1 as the port here rather than deriving it from the 285 * original socket address. The SSL engine doens't use this port number when contacting the 286 * remote server, but rather it is used for other things like SSL Session caching. When an 287 * invalid authority is provided (like "bad_cert"), picking the original port and passing it 288 * in would mean that the port might used under the assumption that it was correct. By 289 * using -1 here, it forces the SSL implementation to treat it as invalid. 290 */ 291 host = authority; 292 port = -1; 293 } 294 return new HostPort(host, port); 295 } 296 297 @Override 298 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 299 final HostPort hostPort = parseAuthority(handler.getAuthority()); 300 301 ChannelHandler sslBootstrap = new ChannelHandlerAdapter() { 302 @Override 303 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 304 SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), hostPort.host, hostPort.port); 305 SSLParameters sslParams = sslEngine.getSSLParameters(); 306 sslParams.setEndpointIdentificationAlgorithm("HTTPS"); 307 sslEngine.setSSLParameters(sslParams); 308 ctx.pipeline().replace(this, null, new SslHandler(sslEngine, false)); 309 } 310 }; 311 return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler); 312 } 313 } 314 315 /** A tuple of (host, port). */ 316 @VisibleForTesting 317 static final class HostPort { 318 final String host; 319 final int port; 320 321 public HostPort(String host, int port) { 322 this.host = host; 323 this.port = port; 324 } 325 } 326 327 /** 328 * Returns a {@link ProtocolNegotiator} used for upgrading to HTTP/2 from HTTP/1.x. 329 */ 330 public static ProtocolNegotiator plaintextUpgrade() { 331 return new PlaintextUpgradeNegotiator(); 332 } 333 334 static final class PlaintextUpgradeNegotiator implements ProtocolNegotiator { 335 @Override 336 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 337 // Register the plaintext upgrader 338 Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler); 339 HttpClientCodec httpClientCodec = new HttpClientCodec(); 340 final HttpClientUpgradeHandler upgrader = 341 new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); 342 return new BufferingHttp2UpgradeHandler(upgrader, handler); 343 } 344 } 345 346 /** 347 * Returns a {@link ChannelHandler} that ensures that the {@code handler} is added to the 348 * pipeline writes to the {@link io.netty.channel.Channel} may happen immediately, even before it 349 * is active. 350 */ 351 public static ProtocolNegotiator plaintext() { 352 return new PlaintextNegotiator(); 353 } 354 355 static final class PlaintextNegotiator implements ProtocolNegotiator { 356 @Override 357 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 358 return new BufferUntilChannelActiveHandler(handler); 359 } 360 } 361 362 private static RuntimeException unavailableException(String msg) { 363 return Status.UNAVAILABLE.withDescription(msg).asRuntimeException(); 364 } 365 366 @VisibleForTesting 367 static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String msg, 368 @Nullable Throwable t) { 369 if (!log.isLoggable(level)) { 370 return; 371 } 372 373 SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); 374 SSLEngine engine = sslHandler.engine(); 375 376 StringBuilder builder = new StringBuilder(msg); 377 builder.append("\nSSLEngine Details: [\n"); 378 if (engine instanceof OpenSslEngine) { 379 builder.append(" OpenSSL, "); 380 builder.append("Version: 0x").append(Integer.toHexString(OpenSsl.version())); 381 builder.append(" (").append(OpenSsl.versionString()).append("), "); 382 builder.append("ALPN supported: ").append(OpenSsl.isAlpnSupported()); 383 } else if (JettyTlsUtil.isJettyAlpnConfigured()) { 384 builder.append(" Jetty ALPN"); 385 } else if (JettyTlsUtil.isJettyNpnConfigured()) { 386 builder.append(" Jetty NPN"); 387 } else if (JettyTlsUtil.isJava9AlpnAvailable()) { 388 builder.append(" JDK9 ALPN"); 389 } 390 builder.append("\n TLS Protocol: "); 391 builder.append(engine.getSession().getProtocol()); 392 builder.append("\n Application Protocol: "); 393 builder.append(sslHandler.applicationProtocol()); 394 builder.append("\n Need Client Auth: " ); 395 builder.append(engine.getNeedClientAuth()); 396 builder.append("\n Want Client Auth: "); 397 builder.append(engine.getWantClientAuth()); 398 builder.append("\n Supported protocols="); 399 builder.append(Arrays.toString(engine.getSupportedProtocols())); 400 builder.append("\n Enabled protocols="); 401 builder.append(Arrays.toString(engine.getEnabledProtocols())); 402 builder.append("\n Supported ciphers="); 403 builder.append(Arrays.toString(engine.getSupportedCipherSuites())); 404 builder.append("\n Enabled ciphers="); 405 builder.append(Arrays.toString(engine.getEnabledCipherSuites())); 406 builder.append("\n]"); 407 408 log.log(level, builder.toString(), t); 409 } 410 411 /** 412 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or 413 * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to 414 * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially 415 * i.e. before it's active or the TLS Handshake is complete. 416 */ 417 public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler { 418 419 private ChannelHandler[] handlers; 420 private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<ChannelWrite>(); 421 private boolean writing; 422 private boolean flushRequested; 423 private Throwable failCause; 424 425 /** 426 * @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and 427 * before this handler. 428 */ 429 protected AbstractBufferingHandler(ChannelHandler... handlers) { 430 this.handlers = handlers; 431 } 432 433 /** 434 * When this channel is registered, we will add all the ChannelHandlers passed into our 435 * constructor to the pipeline. 436 */ 437 @Override 438 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { 439 /** 440 * This check is necessary as a channel may be registered with different event loops during it 441 * lifetime and we only want to configure it once. 442 */ 443 if (handlers != null) { 444 for (ChannelHandler handler : handlers) { 445 ctx.pipeline().addBefore(ctx.name(), null, handler); 446 } 447 ChannelHandler handler0 = handlers[0]; 448 ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]); 449 handlers = null; 450 if (handler0Ctx != null) { // The handler may have removed itself immediately 451 if (handler0 instanceof ChannelInboundHandler) { 452 ((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx); 453 } else { 454 handler0Ctx.fireChannelRegistered(); 455 } 456 } 457 } else { 458 super.channelRegistered(ctx); 459 } 460 } 461 462 /** 463 * Do not rely on channel handlers to propagate exceptions to us. 464 * {@link NettyClientHandler} is an example of a class that does not propagate exceptions. 465 * Add a listener to the connect future directly and do appropriate error handling. 466 */ 467 @Override 468 public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress, 469 SocketAddress localAddress, ChannelPromise promise) throws Exception { 470 super.connect(ctx, remoteAddress, localAddress, promise); 471 promise.addListener(new ChannelFutureListener() { 472 @Override 473 public void operationComplete(ChannelFuture future) throws Exception { 474 if (!future.isSuccess()) { 475 fail(ctx, future.cause()); 476 } 477 } 478 }); 479 } 480 481 /** 482 * If we encounter an exception, then notify all buffered writes that we failed. 483 */ 484 @Override 485 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 486 fail(ctx, cause); 487 } 488 489 /** 490 * If this channel becomes inactive, then notify all buffered writes that we failed. 491 */ 492 @Override 493 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 494 fail(ctx, unavailableException("Connection broken while performing protocol negotiation")); 495 super.channelInactive(ctx); 496 } 497 498 /** 499 * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is 500 * called, or we have somehow failed. If we have already failed in the past, then the write 501 * will fail immediately. 502 */ 503 @Override 504 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 505 throws Exception { 506 /** 507 * This check handles a race condition between Channel.write (in the calling thread) and the 508 * removal of this handler (in the event loop thread). 509 * The problem occurs in e.g. this sequence: 510 * 1) [caller thread] The write method identifies the context for this handler 511 * 2) [event loop] This handler removes itself from the pipeline 512 * 3) [caller thread] The write method delegates to the invoker to call the write method in 513 * the event loop thread. When this happens, we identify that this handler has been 514 * removed with "bufferedWrites == null". 515 */ 516 if (failCause != null) { 517 promise.setFailure(failCause); 518 ReferenceCountUtil.release(msg); 519 } else if (bufferedWrites == null) { 520 super.write(ctx, msg, promise); 521 } else { 522 bufferedWrites.add(new ChannelWrite(msg, promise)); 523 } 524 } 525 526 /** 527 * Calls to this method will not trigger an immediate flush. The flush will be deferred until 528 * {@link #writeBufferedAndRemove(ChannelHandlerContext)}. 529 */ 530 @Override 531 public void flush(ChannelHandlerContext ctx) { 532 /** 533 * Swallowing any flushes is not only an optimization but also required 534 * for the SslHandler to work correctly. If the SslHandler receives multiple 535 * flushes while the handshake is still ongoing, then the handshake "randomly" 536 * times out. Not sure at this point why this is happening. Doing a single flush 537 * seems to work but multiple flushes don't ... 538 */ 539 if (bufferedWrites == null) { 540 ctx.flush(); 541 } else { 542 flushRequested = true; 543 } 544 } 545 546 /** 547 * If we are still performing protocol negotiation, then this will propagate failures to all 548 * buffered writes. 549 */ 550 @Override 551 public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { 552 if (ctx.channel().isActive()) { // This may be a notification that the socket was closed 553 fail(ctx, unavailableException("Channel closed while performing protocol negotiation")); 554 } 555 super.close(ctx, future); 556 } 557 558 /** 559 * Propagate failures to all buffered writes. 560 */ 561 protected final void fail(ChannelHandlerContext ctx, Throwable cause) { 562 if (failCause == null) { 563 failCause = cause; 564 } 565 if (bufferedWrites != null) { 566 while (!bufferedWrites.isEmpty()) { 567 ChannelWrite write = bufferedWrites.poll(); 568 write.promise.setFailure(cause); 569 ReferenceCountUtil.release(write.msg); 570 } 571 bufferedWrites = null; 572 } 573 574 /** 575 * In case something goes wrong ensure that the channel gets closed as the 576 * NettyClientTransport relies on the channel's close future to get completed. 577 */ 578 ctx.close(); 579 } 580 581 protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) { 582 if (!ctx.channel().isActive() || writing) { 583 return; 584 } 585 // Make sure that method can't be reentered, so that the ordering 586 // in the queue can't be messed up. 587 writing = true; 588 while (!bufferedWrites.isEmpty()) { 589 ChannelWrite write = bufferedWrites.poll(); 590 ctx.write(write.msg, write.promise); 591 } 592 assert bufferedWrites.isEmpty(); 593 bufferedWrites = null; 594 if (flushRequested) { 595 ctx.flush(); 596 } 597 // Removal has to happen last as the above writes will likely trigger 598 // new writes that have to be added to the end of queue in order to not 599 // mess up the ordering. 600 ctx.pipeline().remove(this); 601 } 602 603 private static class ChannelWrite { 604 Object msg; 605 ChannelPromise promise; 606 607 ChannelWrite(Object msg, ChannelPromise promise) { 608 this.msg = msg; 609 this.promise = promise; 610 } 611 } 612 } 613 614 /** 615 * Buffers all writes until the TLS Handshake is complete. 616 */ 617 private static class BufferUntilTlsNegotiatedHandler extends AbstractBufferingHandler 618 implements ProtocolNegotiator.Handler { 619 620 private final GrpcHttp2ConnectionHandler grpcHandler; 621 622 BufferUntilTlsNegotiatedHandler( 623 ChannelHandler bootstrapHandler, GrpcHttp2ConnectionHandler grpcHandler) { 624 super(bootstrapHandler); 625 this.grpcHandler = grpcHandler; 626 } 627 628 @Override 629 public AsciiString scheme() { 630 return Utils.HTTPS; 631 } 632 633 @Override 634 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 635 if (evt instanceof SslHandshakeCompletionEvent) { 636 SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt; 637 if (handshakeEvent.isSuccess()) { 638 SslHandler handler = ctx.pipeline().get(SslHandler.class); 639 if (NEXT_PROTOCOL_VERSIONS.contains(handler.applicationProtocol())) { 640 // Successfully negotiated the protocol. 641 logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null); 642 643 // Wait until negotiation is complete to add gRPC. If added too early, HTTP/2 writes 644 // will fail before we see the userEvent, and the channel is closed down prematurely. 645 ctx.pipeline().addBefore(ctx.name(), null, grpcHandler); 646 647 SSLSession session = handler.engine().getSession(); 648 // Successfully negotiated the protocol. 649 // Notify about completion and pass down SSLSession in attributes. 650 grpcHandler.handleProtocolNegotiationCompleted( 651 Attributes.newBuilder() 652 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) 653 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 654 .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) 655 .build(), 656 new InternalChannelz.Security(new InternalChannelz.Tls(session))); 657 writeBufferedAndRemove(ctx); 658 } else { 659 Exception ex = new Exception( 660 "Failed ALPN negotiation: Unable to find compatible protocol."); 661 logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", ex); 662 fail(ctx, ex); 663 } 664 } else { 665 fail(ctx, handshakeEvent.cause()); 666 } 667 } 668 super.userEventTriggered(ctx, evt); 669 } 670 } 671 672 /** 673 * Buffers all writes until the {@link io.netty.channel.Channel} is active. 674 */ 675 private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler 676 implements ProtocolNegotiator.Handler { 677 678 private final GrpcHttp2ConnectionHandler handler; 679 680 BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) { 681 super(handler); 682 this.handler = handler; 683 } 684 685 @Override 686 public AsciiString scheme() { 687 return Utils.HTTP; 688 } 689 690 @Override 691 public void handlerAdded(ChannelHandlerContext ctx) { 692 writeBufferedAndRemove(ctx); 693 } 694 695 @Override 696 public void channelActive(ChannelHandlerContext ctx) throws Exception { 697 writeBufferedAndRemove(ctx); 698 handler.handleProtocolNegotiationCompleted( 699 Attributes 700 .newBuilder() 701 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 702 .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) 703 .build(), 704 /*securityInfo=*/ null); 705 super.channelActive(ctx); 706 } 707 } 708 709 /** 710 * Buffers all writes until the HTTP to HTTP/2 upgrade is complete. 711 */ 712 private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler 713 implements ProtocolNegotiator.Handler { 714 715 private final GrpcHttp2ConnectionHandler grpcHandler; 716 717 BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) { 718 super(handler); 719 this.grpcHandler = grpcHandler; 720 } 721 722 @Override 723 public AsciiString scheme() { 724 return Utils.HTTP; 725 } 726 727 @Override 728 public void channelActive(ChannelHandlerContext ctx) throws Exception { 729 // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request 730 // which causes the upgrade headers to be added 731 DefaultHttpRequest upgradeTrigger = 732 new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); 733 ctx.writeAndFlush(upgradeTrigger); 734 super.channelActive(ctx); 735 } 736 737 @Override 738 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 739 if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) { 740 writeBufferedAndRemove(ctx); 741 grpcHandler.handleProtocolNegotiationCompleted( 742 Attributes 743 .newBuilder() 744 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 745 .set(CallCredentials.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) 746 .build(), 747 /*securityInfo=*/ null); 748 } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { 749 fail(ctx, unavailableException("HTTP/2 upgrade rejected")); 750 } 751 super.userEventTriggered(ctx, evt); 752 } 753 } 754 } 755