• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.http.nio.netty.internal;
17 
18 import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.newSslHandler;
19 
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandler;
23 import io.netty.channel.EventLoop;
24 import io.netty.channel.pool.ChannelPool;
25 import io.netty.channel.pool.ChannelPoolHandler;
26 import io.netty.handler.ssl.SslContext;
27 import io.netty.handler.ssl.SslHandler;
28 import io.netty.util.AttributeKey;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.Promise;
31 import java.net.URI;
32 import software.amazon.awssdk.annotations.SdkInternalApi;
33 import software.amazon.awssdk.annotations.SdkTestInternalApi;
34 import software.amazon.awssdk.http.nio.netty.internal.utils.NettyClientLogger;
35 import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
36 import software.amazon.awssdk.utils.StringUtils;
37 
38 /**
39  * Connection pool that knows how to establish a tunnel using the HTTP CONNECT method.
40  */
41 @SdkInternalApi
42 public class Http1TunnelConnectionPool implements ChannelPool {
43     static final AttributeKey<Boolean> TUNNEL_ESTABLISHED_KEY = NettyUtils.getOrCreateAttributeKey(
44             "aws.http.nio.netty.async.Http1TunnelConnectionPool.tunnelEstablished");
45 
46     private static final NettyClientLogger log = NettyClientLogger.getLogger(Http1TunnelConnectionPool.class);
47 
48     private final EventLoop eventLoop;
49     private final ChannelPool delegate;
50     private final SslContext sslContext;
51     private final URI proxyAddress;
52     private final String proxyUser;
53     private final String proxyPassword;
54     private final URI remoteAddress;
55     private final ChannelPoolHandler handler;
56     private final InitHandlerSupplier initHandlerSupplier;
57     private final NettyConfiguration nettyConfiguration;
58 
Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUsername, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration)59     public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext,
60                                      URI proxyAddress, String proxyUsername, String proxyPassword,
61                                      URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration) {
62         this(eventLoop, delegate, sslContext,
63              proxyAddress, proxyUsername, proxyPassword, remoteAddress, handler,
64              ProxyTunnelInitHandler::new, nettyConfiguration);
65     }
66 
Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler, NettyConfiguration nettyConfiguration)67     public Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext,
68                                      URI proxyAddress, URI remoteAddress, ChannelPoolHandler handler,
69                                      NettyConfiguration nettyConfiguration) {
70         this(eventLoop, delegate, sslContext,
71              proxyAddress, null, null, remoteAddress, handler,
72              ProxyTunnelInitHandler::new, nettyConfiguration);
73 
74     }
75 
76     @SdkTestInternalApi
Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext, URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress, ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier, NettyConfiguration nettyConfiguration)77     Http1TunnelConnectionPool(EventLoop eventLoop, ChannelPool delegate, SslContext sslContext,
78                               URI proxyAddress, String proxyUser, String proxyPassword, URI remoteAddress,
79                               ChannelPoolHandler handler, InitHandlerSupplier initHandlerSupplier,
80                               NettyConfiguration nettyConfiguration) {
81         this.eventLoop = eventLoop;
82         this.delegate = delegate;
83         this.sslContext = sslContext;
84         this.proxyAddress = proxyAddress;
85         this.proxyUser = proxyUser;
86         this.proxyPassword = proxyPassword;
87         this.remoteAddress = remoteAddress;
88         this.handler = handler;
89         this.initHandlerSupplier = initHandlerSupplier;
90         this.nettyConfiguration = nettyConfiguration;
91     }
92 
93     @Override
acquire()94     public Future<Channel> acquire() {
95         return acquire(eventLoop.newPromise());
96     }
97 
98     @Override
acquire(Promise<Channel> promise)99     public Future<Channel> acquire(Promise<Channel> promise) {
100         delegate.acquire(eventLoop.newPromise()).addListener((Future<Channel> f) -> {
101             if (f.isSuccess()) {
102                 setupChannel(f.getNow(), promise);
103             } else {
104                 promise.setFailure(f.cause());
105             }
106         });
107         return promise;
108     }
109 
110     @Override
release(Channel channel)111     public Future<Void> release(Channel channel) {
112         return release(channel, eventLoop.newPromise());
113     }
114 
115     @Override
release(Channel channel, Promise<Void> promise)116     public Future<Void> release(Channel channel, Promise<Void> promise) {
117         return delegate.release(channel, promise);
118     }
119 
120     @Override
close()121     public void close() {
122         delegate.close();
123     }
124 
setupChannel(Channel ch, Promise<Channel> acquirePromise)125     private void setupChannel(Channel ch, Promise<Channel> acquirePromise) {
126         if (isTunnelEstablished(ch)) {
127             log.debug(ch, () -> String.format("Tunnel already established for %s", ch.id().asShortText()));
128             acquirePromise.setSuccess(ch);
129             return;
130         }
131 
132         log.debug(ch, () -> String.format("Tunnel not yet established for channel %s. Establishing tunnel now.",
133                                           ch.id().asShortText()));
134 
135         Promise<Channel> tunnelEstablishedPromise = eventLoop.newPromise();
136 
137         SslHandler sslHandler = createSslHandlerIfNeeded(ch.alloc());
138         if (sslHandler != null) {
139             ch.pipeline().addLast(sslHandler);
140         }
141         ch.pipeline().addLast(initHandlerSupplier.newInitHandler(delegate, proxyUser, proxyPassword, remoteAddress,
142                                                                     tunnelEstablishedPromise));
143         tunnelEstablishedPromise.addListener((Future<Channel> f) -> {
144             if (f.isSuccess()) {
145                 Channel tunnel = f.getNow();
146                 handler.channelCreated(tunnel);
147                 tunnel.attr(TUNNEL_ESTABLISHED_KEY).set(true);
148                 acquirePromise.setSuccess(tunnel);
149             } else {
150                 ch.close();
151                 delegate.release(ch);
152 
153                 Throwable cause = f.cause();
154                 log.error(ch, () -> String.format("Unable to establish tunnel for channel %s", ch.id().asShortText()), cause);
155                 acquirePromise.setFailure(cause);
156             }
157         });
158     }
159 
createSslHandlerIfNeeded(ByteBufAllocator alloc)160     private SslHandler createSslHandlerIfNeeded(ByteBufAllocator alloc) {
161         if (sslContext == null) {
162             return null;
163         }
164 
165         String scheme = proxyAddress.getScheme();
166 
167         if (!"https".equals(StringUtils.lowerCase(scheme))) {
168             return null;
169         }
170 
171         return newSslHandler(sslContext, alloc, proxyAddress.getHost(), proxyAddress.getPort(),
172                              nettyConfiguration.tlsHandshakeTimeout());
173     }
174 
isTunnelEstablished(Channel ch)175     private static boolean isTunnelEstablished(Channel ch) {
176         Boolean established = ch.attr(TUNNEL_ESTABLISHED_KEY).get();
177         return Boolean.TRUE.equals(established);
178     }
179 
180     @SdkTestInternalApi
181     @FunctionalInterface
182     interface InitHandlerSupplier {
newInitHandler(ChannelPool sourcePool, String proxyUsername, String proxyPassword, URI remoteAddress, Promise<Channel> tunnelInitFuture)183         ChannelHandler newInitHandler(ChannelPool sourcePool, String proxyUsername, String proxyPassword, URI remoteAddress,
184                                       Promise<Channel> tunnelInitFuture);
185     }
186 }
187