/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.http.nio.netty.internal.utils;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;

import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.ThrowableUtil;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPool;
import software.amazon.awssdk.metrics.MetricCollector;

/**
 * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforces a maximum
 * number of concurrent connections.
 *
 * <p>Acquire requests that arrive while {@code maxConnections} channels are already leased are parked in a FIFO queue
 * (bounded by {@code maxPendingAcquires}) and are served as channels are released. All mutable state (queue and
 * counters) is only touched from the configured {@link EventExecutor}.
 */
//TODO: Contribute me back to Netty
public class BetterFixedChannelPool implements SdkChannelPool {
    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
        new IllegalStateException("Too many outstanding acquire operations"),
        BetterFixedChannelPool.class, "acquire0(...)");
    private static final TimeoutException TIMEOUT_EXCEPTION = ThrowableUtil.unknownStackTrace(
        new TimeoutException("Acquire operation took longer than configured maximum time"),
        BetterFixedChannelPool.class, "<init>(...)");
    static final IllegalStateException POOL_CLOSED_ON_RELEASE_EXCEPTION = ThrowableUtil.unknownStackTrace(
        new IllegalStateException("BetterFixedChannelPooled was closed"),
        BetterFixedChannelPool.class, "release(...)");
    static final IllegalStateException POOL_CLOSED_ON_ACQUIRE_EXCEPTION = ThrowableUtil.unknownStackTrace(
        new IllegalStateException("BetterFixedChannelPooled was closed"),
        BetterFixedChannelPool.class, "acquire0(...)");

    /**
     * What to do with a queued acquire request once its acquire timeout has elapsed.
     */
    public enum AcquireTimeoutAction {
        /**
         * Create a new connection when the timeout is detected.
         */
        NEW,

        /**
         * Fail the {@link Future} of the acquire call with a {@link TimeoutException}.
         */
        FAIL
    }

    private final EventExecutor executor;
    private final long acquireTimeoutNanos;
    private final Runnable timeoutTask;
    private final SdkChannelPool delegateChannelPool;

    // There is no need to worry about synchronization as everything that modifies the queue or counts is done
    // by the above EventExecutor.
    private final Queue<AcquireTask> pendingAcquireQueue = new ArrayDeque<>();
    private final int maxConnections;
    private final int maxPendingAcquires;
    private int acquiredChannelCount;
    private int pendingAcquireCount;
    private boolean closed;

    /**
     * Creates the pool from the given builder.
     *
     * <p>Timeout configuration rules, as enforced below:
     * <ul>
     *     <li>no action and {@code acquireTimeoutMillis == -1}: acquire timeouts are disabled;</li>
     *     <li>no action and any other timeout value: {@link NullPointerException};</li>
     *     <li>an action and a negative timeout: {@link IllegalArgumentException};</li>
     *     <li>otherwise the action is applied to queued acquires once the timeout elapses.</li>
     * </ul>
     *
     * @param builder the builder carrying the configuration
     * @throws IllegalArgumentException if {@code maxConnections} or {@code maxPendingAcquires} is less than 1, or
     *                                  the timeout is negative while an action is set
     * @throws NullPointerException     if a timeout other than -1 is configured without an action
     */
    private BetterFixedChannelPool(Builder builder) {
        if (builder.maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections: " + builder.maxConnections + " (expected: >= 1)");
        }
        if (builder.maxPendingAcquires < 1) {
            throw new IllegalArgumentException("maxPendingAcquires: " + builder.maxPendingAcquires + " (expected: >= 1)");
        }
        this.delegateChannelPool = builder.channelPool;
        this.executor = builder.executor;
        if (builder.action == null && builder.acquireTimeoutMillis == -1) {
            timeoutTask = null;
            acquireTimeoutNanos = -1;
        } else if (builder.action == null && builder.acquireTimeoutMillis != -1) {
            throw new NullPointerException("action");
        } else if (builder.action != null && builder.acquireTimeoutMillis < 0) {
            throw new IllegalArgumentException("acquireTimeoutMillis: " + builder.acquireTimeoutMillis
                                               + " (expected: >= 0)");
        } else {
            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(builder.acquireTimeoutMillis);
            switch (builder.action) {
                case FAIL:
                    timeoutTask = new TimeoutTask() {
                        @Override
                        public void onTimeout(AcquireTask task) {
                            // Fail the promise as we timed out.
                            task.promise.setFailure(TIMEOUT_EXCEPTION);
                        }
                    };
                    break;
                case NEW:
                    timeoutTask = new TimeoutTask() {
                        @Override
                        public void onTimeout(AcquireTask task) {
                            // Increment the acquire count and delegate to the wrapped pool to actually acquire a
                            // Channel, which is expected to create a new connection.
                            task.acquired();

                            delegateChannelPool.acquire(task.promise);
                        }
                    };
                    break;
                default:
                    // Unreachable for the enum constants known today; fail with a descriptive exception
                    // instead of a raw Error should a new constant ever be added without handling it here.
                    throw new IllegalStateException("Unknown AcquireTimeoutAction: " + builder.action);
            }
        }
        this.maxConnections = builder.maxConnections;
        this.maxPendingAcquires = builder.maxPendingAcquires;
    }

    /**
     * Acquires a channel, completing a newly created promise bound to this pool's executor.
     *
     * @return the future that is notified once the acquire attempt completes
     */
    @Override
    public Future<Channel> acquire() {
        return acquire(new DefaultPromise<>(executor));
    }

    /**
     * Acquires a channel, running the bookkeeping on this pool's executor.
     *
     * @param promise the promise to notify with the acquired channel or the failure
     * @return the supplied {@code promise}
     */
    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(() -> acquire0(promise));
            }
        } catch (Throwable cause) {
            // tryFailure rather than setFailure: the promise may already be completed (e.g. cancelled by the
            // caller, or failed inside acquire0 before it threw). setFailure would then throw an
            // IllegalStateException out of this method and mask the original cause.
            promise.tryFailure(cause);
        }
        return promise;
    }

    /**
     * Reports this pool's concurrency metrics in addition to whatever the delegate pool reports.
     *
     * @param metrics the collector to report to
     * @return a future that completes once both this pool and the delegate pool have finished reporting
     */
    @Override
    public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metrics) {
        CompletableFuture<Void> delegateMetricResult = delegateChannelPool.collectChannelPoolMetrics(metrics);
        CompletableFuture<Void> result = new CompletableFuture<>();
        // The counters are only mutated from the executor, so they are read from there as well.
        doInEventLoop(executor, () -> {
            try {
                metrics.reportMetric(HttpMetric.MAX_CONCURRENCY, this.maxConnections);
                metrics.reportMetric(HttpMetric.PENDING_CONCURRENCY_ACQUIRES, this.pendingAcquireCount);
                metrics.reportMetric(HttpMetric.LEASED_CONCURRENCY, this.acquiredChannelCount);
                result.complete(null);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return CompletableFuture.allOf(result, delegateMetricResult);
    }

    /**
     * Performs the acquire on the executor: either leases a channel from the delegate pool right away, queues the
     * request, or fails it when the pool is closed or the pending queue is full.
     *
     * @param promise the caller's promise
     */
    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
            return;
        }
        if (acquiredChannelCount < maxConnections) {
            assert acquiredChannelCount >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            delegateChannelPool.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                promise.setFailure(FULL_EXCEPTION);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    promise.setFailure(FULL_EXCEPTION);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }

    /**
     * Releases a channel, completing a newly created promise bound to this pool's executor.
     *
     * @param channel the channel to hand back
     * @return the future that is notified once the release completes
     */
    @Override
    public Future<Void> release(Channel channel) {
        return release(channel, new DefaultPromise<>(executor));
    }

    /**
     * Releases a channel to the delegate pool and, once that completes, updates the lease count and serves
     * queued acquire requests.
     *
     * @param channel the channel to hand back
     * @param promise the promise to notify with the outcome; must not be {@code null}
     * @return the supplied {@code promise}
     */
    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // Intermediate promise created from our executor so the listener below runs in the executor.
        Promise<Void> p = executor.newPromise();
        delegateChannelPool.release(channel, p.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                assert executor.inEventLoop();

                if (closed) {
                    // Since the pool is closed, we have no choice but to close the channel
                    channel.close();
                    promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
                    return;
                }

                if (future.isSuccess()) {
                    decrementAndRunTaskQueue();
                    promise.setSuccess(null);
                } else {
                    Throwable cause = future.cause();
                    // Only give the slot back if the failure was not caused by the Channel having been passed
                    // to the wrong pool (signalled by an IllegalArgumentException).
                    if (!(cause instanceof IllegalArgumentException)) {
                        decrementAndRunTaskQueue();
                    }
                    promise.setFailure(cause);
                }
            }
        }));
        return promise;
    }

    /**
     * Gives one leased slot back and then serves as many queued acquire requests as now fit.
     */
    private void decrementAndRunTaskQueue() {
        --acquiredChannelCount;

        // We should never have a negative value.
        assert acquiredChannelCount >= 0;

        // Run the pending acquire tasks before notifying the original promise so that, if the user tries to
        // acquire again from the ChannelFutureListener while pendingAcquireCount is >= maxPendingAcquires,
        // we may be able to run some pending tasks first and so allow more to be added.
        runTaskQueue();
    }

    /**
     * Drains the pending queue while there is spare capacity, forwarding each request to the delegate pool.
     */
    private void runTaskQueue() {
        while (acquiredChannelCount < maxConnections) {
            AcquireTask task = pendingAcquireQueue.poll();
            if (task == null) {
                break;
            }

            // Cancel the timeout if one was scheduled
            ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
            if (timeoutFuture != null) {
                timeoutFuture.cancel(false);
            }

            --pendingAcquireCount;
            task.acquired();

            delegateChannelPool.acquire(task.promise);
        }

        // We should never have a negative value.
        assert pendingAcquireCount >= 0;
        assert acquiredChannelCount >= 0;
    }

    /**
     * A queued acquire request. It records the deadline after which {@link TimeoutTask} treats it as expired and
     * holds the handle of the scheduled timeout so that it can be cancelled.
     */
    // AcquireTask extends AcquireListener to reduce object creations and so GC pressure
    private final class AcquireTask extends AcquireListener {
        final Promise<Channel> promise;
        final long expireNanoTime = System.nanoTime() + acquireTimeoutNanos;
        ScheduledFuture<?> timeoutFuture;

        public AcquireTask(Promise<Channel> promise) {
            super(promise);
            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop.
            this.promise = executor.<Channel>newPromise().addListener(this);
        }
    }

    /**
     * Scheduled once per queued request; removes every request at the head of the queue whose deadline has passed
     * and hands it to {@link #onTimeout(AcquireTask)}.
     */
    private abstract class TimeoutTask implements Runnable {
        @Override
        public final void run() {
            assert executor.inEventLoop();
            long nanoTime = System.nanoTime();
            for (; ; ) {
                AcquireTask task = pendingAcquireQueue.peek();
                // Compare nanoTime as described in the javadocs of System.nanoTime()
                //
                // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
                // See https://github.com/netty/netty/issues/3705
                if (task == null || nanoTime - task.expireNanoTime < 0) {
                    break;
                }
                pendingAcquireQueue.remove();

                --pendingAcquireCount;
                onTimeout(task);
            }
        }

        /**
         * Invoked for each expired request after it has been removed from the pending queue.
         *
         * @param task the expired request
         */
        public abstract void onTimeout(AcquireTask task);
    }

    /**
     * Listener attached to the internal promise handed to the delegate pool; it relays the outcome to the caller's
     * promise and keeps the lease count consistent when the acquire fails.
     */
    private class AcquireListener implements FutureListener<Channel> {
        private final Promise<Channel> originalPromise;
        // True once this request has been counted in acquiredChannelCount.
        protected boolean acquired;

        AcquireListener(Promise<Channel> originalPromise) {
            this.originalPromise = originalPromise;
        }

        @Override
        public void operationComplete(Future<Channel> future) throws Exception {
            assert executor.inEventLoop();

            if (closed) {
                if (future.isSuccess()) {
                    // Since the pool is closed, we have no choice but to close the channel
                    future.getNow().close();
                }
                originalPromise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
                return;
            }

            if (future.isSuccess()) {
                originalPromise.setSuccess(future.getNow());
            } else {
                if (acquired) {
                    // This request held a slot; give it back before serving queued requests.
                    decrementAndRunTaskQueue();
                } else {
                    runTaskQueue();
                }

                originalPromise.setFailure(future.cause());
            }
        }

        /**
         * Counts this request against {@code maxConnections}; calling it more than once has no further effect.
         */
        public void acquired() {
            if (acquired) {
                return;
            }
            acquiredChannelCount++;
            acquired = true;
        }
    }

    /**
     * Closes the pool. When called from outside the executor, this waits (uninterruptibly) until the close logic
     * has run on the executor.
     */
    @Override
    public void close() {
        if (executor.inEventLoop()) {
            close0();
        } else {
            executor.submit(() -> close0()).awaitUninterruptibly();
        }
    }

    /**
     * Marks the pool closed, fails every queued acquire request, resets the counters and closes the delegate pool
     * on a separate executor. Does nothing if the pool is already closed.
     */
    private void close0() {
        if (!closed) {
            closed = true;
            for (;;) {
                AcquireTask task = pendingAcquireQueue.poll();
                if (task == null) {
                    break;
                }
                ScheduledFuture<?> f = task.timeoutFuture;
                if (f != null) {
                    f.cancel(false);
                }
                task.promise.setFailure(new ClosedChannelException());
            }
            acquiredChannelCount = 0;
            pendingAcquireCount = 0;

            // Ensure we dispatch this on another Thread as close0 will be called from the EventExecutor and we need
            // to ensure we will not block in an EventExecutor.
            GlobalEventExecutor.INSTANCE.execute(() -> delegateChannelPool.close());
        }
    }

    /**
     * @return a new, empty {@link Builder}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Builder for {@link BetterFixedChannelPool}. Validation of the supplied values happens in {@link #build()}.
     */
    public static final class Builder {

        private SdkChannelPool channelPool;
        private EventExecutor executor;
        private AcquireTimeoutAction action;
        private long acquireTimeoutMillis;
        private int maxConnections;
        private int maxPendingAcquires;

        private Builder() {
        }

        /**
         * @param channelPool the pool that actually acquires and releases channels
         * @return this builder
         */
        public Builder channelPool(SdkChannelPool channelPool) {
            this.channelPool = channelPool;
            return this;
        }

        /**
         * @param executor the executor on which all pool bookkeeping runs
         * @return this builder
         */
        public Builder executor(EventExecutor executor) {
            this.executor = executor;
            return this;
        }

        /**
         * @param action what to do with a queued acquire once the acquire timeout elapses; may only be left
         *               {@code null} when the acquire timeout is set to -1
         * @return this builder
         */
        public Builder acquireTimeoutAction(AcquireTimeoutAction action) {
            this.action = action;
            return this;
        }

        /**
         * @param acquireTimeoutMillis the acquire timeout in milliseconds; -1 together with no action disables
         *                             the timeout, otherwise the value must be &gt;= 0
         * @return this builder
         */
        public Builder acquireTimeoutMillis(long acquireTimeoutMillis) {
            this.acquireTimeoutMillis = acquireTimeoutMillis;
            return this;
        }

        /**
         * @param maxConnections the maximum number of concurrently leased channels; must be &gt;= 1
         * @return this builder
         */
        public Builder maxConnections(int maxConnections) {
            this.maxConnections = maxConnections;
            return this;
        }

        /**
         * @param maxPendingAcquires the maximum number of queued acquire requests; must be &gt;= 1
         * @return this builder
         */
        public Builder maxPendingAcquires(int maxPendingAcquires) {
            this.maxPendingAcquires = maxPendingAcquires;
            return this;
        }

        /**
         * @return a new pool configured from this builder
         * @throws IllegalArgumentException if a numeric setting is out of range
         * @throws NullPointerException     if a timeout other than -1 is configured without an action
         */
        public BetterFixedChannelPool build() {
            return new BetterFixedChannelPool(this);
        }
    }
}