1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.http.nio.netty.internal; 17 18 import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.newSslHandler; 19 20 import io.netty.buffer.ByteBufAllocator; 21 import io.netty.channel.Channel; 22 import io.netty.channel.ChannelHandler; 23 import io.netty.channel.EventLoop; 24 import io.netty.channel.pool.ChannelPool; 25 import io.netty.channel.pool.ChannelPoolHandler; 26 import io.netty.handler.ssl.SslContext; 27 import io.netty.handler.ssl.SslHandler; 28 import io.netty.util.AttributeKey; 29 import io.netty.util.concurrent.Future; 30 import io.netty.util.concurrent.Promise; 31 import java.net.URI; 32 import software.amazon.awssdk.annotations.SdkInternalApi; 33 import software.amazon.awssdk.annotations.SdkTestInternalApi; 34 import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger; 35 import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils; 36 import software.amazon.awssdk.utils.StringUtils; 37 38 /** 39 * Connection pool that knows how to establish a tunnel using the HTTP CONNECT method. 40 */ 41 @SdkInternalApi 42 public class Http1TunnelConnectionPool implements ChannelPool { 43 static final AttributeKey<Boolean> TUNNEL_ESTABLISHED_KEY = NettyUtils.getOrCreateAttributeKey( 44 "aws.http.nio.netty.async.Http1TunnelConnectionPool.tunnelEstablished"); 45 46 private static final NettyClientLogger log = NettyClientLogger.getLogger(Http1TunnelConnectionPool.class); 47 48 private final EventLoop eventLoop; 49 private final ChannelPool delegate; 50 private final SslContext sslContext; 51 private final URI proxyAddress; 52 private final String proxyUser; 53 private final String proxyPassword; 54 private final URI remoteAddress; 55 private final ChannelPoolHandler handler; 56 private final InitHandlerSupplier initHandlerSupplier; 57 private final NettyConfiguration nettyConfiguration; 58 Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUsername, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration)59 public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, 60 URI proxyAddress, String proxyUsername, String proxyPassword, 61 URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration) { 62 this(eventLoop, delegate, sslContext, 63 proxyAddress, proxyUsername, proxyPassword, remoteAddress, handler, 64 ProxyTunnelInitHandler::new, nettyConfiguration); 65 } 66 Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration)67 public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, 68 URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler, 69 NettyConfiguration nettyConfiguration) { 70 this(eventLoop, delegate, sslContext, 71 proxyAddress, null, null, remoteAddress, handler, 72 ProxyTunnelInitHandler::new, nettyConfiguration); 73 74 } 75 76 @SdkTestInternalApi Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier, NettyConfiguration nettyConfiguration)77 Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, 78 URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress, 79 ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier, 80 NettyConfiguration nettyConfiguration) { 81 this.eventLoop = eventLoop; 82 this.delegate = delegate; 83 this.sslContext = sslContext; 84 this.proxyAddress = proxyAddress; 85 this.proxyUser = proxyUser; 86 this.proxyPassword = proxyPassword; 87 this.remoteAddress = remoteAddress; 88 this.handler = handler; 89 this.initHandlerSupplier = initHandlerSupplier; 90 this.nettyConfiguration = nettyConfiguration; 91 } 92 93 @Override acquire()94 public Future<Channel> acquire() { 95 return acquire(eventLoop.newPromise()); 96 } 97 98 @Override acquire(Promise<Channel> promise)99 public Future<Channel> acquire(Promise<Channel> promise) { 100 delegate.acquire(eventLoop.newPromise()).addListener((Future<Channel> f) -> { 101 if (f.isSuccess()) { 102 setupChannel(f.getNow(), promise); 103 } else { 104 promise.setFailure(f.cause()); 105 } 106 }); 107 return promise; 108 } 109 110 @Override release(Channel channel)111 public Future<Void> release(Channel channel) { 112 return release(channel, eventLoop.newPromise()); 113 } 114 115 @Override release(Channel channel, Promise<Void> promise)116 public Future<Void> release(Channel channel, Promise<Void> promise) { 117 return delegate.release(channel, promise); 118 } 119 120 @Override close()121 public void close() { 122 delegate.close(); 123 } 124 setupChannel(Channel ch, Promise<Channel> acquirePromise)125 private void setupChannel(Channel ch, Promise<Channel> acquirePromise) { 126 if (isTunnelEstablished(ch)) { 127 log.debug(ch, () -> String.format("Tunnel already established for %s", ch.id().asShortText())); 128 acquirePromise.setSuccess(ch); 129 return; 130 } 131 132 log.debug(ch, () -> String.format("Tunnel not yet established for channel %s. Establishing tunnel now.", 133 ch.id().asShortText())); 134 135 Promise<Channel> tunnelEstablishedPromise = eventLoop.newPromise(); 136 137 SslHandler sslHandler = createSslHandlerIfNeeded(ch.alloc()); 138 if (sslHandler != null) { 139 ch.pipeline().addLast(sslHandler); 140 } 141 ch.pipeline().addLast(initHandlerSupplier.newInitHandler(delegate, proxyUser, proxyPassword, remoteAddress, 142 tunnelEstablishedPromise)); 143 tunnelEstablishedPromise.addListener((Future<Channel> f) -> { 144 if (f.isSuccess()) { 145 Channel tunnel = f.getNow(); 146 handler.channelCreated(tunnel); 147 tunnel.attr(TUNNEL_ESTABLISHED_KEY).set(true); 148 acquirePromise.setSuccess(tunnel); 149 } else { 150 ch.close(); 151 delegate.release(ch); 152 153 Throwable cause = f.cause(); 154 log.error(ch, () -> String.format("Unable to establish tunnel for channel %s", ch.id().asShortText()), cause); 155 acquirePromise.setFailure(cause); 156 } 157 }); 158 } 159 createSslHandlerIfNeeded(ByteBufAllocator alloc)160 private SslHandler createSslHandlerIfNeeded(ByteBufAllocator alloc) { 161 if (sslContext == null) { 162 return null; 163 } 164 165 String scheme = proxyAddress.getScheme(); 166 167 if (!"https".equals(StringUtils.lowerCase(scheme))) { 168 return null; 169 } 170 171 return newSslHandler(sslContext, alloc, proxyAddress.getHost(), proxyAddress.getPort(), 172 nettyConfiguration.tlsHandshakeTimeout()); 173 } 174 isTunnelEstablished(Channel ch)175 private static boolean isTunnelEstablished(Channel ch) { 176 Boolean established = ch.attr(TUNNEL_ESTABLISHED_KEY).get(); 177 return Boolean.TRUE.equals(established); 178 } 179 180 @SdkTestInternalApi 181 @FunctionalInterface 182 interface InitHandlerSupplier { newInitHandler(ChannelPool sourcePool, String proxyUsername, String proxyPassword, URI remoteAddress, Promise<Channel> tunnelInitFuture)183 ChannelHandler newInitHandler(ChannelPool sourcePool, String proxyUsername, String proxyPassword, URI remoteAddress, 184 Promise<Channel> tunnelInitFuture); 185 } 186 } 187