1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 import static io.grpc.netty.GrpcSslContexts.NEXT_PROTOCOL_VERSIONS; 21 22 import com.google.common.annotations.VisibleForTesting; 23 import com.google.common.base.Preconditions; 24 import io.grpc.Attributes; 25 import io.grpc.Grpc; 26 import io.grpc.Internal; 27 import io.grpc.InternalChannelz; 28 import io.grpc.SecurityLevel; 29 import io.grpc.Status; 30 import io.grpc.internal.GrpcAttributes; 31 import io.grpc.internal.GrpcUtil; 32 import io.netty.channel.ChannelDuplexHandler; 33 import io.netty.channel.ChannelFuture; 34 import io.netty.channel.ChannelFutureListener; 35 import io.netty.channel.ChannelHandler; 36 import io.netty.channel.ChannelHandlerAdapter; 37 import io.netty.channel.ChannelHandlerContext; 38 import io.netty.channel.ChannelInboundHandler; 39 import io.netty.channel.ChannelInboundHandlerAdapter; 40 import io.netty.channel.ChannelPipeline; 41 import io.netty.channel.ChannelPromise; 42 import io.netty.handler.codec.http.DefaultHttpRequest; 43 import io.netty.handler.codec.http.HttpClientCodec; 44 import io.netty.handler.codec.http.HttpClientUpgradeHandler; 45 import io.netty.handler.codec.http.HttpMethod; 46 import io.netty.handler.codec.http.HttpVersion; 47 import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; 48 import io.netty.handler.proxy.HttpProxyHandler; 49 import io.netty.handler.proxy.ProxyConnectionEvent; 50 import io.netty.handler.proxy.ProxyHandler; 51 import io.netty.handler.ssl.OpenSsl; 52 import io.netty.handler.ssl.OpenSslEngine; 53 import io.netty.handler.ssl.SslContext; 54 import io.netty.handler.ssl.SslHandler; 55 import io.netty.handler.ssl.SslHandshakeCompletionEvent; 56 import io.netty.util.AsciiString; 57 import io.netty.util.ReferenceCountUtil; 58 import java.net.SocketAddress; 59 import java.net.URI; 60 import java.util.ArrayDeque; 61 import java.util.Arrays; 62 import java.util.Queue; 63 import java.util.logging.Level; 64 import java.util.logging.Logger; 65 import javax.annotation.Nullable; 66 import javax.net.ssl.SSLEngine; 67 import javax.net.ssl.SSLParameters; 68 import javax.net.ssl.SSLSession; 69 70 /** 71 * Common {@link ProtocolNegotiator}s used by gRPC. 72 */ 73 @Internal 74 public final class ProtocolNegotiators { 75 private static final Logger log = Logger.getLogger(ProtocolNegotiators.class.getName()); 76 ProtocolNegotiators()77 private ProtocolNegotiators() { 78 } 79 80 /** 81 * Create a server plaintext handler for gRPC. 82 */ serverPlaintext()83 public static ProtocolNegotiator serverPlaintext() { 84 return new ProtocolNegotiator() { 85 @Override 86 public Handler newHandler(final GrpcHttp2ConnectionHandler handler) { 87 class PlaintextHandler extends ChannelHandlerAdapter implements Handler { 88 @Override 89 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 90 // Set sttributes before replace to be sure we pass it before accepting any requests. 91 handler.handleProtocolNegotiationCompleted(Attributes.newBuilder() 92 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 93 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) 94 .build(), 95 /*securityInfo=*/ null); 96 // Just replace this handler with the gRPC handler. 97 ctx.pipeline().replace(this, null, handler); 98 } 99 100 @Override 101 public AsciiString scheme() { 102 return Utils.HTTP; 103 } 104 } 105 106 return new PlaintextHandler(); 107 } 108 109 @Override 110 public void close() {} 111 }; 112 } 113 114 /** 115 * Create a server TLS handler for HTTP/2 capable of using ALPN/NPN. 116 */ 117 public static ProtocolNegotiator serverTls(final SslContext sslContext) { 118 Preconditions.checkNotNull(sslContext, "sslContext"); 119 return new ProtocolNegotiator() { 120 @Override 121 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 122 return new ServerTlsHandler(sslContext, handler); 123 } 124 125 @Override 126 public void close() {} 127 }; 128 } 129 130 @VisibleForTesting 131 static final class ServerTlsHandler extends ChannelInboundHandlerAdapter 132 implements ProtocolNegotiator.Handler { 133 private final GrpcHttp2ConnectionHandler grpcHandler; 134 private final SslContext sslContext; 135 136 ServerTlsHandler(SslContext sslContext, GrpcHttp2ConnectionHandler grpcHandler) { 137 this.sslContext = sslContext; 138 this.grpcHandler = grpcHandler; 139 } 140 141 @Override 142 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 143 super.handlerAdded(ctx); 144 145 SSLEngine sslEngine = sslContext.newEngine(ctx.alloc()); 146 ctx.pipeline().addFirst(new SslHandler(sslEngine, false)); 147 } 148 149 @Override 150 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 151 fail(ctx, cause); 152 } 153 154 @Override 155 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 156 if (evt instanceof SslHandshakeCompletionEvent) { 157 SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt; 158 if (handshakeEvent.isSuccess()) { 159 if (NEXT_PROTOCOL_VERSIONS.contains(sslHandler(ctx.pipeline()).applicationProtocol())) { 160 SSLSession session = sslHandler(ctx.pipeline()).engine().getSession(); 161 // Successfully negotiated the protocol. 162 // Notify about completion and pass down SSLSession in attributes. 163 grpcHandler.handleProtocolNegotiationCompleted( 164 Attributes.newBuilder() 165 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) 166 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 167 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) 168 .build(), 169 new InternalChannelz.Security(new InternalChannelz.Tls(session))); 170 // Replace this handler with the GRPC handler. 171 ctx.pipeline().replace(this, null, grpcHandler); 172 } else { 173 fail(ctx, new Exception( 174 "Failed protocol negotiation: Unable to find compatible protocol.")); 175 } 176 } else { 177 fail(ctx, handshakeEvent.cause()); 178 } 179 } 180 super.userEventTriggered(ctx, evt); 181 } 182 183 private SslHandler sslHandler(ChannelPipeline pipeline) { 184 return pipeline.get(SslHandler.class); 185 } 186 187 private void fail(ChannelHandlerContext ctx, Throwable exception) { 188 logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed for new client.", exception); 189 ctx.close(); 190 } 191 192 @Override 193 public AsciiString scheme() { 194 return Utils.HTTPS; 195 } 196 } 197 198 /** 199 * Returns a {@link ProtocolNegotiator} that does HTTP CONNECT proxy negotiation. 200 */ 201 public static ProtocolNegotiator httpProxy(final SocketAddress proxyAddress, 202 final @Nullable String proxyUsername, final @Nullable String proxyPassword, 203 final ProtocolNegotiator negotiator) { 204 Preconditions.checkNotNull(proxyAddress, "proxyAddress"); 205 Preconditions.checkNotNull(negotiator, "negotiator"); 206 class ProxyNegotiator implements ProtocolNegotiator { 207 @Override 208 public Handler newHandler(GrpcHttp2ConnectionHandler http2Handler) { 209 HttpProxyHandler proxyHandler; 210 if (proxyUsername == null || proxyPassword == null) { 211 proxyHandler = new HttpProxyHandler(proxyAddress); 212 } else { 213 proxyHandler = new HttpProxyHandler(proxyAddress, proxyUsername, proxyPassword); 214 } 215 return new BufferUntilProxyTunnelledHandler( 216 proxyHandler, negotiator.newHandler(http2Handler)); 217 } 218 219 // This method is not normally called, because we use httpProxy on a per-connection basis in 220 // NettyChannelBuilder. Instead, we expect `negotiator' to be closed by NettyTransportFactory. 221 @Override 222 public void close() { 223 negotiator.close(); 224 } 225 } 226 227 return new ProxyNegotiator(); 228 } 229 230 /** 231 * Buffers all writes until the HTTP CONNECT tunnel is established. 232 */ 233 static final class BufferUntilProxyTunnelledHandler extends AbstractBufferingHandler 234 implements ProtocolNegotiator.Handler { 235 private final ProtocolNegotiator.Handler originalHandler; 236 237 public BufferUntilProxyTunnelledHandler( 238 ProxyHandler proxyHandler, ProtocolNegotiator.Handler handler) { 239 super(proxyHandler, handler); 240 this.originalHandler = handler; 241 } 242 243 244 @Override 245 public AsciiString scheme() { 246 return originalHandler.scheme(); 247 } 248 249 @Override 250 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 251 if (evt instanceof ProxyConnectionEvent) { 252 writeBufferedAndRemove(ctx); 253 } 254 super.userEventTriggered(ctx, evt); 255 } 256 257 @Override 258 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 259 fail(ctx, unavailableException("Connection broken while trying to CONNECT through proxy")); 260 super.channelInactive(ctx); 261 } 262 263 @Override 264 public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { 265 if (ctx.channel().isActive()) { // This may be a notification that the socket was closed 266 fail(ctx, unavailableException("Channel closed while trying to CONNECT through proxy")); 267 } 268 super.close(ctx, future); 269 } 270 } 271 272 /** 273 * Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will 274 * be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel} 275 * may happen immediately, even before the TLS Handshake is complete. 276 */ 277 public static ProtocolNegotiator tls(SslContext sslContext) { 278 return new TlsNegotiator(sslContext); 279 } 280 281 @VisibleForTesting 282 static final class TlsNegotiator implements ProtocolNegotiator { 283 private final SslContext sslContext; 284 285 TlsNegotiator(SslContext sslContext) { 286 this.sslContext = checkNotNull(sslContext, "sslContext"); 287 } 288 289 @VisibleForTesting 290 HostPort parseAuthority(String authority) { 291 URI uri = GrpcUtil.authorityToUri(Preconditions.checkNotNull(authority, "authority")); 292 String host; 293 int port; 294 if (uri.getHost() != null) { 295 host = uri.getHost(); 296 port = uri.getPort(); 297 } else { 298 /* 299 * Implementation note: We pick -1 as the port here rather than deriving it from the 300 * original socket address. The SSL engine doens't use this port number when contacting the 301 * remote server, but rather it is used for other things like SSL Session caching. When an 302 * invalid authority is provided (like "bad_cert"), picking the original port and passing it 303 * in would mean that the port might used under the assumption that it was correct. By 304 * using -1 here, it forces the SSL implementation to treat it as invalid. 305 */ 306 host = authority; 307 port = -1; 308 } 309 return new HostPort(host, port); 310 } 311 312 @Override 313 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 314 final HostPort hostPort = parseAuthority(handler.getAuthority()); 315 316 ChannelHandler sslBootstrap = new ChannelHandlerAdapter() { 317 @Override 318 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 319 SSLEngine sslEngine = sslContext.newEngine(ctx.alloc(), hostPort.host, hostPort.port); 320 SSLParameters sslParams = sslEngine.getSSLParameters(); 321 sslParams.setEndpointIdentificationAlgorithm("HTTPS"); 322 sslEngine.setSSLParameters(sslParams); 323 ctx.pipeline().replace(this, null, new SslHandler(sslEngine, false)); 324 } 325 }; 326 return new BufferUntilTlsNegotiatedHandler(sslBootstrap, handler); 327 } 328 329 @Override 330 public void close() {} 331 } 332 333 /** A tuple of (host, port). */ 334 @VisibleForTesting 335 static final class HostPort { 336 final String host; 337 final int port; 338 339 public HostPort(String host, int port) { 340 this.host = host; 341 this.port = port; 342 } 343 } 344 345 /** 346 * Returns a {@link ProtocolNegotiator} used for upgrading to HTTP/2 from HTTP/1.x. 347 */ 348 public static ProtocolNegotiator plaintextUpgrade() { 349 return new PlaintextUpgradeNegotiator(); 350 } 351 352 static final class PlaintextUpgradeNegotiator implements ProtocolNegotiator { 353 @Override 354 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 355 // Register the plaintext upgrader 356 Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler); 357 HttpClientCodec httpClientCodec = new HttpClientCodec(); 358 final HttpClientUpgradeHandler upgrader = 359 new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000); 360 return new BufferingHttp2UpgradeHandler(upgrader, handler); 361 } 362 363 @Override 364 public void close() {} 365 } 366 367 /** 368 * Returns a {@link ChannelHandler} that ensures that the {@code handler} is added to the 369 * pipeline writes to the {@link io.netty.channel.Channel} may happen immediately, even before it 370 * is active. 371 */ 372 public static ProtocolNegotiator plaintext() { 373 return new PlaintextNegotiator(); 374 } 375 376 static final class PlaintextNegotiator implements ProtocolNegotiator { 377 @Override 378 public Handler newHandler(GrpcHttp2ConnectionHandler handler) { 379 return new BufferUntilChannelActiveHandler(handler); 380 } 381 382 @Override 383 public void close() {} 384 } 385 386 private static RuntimeException unavailableException(String msg) { 387 return Status.UNAVAILABLE.withDescription(msg).asRuntimeException(); 388 } 389 390 @VisibleForTesting 391 static void logSslEngineDetails(Level level, ChannelHandlerContext ctx, String msg, 392 @Nullable Throwable t) { 393 if (!log.isLoggable(level)) { 394 return; 395 } 396 397 SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); 398 SSLEngine engine = sslHandler.engine(); 399 400 StringBuilder builder = new StringBuilder(msg); 401 builder.append("\nSSLEngine Details: [\n"); 402 if (engine instanceof OpenSslEngine) { 403 builder.append(" OpenSSL, "); 404 builder.append("Version: 0x").append(Integer.toHexString(OpenSsl.version())); 405 builder.append(" (").append(OpenSsl.versionString()).append("), "); 406 builder.append("ALPN supported: ").append(OpenSsl.isAlpnSupported()); 407 } else if (JettyTlsUtil.isJettyAlpnConfigured()) { 408 builder.append(" Jetty ALPN"); 409 } else if (JettyTlsUtil.isJettyNpnConfigured()) { 410 builder.append(" Jetty NPN"); 411 } else if (JettyTlsUtil.isJava9AlpnAvailable()) { 412 builder.append(" JDK9 ALPN"); 413 } 414 builder.append("\n TLS Protocol: "); 415 builder.append(engine.getSession().getProtocol()); 416 builder.append("\n Application Protocol: "); 417 builder.append(sslHandler.applicationProtocol()); 418 builder.append("\n Need Client Auth: " ); 419 builder.append(engine.getNeedClientAuth()); 420 builder.append("\n Want Client Auth: "); 421 builder.append(engine.getWantClientAuth()); 422 builder.append("\n Supported protocols="); 423 builder.append(Arrays.toString(engine.getSupportedProtocols())); 424 builder.append("\n Enabled protocols="); 425 builder.append(Arrays.toString(engine.getEnabledProtocols())); 426 builder.append("\n Supported ciphers="); 427 builder.append(Arrays.toString(engine.getSupportedCipherSuites())); 428 builder.append("\n Enabled ciphers="); 429 builder.append(Arrays.toString(engine.getEnabledCipherSuites())); 430 builder.append("\n]"); 431 432 log.log(level, builder.toString(), t); 433 } 434 435 /** 436 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or 437 * {@link #fail(ChannelHandlerContext, Throwable)} is called. This handler allows us to 438 * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially 439 * i.e. before it's active or the TLS Handshake is complete. 440 */ 441 public abstract static class AbstractBufferingHandler extends ChannelDuplexHandler { 442 443 private ChannelHandler[] handlers; 444 private Queue<ChannelWrite> bufferedWrites = new ArrayDeque<ChannelWrite>(); 445 private boolean writing; 446 private boolean flushRequested; 447 private Throwable failCause; 448 449 /** 450 * @param handlers the ChannelHandlers are added to the pipeline on channelRegistered and 451 * before this handler. 452 */ 453 protected AbstractBufferingHandler(ChannelHandler... handlers) { 454 this.handlers = handlers; 455 } 456 457 /** 458 * When this channel is registered, we will add all the ChannelHandlers passed into our 459 * constructor to the pipeline. 460 */ 461 @Override 462 public void channelRegistered(ChannelHandlerContext ctx) throws Exception { 463 /** 464 * This check is necessary as a channel may be registered with different event loops during it 465 * lifetime and we only want to configure it once. 466 */ 467 if (handlers != null) { 468 for (ChannelHandler handler : handlers) { 469 ctx.pipeline().addBefore(ctx.name(), null, handler); 470 } 471 ChannelHandler handler0 = handlers[0]; 472 ChannelHandlerContext handler0Ctx = ctx.pipeline().context(handlers[0]); 473 handlers = null; 474 if (handler0Ctx != null) { // The handler may have removed itself immediately 475 if (handler0 instanceof ChannelInboundHandler) { 476 ((ChannelInboundHandler) handler0).channelRegistered(handler0Ctx); 477 } else { 478 handler0Ctx.fireChannelRegistered(); 479 } 480 } 481 } else { 482 super.channelRegistered(ctx); 483 } 484 } 485 486 /** 487 * Do not rely on channel handlers to propagate exceptions to us. 488 * {@link NettyClientHandler} is an example of a class that does not propagate exceptions. 489 * Add a listener to the connect future directly and do appropriate error handling. 490 */ 491 @Override 492 public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress, 493 SocketAddress localAddress, ChannelPromise promise) throws Exception { 494 super.connect(ctx, remoteAddress, localAddress, promise); 495 promise.addListener(new ChannelFutureListener() { 496 @Override 497 public void operationComplete(ChannelFuture future) throws Exception { 498 if (!future.isSuccess()) { 499 fail(ctx, future.cause()); 500 } 501 } 502 }); 503 } 504 505 /** 506 * If we encounter an exception, then notify all buffered writes that we failed. 507 */ 508 @Override 509 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 510 fail(ctx, cause); 511 } 512 513 /** 514 * If this channel becomes inactive, then notify all buffered writes that we failed. 515 */ 516 @Override 517 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 518 fail(ctx, unavailableException("Connection broken while performing protocol negotiation")); 519 super.channelInactive(ctx); 520 } 521 522 /** 523 * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is 524 * called, or we have somehow failed. If we have already failed in the past, then the write 525 * will fail immediately. 526 */ 527 @Override 528 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 529 throws Exception { 530 /** 531 * This check handles a race condition between Channel.write (in the calling thread) and the 532 * removal of this handler (in the event loop thread). 533 * The problem occurs in e.g. this sequence: 534 * 1) [caller thread] The write method identifies the context for this handler 535 * 2) [event loop] This handler removes itself from the pipeline 536 * 3) [caller thread] The write method delegates to the invoker to call the write method in 537 * the event loop thread. When this happens, we identify that this handler has been 538 * removed with "bufferedWrites == null". 539 */ 540 if (failCause != null) { 541 promise.setFailure(failCause); 542 ReferenceCountUtil.release(msg); 543 } else if (bufferedWrites == null) { 544 super.write(ctx, msg, promise); 545 } else { 546 bufferedWrites.add(new ChannelWrite(msg, promise)); 547 } 548 } 549 550 /** 551 * Calls to this method will not trigger an immediate flush. The flush will be deferred until 552 * {@link #writeBufferedAndRemove(ChannelHandlerContext)}. 553 */ 554 @Override 555 public void flush(ChannelHandlerContext ctx) { 556 /** 557 * Swallowing any flushes is not only an optimization but also required 558 * for the SslHandler to work correctly. If the SslHandler receives multiple 559 * flushes while the handshake is still ongoing, then the handshake "randomly" 560 * times out. Not sure at this point why this is happening. Doing a single flush 561 * seems to work but multiple flushes don't ... 562 */ 563 if (bufferedWrites == null) { 564 ctx.flush(); 565 } else { 566 flushRequested = true; 567 } 568 } 569 570 /** 571 * If we are still performing protocol negotiation, then this will propagate failures to all 572 * buffered writes. 573 */ 574 @Override 575 public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { 576 if (ctx.channel().isActive()) { // This may be a notification that the socket was closed 577 fail(ctx, unavailableException("Channel closed while performing protocol negotiation")); 578 } 579 super.close(ctx, future); 580 } 581 582 /** 583 * Propagate failures to all buffered writes. 584 */ 585 protected final void fail(ChannelHandlerContext ctx, Throwable cause) { 586 if (failCause == null) { 587 failCause = cause; 588 } 589 if (bufferedWrites != null) { 590 while (!bufferedWrites.isEmpty()) { 591 ChannelWrite write = bufferedWrites.poll(); 592 write.promise.setFailure(cause); 593 ReferenceCountUtil.release(write.msg); 594 } 595 bufferedWrites = null; 596 } 597 598 /** 599 * In case something goes wrong ensure that the channel gets closed as the 600 * NettyClientTransport relies on the channel's close future to get completed. 601 */ 602 ctx.close(); 603 } 604 605 protected final void writeBufferedAndRemove(ChannelHandlerContext ctx) { 606 if (!ctx.channel().isActive() || writing) { 607 return; 608 } 609 // Make sure that method can't be reentered, so that the ordering 610 // in the queue can't be messed up. 611 writing = true; 612 while (!bufferedWrites.isEmpty()) { 613 ChannelWrite write = bufferedWrites.poll(); 614 ctx.write(write.msg, write.promise); 615 } 616 assert bufferedWrites.isEmpty(); 617 bufferedWrites = null; 618 if (flushRequested) { 619 ctx.flush(); 620 } 621 // Removal has to happen last as the above writes will likely trigger 622 // new writes that have to be added to the end of queue in order to not 623 // mess up the ordering. 624 ctx.pipeline().remove(this); 625 } 626 627 private static class ChannelWrite { 628 Object msg; 629 ChannelPromise promise; 630 631 ChannelWrite(Object msg, ChannelPromise promise) { 632 this.msg = msg; 633 this.promise = promise; 634 } 635 } 636 } 637 638 /** 639 * Buffers all writes until the TLS Handshake is complete. 640 */ 641 private static class BufferUntilTlsNegotiatedHandler extends AbstractBufferingHandler 642 implements ProtocolNegotiator.Handler { 643 644 private final GrpcHttp2ConnectionHandler grpcHandler; 645 646 BufferUntilTlsNegotiatedHandler( 647 ChannelHandler bootstrapHandler, GrpcHttp2ConnectionHandler grpcHandler) { 648 super(bootstrapHandler); 649 this.grpcHandler = grpcHandler; 650 } 651 652 @Override 653 public AsciiString scheme() { 654 return Utils.HTTPS; 655 } 656 657 @Override 658 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 659 if (evt instanceof SslHandshakeCompletionEvent) { 660 SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt; 661 if (handshakeEvent.isSuccess()) { 662 SslHandler handler = ctx.pipeline().get(SslHandler.class); 663 if (NEXT_PROTOCOL_VERSIONS.contains(handler.applicationProtocol())) { 664 // Successfully negotiated the protocol. 665 logSslEngineDetails(Level.FINER, ctx, "TLS negotiation succeeded.", null); 666 667 // Wait until negotiation is complete to add gRPC. If added too early, HTTP/2 writes 668 // will fail before we see the userEvent, and the channel is closed down prematurely. 669 ctx.pipeline().addBefore(ctx.name(), null, grpcHandler); 670 671 SSLSession session = handler.engine().getSession(); 672 // Successfully negotiated the protocol. 673 // Notify about completion and pass down SSLSession in attributes. 674 grpcHandler.handleProtocolNegotiationCompleted( 675 Attributes.newBuilder() 676 .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, session) 677 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 678 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) 679 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.PRIVACY_AND_INTEGRITY) 680 .build(), 681 new InternalChannelz.Security(new InternalChannelz.Tls(session))); 682 writeBufferedAndRemove(ctx); 683 } else { 684 Exception ex = new Exception( 685 "Failed ALPN negotiation: Unable to find compatible protocol."); 686 logSslEngineDetails(Level.FINE, ctx, "TLS negotiation failed.", ex); 687 fail(ctx, ex); 688 } 689 } else { 690 fail(ctx, handshakeEvent.cause()); 691 } 692 } 693 super.userEventTriggered(ctx, evt); 694 } 695 } 696 697 /** 698 * Buffers all writes until the {@link io.netty.channel.Channel} is active. 699 */ 700 private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler 701 implements ProtocolNegotiator.Handler { 702 703 private final GrpcHttp2ConnectionHandler handler; 704 705 BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) { 706 super(handler); 707 this.handler = handler; 708 } 709 710 @Override 711 public AsciiString scheme() { 712 return Utils.HTTP; 713 } 714 715 @Override 716 public void handlerAdded(ChannelHandlerContext ctx) { 717 writeBufferedAndRemove(ctx); 718 } 719 720 @Override 721 public void channelActive(ChannelHandlerContext ctx) throws Exception { 722 writeBufferedAndRemove(ctx); 723 handler.handleProtocolNegotiationCompleted( 724 Attributes 725 .newBuilder() 726 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 727 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) 728 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) 729 .build(), 730 /*securityInfo=*/ null); 731 super.channelActive(ctx); 732 } 733 } 734 735 /** 736 * Buffers all writes until the HTTP to HTTP/2 upgrade is complete. 737 */ 738 private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler 739 implements ProtocolNegotiator.Handler { 740 741 private final GrpcHttp2ConnectionHandler grpcHandler; 742 743 BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) { 744 super(handler); 745 this.grpcHandler = grpcHandler; 746 } 747 748 @Override 749 public AsciiString scheme() { 750 return Utils.HTTP; 751 } 752 753 @Override 754 public void channelActive(ChannelHandlerContext ctx) throws Exception { 755 // Trigger the HTTP/1.1 plaintext upgrade protocol by issuing an HTTP request 756 // which causes the upgrade headers to be added 757 DefaultHttpRequest upgradeTrigger = 758 new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); 759 ctx.writeAndFlush(upgradeTrigger); 760 super.channelActive(ctx); 761 } 762 763 @Override 764 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 765 if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) { 766 writeBufferedAndRemove(ctx); 767 grpcHandler.handleProtocolNegotiationCompleted( 768 Attributes 769 .newBuilder() 770 .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress()) 771 .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, ctx.channel().localAddress()) 772 .set(GrpcAttributes.ATTR_SECURITY_LEVEL, SecurityLevel.NONE) 773 .build(), 774 /*securityInfo=*/ null); 775 } else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) { 776 fail(ctx, unavailableException("HTTP/2 upgrade rejected")); 777 } 778 super.userEventTriggered(ctx, evt); 779 } 780 } 781 } 782