• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.http.nio.netty.internal.utils;
17 
18 import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
19 
20 import io.netty.channel.Channel;
21 import io.netty.channel.pool.ChannelPool;
22 import io.netty.util.concurrent.DefaultPromise;
23 import io.netty.util.concurrent.EventExecutor;
24 import io.netty.util.concurrent.Future;
25 import io.netty.util.concurrent.FutureListener;
26 import io.netty.util.concurrent.GlobalEventExecutor;
27 import io.netty.util.concurrent.Promise;
28 import io.netty.util.internal.ObjectUtil;
29 import io.netty.util.internal.ThrowableUtil;
30 import java.nio.channels.ClosedChannelException;
31 import java.util.ArrayDeque;
32 import java.util.Queue;
33 import java.util.concurrent.CompletableFuture;
34 import java.util.concurrent.ScheduledFuture;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import software.amazon.awssdk.http.HttpMetric;
38 import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPool;
39 import software.amazon.awssdk.metrics.MetricCollector;
40 
41 /**
42  * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
43  * number of concurrent connections.
44  */
45 //TODO: Contribute me back to Netty
46 public class BetterFixedChannelPool implements SdkChannelPool {
47     private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
48         new IllegalStateException("Too many outstanding acquire operations"),
49         BetterFixedChannelPool.class, "acquire0(...)");
50     private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
51         new TimeoutException("Acquire operation took longer than configured maximum time"),
52         BetterFixedChannelPool.class, "<init>(...)");
53     static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
54         new IllegalStateException("BetterFixedChannelPooled was closed"),
55         BetterFixedChannelPool.class, "release(...)");
56     static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
57         new IllegalStateException("BetterFixedChannelPooled was closed"),
58         BetterFixedChannelPool.class, "acquire0(...)");
59 
60     public enum AcquireTimeoutAction {
61         /**
62          * Create a new connection when the timeout is detected.
63          */
64         NEW,
65 
66         /**
67          * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
68          */
69         FAIL
70     }
71 
72     private final EventExecutor executor;
73     private final long acquireTimeoutNanos;
74     private final Runnable timeoutTask;
75     private final SdkChannelPool delegateChannelPool;
76 
77     // There is no need to worry about synchronization as everything that modified the queue or counts is done
78     // by the above EventExecutor.
79     private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<>();
80     private final int maxConnections;
81     private final int maxPendingAcquires;
82     private int acquiredChannelCount;
83     private int pendingAcquireCount;
84     private boolean closed;
85 
86 
BetterFixedChannelPool(Builder builder)87     private BetterFixedChannelPool(Builder builder) {
88         if (builder.maxConnections < 1) {
89             throw new IllegalArgumentException("maxConnections: " + builder.maxConnections + " (expected: >= 1)");
90         }
91         if (builder.maxPendingAcquires < 1) {
92             throw new IllegalArgumentException("maxPendingAcquires: " + builder.maxPendingAcquires + " (expected: >= 1)");
93         }
94         this.delegateChannelPool = builder.channelPool;
95         this.executor = builder.executor;
96         if (builder.action == null && builder.acquireTimeoutMillis == -1) {
97             timeoutTask = null;
98             acquireTimeoutNanos = -1;
99         } else if (builder.action == null && builder.acquireTimeoutMillis != -1) {
100             throw new NullPointerException("action");
101         } else if (builder.action != null && builder.acquireTimeoutMillis < 0) {
102             throw new IllegalArgumentException("acquireTimeoutMillis: " + builder.acquireTimeoutMillis + " (expected: >= 0)");
103         } else {
104             acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(builder.acquireTimeoutMillis);
105             switch (builder.action) {
106                 case FAIL:
107                     timeoutTask = new TimeoutTask() {
108                         @Override
109                         public void onTimeout(AcquireTask task) {
110                             // Fail the promise as we timed out.
111                             task.promise.setFailure(TIMEOUT_EXCEPTION);
112                         }
113                     };
114                     break;
115                 case NEW:
116                     timeoutTask = new TimeoutTask() {
117                         @Override
118                         public void onTimeout(AcquireTask task) {
119                             // Increment the acquire count and delegate to super to actually acquire a Channel which will
120                             // create a new connection.
121                             task.acquired();
122 
123                             delegateChannelPool.acquire(task.promise);
124                         }
125                     };
126                     break;
127                 default:
128                     throw new Error();
129             }
130         }
131         this.maxConnections = builder.maxConnections;
132         this.maxPendingAcquires = builder.maxPendingAcquires;
133     }
134 
135     @Override
acquire()136     public Future<Channel> acquire() {
137         return acquire(new DefaultPromise<>(executor));
138     }
139 
140     @Override
acquire(final Promise<Channel> promise)141     public Future<Channel> acquire(final Promise<Channel> promise) {
142         try {
143             if (executor.inEventLoop()) {
144                 acquire0(promise);
145             } else {
146                 executor.execute(() -> acquire0(promise));
147             }
148         } catch (Throwable cause) {
149             promise.setFailure(cause);
150         }
151         return promise;
152     }
153 
154     @Override
collectChannelPoolMetrics(MetricCollector metrics)155     public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metrics) {
156         CompletableFuture<Void> delegateMetricResult = delegateChannelPool.collectChannelPoolMetrics(metrics);
157         CompletableFuture<Void> result = new CompletableFuture<>();
158         doInEventLoop(executor, () -> {
159             try {
160                 metrics.reportMetric(HttpMetric.MAX_CONCURRENCY, this.maxConnections);
161                 metrics.reportMetric(HttpMetric.PENDING_CONCURRENCY_ACQUIRES, this.pendingAcquireCount);
162                 metrics.reportMetric(HttpMetric.LEASED_CONCURRENCY, this.acquiredChannelCount);
163                 result.complete(null);
164             } catch (Throwable t) {
165                 result.completeExceptionally(t);
166             }
167         });
168         return CompletableFuture.allOf(result, delegateMetricResult);
169     }
170 
acquire0(final Promise<Channel> promise)171     private void acquire0(final Promise<Channel> promise) {
172         assert executor.inEventLoop();
173 
174         if (closed) {
175             promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
176             return;
177         }
178         if (acquiredChannelCount < maxConnections) {
179             assert acquiredChannelCount >= 0;
180 
181             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
182             // EventLoop
183             Promise<Channel> p = executor.newPromise();
184             AcquireListener l = new AcquireListener(promise);
185             l.acquired();
186             p.addListener(l);
187             delegateChannelPool.acquire(p);
188         } else {
189             if (pendingAcquireCount >= maxPendingAcquires) {
190                 promise.setFailure(FULL_EXCEPTION);
191             } else {
192                 AcquireTask task = new AcquireTask(promise);
193                 if (pendingAcquireQueue.offer(task)) {
194                     ++pendingAcquireCount;
195 
196                     if (timeoutTask != null) {
197                         task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
198                     }
199                 } else {
200                     promise.setFailure(FULL_EXCEPTION);
201                 }
202             }
203 
204             assert pendingAcquireCount > 0;
205         }
206     }
207 
208     @Override
release(Channel channel)209     public Future<Void> release(Channel channel) {
210         return release(channel, new DefaultPromise<>(executor));
211     }
212 
213     @Override
release(final Channel channel, final Promise<Void> promise)214     public Future<Void> release(final Channel channel, final Promise<Void> promise) {
215         ObjectUtil.checkNotNull(promise, "promise");
216         Promise<Void> p = executor.newPromise();
217         delegateChannelPool.release(channel, p.addListener(new FutureListener<Void>() {
218 
219             @Override
220             public void operationComplete(Future<Void> future) throws Exception {
221                 assert executor.inEventLoop();
222 
223                 if (closed) {
224                     // Since the pool is closed, we have no choice but to close the channel
225                     channel.close();
226                     promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
227                     return;
228                 }
229 
230                 if (future.isSuccess()) {
231                     decrementAndRunTaskQueue();
232                     promise.setSuccess(null);
233                 } else {
234                     Throwable cause = future.cause();
235                     // Check if the exception was not because of we passed the Channel to the wrong pool.
236                     if (!(cause instanceof IllegalArgumentException)) {
237                         decrementAndRunTaskQueue();
238                     }
239                     promise.setFailure(future.cause());
240                 }
241             }
242         }));
243         return promise;
244     }
245 
decrementAndRunTaskQueue()246     private void decrementAndRunTaskQueue() {
247         --acquiredChannelCount;
248 
249         // We should never have a negative value.
250         assert acquiredChannelCount >= 0;
251 
252         // Run the pending acquire tasks before notify the original promise so if the user would
253         // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
254         // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
255         // more.
256         runTaskQueue();
257     }
258 
runTaskQueue()259     private void runTaskQueue() {
260         while (acquiredChannelCount < maxConnections) {
261             AcquireTask task = pendingAcquireQueue.poll();
262             if (task == null) {
263                 break;
264             }
265 
266             // Cancel the timeout if one was scheduled
267             ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
268             if (timeoutFuture != null) {
269                 timeoutFuture.cancel(false);
270             }
271 
272             --pendingAcquireCount;
273             task.acquired();
274 
275             delegateChannelPool.acquire(task.promise);
276         }
277 
278         // We should never have a negative value.
279         assert pendingAcquireCount >= 0;
280         assert acquiredChannelCount >= 0;
281     }
282 
283     // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
284     private final class AcquireTask extends AcquireListener {
285         final Promise<Channel> promise;
286         final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
287         ScheduledFuture<?> timeoutFuture;
288 
AcquireTask(Promise<Channel> promise)289         public AcquireTask(Promise<Channel> promise) {
290             super(promise);
291             // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
292             // EventLoop.
293             this.promise = executor.<Channel>newPromise().addListener(this);
294         }
295     }
296 
297     private abstract class TimeoutTask implements Runnable {
298         @Override
run()299         public final void run() {
300             assert executor.inEventLoop();
301             long nanoTime = System.nanoTime();
302             for (; ; ) {
303                 AcquireTask task = pendingAcquireQueue.peek();
304                 // Compare nanoTime as descripted in the javadocs of System.nanoTime()
305                 //
306                 // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
307                 // See https://github.com/netty/netty/issues/3705
308                 if (task == null || nanoTime - task.expireNanoTime < 0) {
309                     break;
310                 }
311                 pendingAcquireQueue.remove();
312 
313                 --pendingAcquireCount;
314                 onTimeout(task);
315             }
316         }
317 
onTimeout(AcquireTask task)318         public abstract void onTimeout(AcquireTask task);
319     }
320 
321     private class AcquireListener implements FutureListener<Channel> {
322         private final Promise<Channel> originalPromise;
323         protected boolean acquired;
324 
AcquireListener(Promise<Channel> originalPromise)325         AcquireListener(Promise<Channel> originalPromise) {
326             this.originalPromise = originalPromise;
327         }
328 
329         @Override
operationComplete(Future<Channel> future)330         public void operationComplete(Future<Channel> future) throws Exception {
331             assert executor.inEventLoop();
332 
333             if (closed) {
334                 if (future.isSuccess()) {
335                     // Since the pool is closed, we have no choice but to close the channel
336                     future.getNow().close();
337                 }
338                 originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
339                 return;
340             }
341 
342             if (future.isSuccess()) {
343                 originalPromise.setSuccess(future.getNow());
344             } else {
345                 if (acquired) {
346                     decrementAndRunTaskQueue();
347                 } else {
348                     runTaskQueue();
349                 }
350 
351                 originalPromise.setFailure(future.cause());
352             }
353         }
354 
acquired()355         public void acquired() {
356             if (acquired) {
357                 return;
358             }
359             acquiredChannelCount++;
360             acquired = true;
361         }
362     }
363 
364     @Override
close()365     public void close() {
366         if (executor.inEventLoop()) {
367             close0();
368         } else {
369             executor.submit(() -> close0()).awaitUninterruptibly();
370         }
371     }
372 
close0()373     private void close0() {
374         if (!closed) {
375             closed = true;
376             for (;;) {
377                 AcquireTask task = pendingAcquireQueue.poll();
378                 if (task == null) {
379                     break;
380                 }
381                 ScheduledFuture<?> f = task.timeoutFuture;
382                 if (f != null) {
383                     f.cancel(false);
384                 }
385                 task.promise.setFailure(new ClosedChannelException());
386             }
387             acquiredChannelCount = 0;
388             pendingAcquireCount = 0;
389 
390             // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
391             // to ensure we will not block in a EventExecutor.
392             GlobalEventExecutor.INSTANCE.execute(() -> delegateChannelPool.close());
393         }
394     }
395 
builder()396     public static Builder builder() {
397         return new Builder();
398     }
399 
400     public static final class Builder {
401 
402         private SdkChannelPool channelPool;
403         private EventExecutor executor;
404         private AcquireTimeoutAction action;
405         private long acquireTimeoutMillis;
406         private int maxConnections;
407         private int maxPendingAcquires;
408 
Builder()409         private Builder() {
410         }
411 
channelPool(SdkChannelPool channelPool)412         public Builder channelPool(SdkChannelPool channelPool) {
413             this.channelPool = channelPool;
414             return this;
415         }
416 
executor(EventExecutor executor)417         public Builder executor(EventExecutor executor) {
418             this.executor = executor;
419             return this;
420         }
421 
acquireTimeoutAction(AcquireTimeoutAction action)422         public Builder acquireTimeoutAction(AcquireTimeoutAction action) {
423             this.action = action;
424             return this;
425         }
426 
acquireTimeoutMillis(long acquireTimeoutMillis)427         public Builder acquireTimeoutMillis(long acquireTimeoutMillis) {
428             this.acquireTimeoutMillis = acquireTimeoutMillis;
429             return this;
430         }
431 
maxConnections(int maxConnections)432         public Builder maxConnections(int maxConnections) {
433             this.maxConnections = maxConnections;
434             return this;
435         }
436 
maxPendingAcquires(int maxPendingAcquires)437         public Builder maxPendingAcquires(int maxPendingAcquires) {
438             this.maxPendingAcquires = maxPendingAcquires;
439             return this;
440         }
441 
build()442         public BetterFixedChannelPool build() {
443             return new BetterFixedChannelPool(this);
444         }
445     }
446 }
447