/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.utils.async;

import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.CANCEL;
import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_COMPLETE;
import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_ERROR;
import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_NEXT;

import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

/**
 * A {@link Publisher} to which callers can {@link #send(Object)} messages, simplifying the process of implementing a publisher.
 *
 * <p><b>Operations</b>
 *
 * <p>The {@code SimplePublisher} supports three simplified operations:
 * <ol>
 *     <li>{@link #send(Object)} for sending messages</li>
 *     <li>{@link #complete()} for indicating the successful end of messages</li>
 *     <li>{@link #error(Throwable)} for indicating the unsuccessful end of messages</li>
 * </ol>
 *
 * Each of these operations returns a {@link CompletableFuture} for indicating when the message has been successfully sent.
 *
 * <p>Callers are expected to invoke a series of {@link #send(Object)}s followed by a single {@link #complete()} or
 * {@link #error(Throwable)}. See the documentation on each operation for more details.
 *
 * <p>This publisher will store an unbounded number of messages. It is recommended that callers limit the number of in-flight
 * {@link #send(Object)} operations in order to bound the amount of memory used by this publisher.
 */
@SdkProtectedApi
public final class SimplePublisher<T> implements Publisher<T> {
    private static final Logger log = Logger.loggerFor(SimplePublisher.class);

    /**
     * Track the amount of outstanding demand requested by the active subscriber.
     */
    private final AtomicLong outstandingDemand = new AtomicLong();

    /**
     * The queue of events to be processed, in the order they should be processed. These events are lower priority than those
     * in {@link #highPriorityQueue} and will be processed after that queue is empty.
     *
     * <p>All logic within this publisher is represented using events in this queue. This ensures proper ordering of events
     * processing and simplified reasoning about thread safety.
     */
    private final Queue<QueueEntry<T>> standardPriorityQueue = new ConcurrentLinkedQueue<>();

    /**
     * The queue of events to be processed, in the order they should be processed. These events are higher priority than those
     * in {@link #standardPriorityQueue} and will be processed first.
     *
     * <p>Events are written to this queue to "skip the line" in processing, so it's typically reserved for terminal events,
     * like subscription cancellation.
     *
     * <p>All logic within this publisher is represented using events in this queue. This ensures proper ordering of events
     * processing and simplified reasoning about thread safety.
     */
    private final Queue<QueueEntry<T>> highPriorityQueue = new ConcurrentLinkedQueue<>();

    /**
     * Whether the {@link #standardPriorityQueue} and {@link #highPriorityQueue}s are currently being processed. Only one thread
     * may read events from the queues at a time.
     */
    private final AtomicBoolean processingQueue = new AtomicBoolean(false);

    /**
     * The failure message that should be sent to future events.
     */
    private final FailureMessage failureMessage = new FailureMessage();

    /**
     * The subscriber provided via {@link #subscribe(Subscriber)}. This publisher only supports a single subscriber.
     */
    private Subscriber<? super T> subscriber;

    /**
     * Send a message using this publisher.
     *
     * <p>Messages sent using this publisher will eventually be sent to a downstream subscriber, in the order they were
     * written. When the message is sent to the subscriber, the returned future will be completed successfully.
     *
     * <p>This method may be invoked concurrently when the order of messages is not important.
     *
     * <p>In the time between when this method is invoked and the returned future is completed, this publisher stores the
     * request message in memory. Callers are recommended to limit the number of sends in progress at a time to bound the
     * amount of memory used by this publisher.
     *
     * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
     * if the {@code send} call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
     *
     * @param value The message to send. Must not be null.
     * @return A future that is completed when the message is sent to the subscriber.
     */
    public CompletableFuture<Void> send(T value) {
        log.trace(() -> "Received send() with " + value);

        OnNextQueueEntry<T> entry = new OnNextQueueEntry<>(value);
        try {
            // Validation failures are reported through the returned future rather than thrown to the caller.
            Validate.notNull(value, "Null cannot be written.");
            standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    /**
     * Indicate that no more {@link #send(Object)} calls will be made, and that stream of messages is completed successfully.
     *
     * <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
     * stream is treated as complete. The returned future will be completed successfully when the {@code complete} is sent to
     * the downstream subscriber.
     *
     * <p>After this method is invoked, any future {@link #send(Object)}, {@code complete()} or {@link #error(Throwable)}
     * calls will be completed exceptionally and not be processed.
     *
     * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
     * if the {@code complete} call was performed after a {@code complete} or {@link #error(Throwable)} call.
     *
     * @return A future that is completed when the complete has been sent to the downstream subscriber.
     */
    public CompletableFuture<Void> complete() {
        log.trace(() -> "Received complete()");

        OnCompleteQueueEntry<T> entry = new OnCompleteQueueEntry<>();

        try {
            standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    /**
     * Indicate that no more {@link #send(Object)} calls will be made, and that streaming of messages has failed.
     *
     * <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
     * stream is treated as being in-error. The returned future will be completed successfully when the {@code error} is
     * sent to the downstream subscriber.
     *
     * <p>After this method is invoked, any future {@link #send(Object)}, {@link #complete()} or {@code error(Throwable)}
     * calls will be completed exceptionally and not be processed.
     *
     * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
     * if the {@code error} call was performed after a {@link #complete()} or {@code error} call.
     *
     * @param error The error to send.
     * @return A future that is completed when the exception has been sent to the downstream subscriber.
     */
    public CompletableFuture<Void> error(Throwable error) {
        log.trace(() -> "Received error() with " + error, error);

        OnErrorQueueEntry<T> entry = new OnErrorQueueEntry<>(error);

        try {
            standardPriorityQueue.add(entry);
            processEventQueue();
        } catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    /**
     * A method called by the downstream subscriber in order to subscribe to the publisher.
     *
     * <p>Only one subscriber may be active at a time. If a subscriber is already registered, the new subscriber is handed a
     * no-op subscription, signalled {@code onError} with an {@link IllegalStateException}, and otherwise ignored; the
     * existing subscriber is left untouched.
     *
     * @param s The subscriber that wishes to receive the messages sent to this publisher.
     */
    @Override
    public void subscribe(Subscriber<? super T> s) {
        if (subscriber != null) {
            s.onSubscribe(new NoOpSubscription());
            s.onError(new IllegalStateException("Only one subscription may be active at a time."));
            // Stop here: the rejected subscriber must not replace the active one, and must not get a second onSubscribe.
            return;
        }
        this.subscriber = s;
        s.onSubscribe(new SubscriptionImpl());
        processEventQueue();
    }

    /**
     * Process the messages in the event queue. This is invoked after every operation on the publisher that changes the state
     * of the event queue.
     *
     * <p>Internally, this method will only be executed by one thread at a time. Any calls to this method while another thread
     * is processing the queue will return immediately. This ensures: (1) thread safety in queue processing, (2) mutual recursion
     * between onSubscribe/onNext with {@link Subscription#request(long)} are impossible.
     */
    private void processEventQueue() {
        do {
            if (!processingQueue.compareAndSet(false, true)) {
                // Some other thread is processing the queue, so we don't need to.
                return;
            }

            try {
                doProcessQueue();
            } catch (Throwable e) {
                panicAndDie(e);
                break;
            } finally {
                processingQueue.set(false);
            }

            // Once releasing the processing-queue flag, we need to double-check that the queue still doesn't need to be
            // processed, because new messages might have come in since we decided to release the flag.
        } while (shouldProcessQueueEntry(standardPriorityQueue.peek()) ||
                 shouldProcessQueueEntry(highPriorityQueue.peek()));
    }

    /**
     * Pop events off of the queue and process them in the order they are given, returning when we can no longer process the
     * event at the head of the queue.
     *
     * <p>Invoked only from within the {@link #processEventQueue()} method with the {@link #processingQueue} flag held.
     */
    private void doProcessQueue() {
        while (true) {
            // High-priority events always go first; only fall back to the standard queue when there are none.
            QueueEntry<T> entry = highPriorityQueue.peek();
            Queue<?> sourceQueue = highPriorityQueue;

            if (entry == null) {
                entry = standardPriorityQueue.peek();
                sourceQueue = standardPriorityQueue;
            }

            if (!shouldProcessQueueEntry(entry)) {
                // We're done processing entries.
                return;
            }

            if (failureMessage.isSet()) {
                // A terminal event already happened, so every remaining entry is failed instead of delivered.
                entry.resultFuture.completeExceptionally(failureMessage.get());
            } else {
                switch (entry.type()) {
                    case ON_NEXT:
                        OnNextQueueEntry<T> onNextEntry = (OnNextQueueEntry<T>) entry;

                        log.trace(() -> "Calling onNext() with " + onNextEntry.value);
                        subscriber.onNext(onNextEntry.value);
                        long newDemand = outstandingDemand.decrementAndGet();
                        log.trace(() -> "Decreased demand to " + newDemand);
                        break;
                    case ON_COMPLETE:
                        failureMessage.trySet(() -> new IllegalStateException("onComplete() was already invoked."));

                        log.trace(() -> "Calling onComplete()");
                        subscriber.onComplete();
                        break;
                    case ON_ERROR:
                        OnErrorQueueEntry<T> onErrorEntry = (OnErrorQueueEntry<T>) entry;
                        failureMessage.trySet(() -> new IllegalStateException("onError() was already invoked.",
                                                                              onErrorEntry.failure));
                        log.trace(() -> "Calling onError() with " + onErrorEntry.failure, onErrorEntry.failure);
                        subscriber.onError(onErrorEntry.failure);
                        break;
                    case CANCEL:
                        failureMessage.trySet(() -> new CancellationException("subscription has been cancelled."));

                        subscriber = null; // Allow subscriber to be garbage collected after cancellation.
                        break;
                    default:
                        // Should never happen. Famous last words?
                        throw new IllegalStateException("Unknown entry type: " + entry.type());
                }

                entry.resultFuture.complete(null);
            }

            // The entry is only removed once it has been handled, so it is still queued if handling it throws.
            sourceQueue.remove();
        }
    }

    /**
     * Return true if we should process the provided queue entry.
     */
    private boolean shouldProcessQueueEntry(QueueEntry<T> entry) {
        if (entry == null) {
            // The queue is empty.
            return false;
        }

        if (failureMessage.isSet()) {
            // Entries are failed (not delivered) once a failure is recorded, so no subscriber or demand is required.
            return true;
        }

        if (subscriber == null) {
            // We don't have a subscriber yet.
            return false;
        }

        if (entry.type() != ON_NEXT) {
            // This event isn't an on-next event, so we don't need subscriber demand in order to process it.
            return true;
        }

        // This is an on-next event and we're not failing on-next events, so make sure we have demand available before
        // processing it.
        return outstandingDemand.get() > 0;
    }

    /**
     * Invoked from within {@link #processEventQueue()} when we can't process the queue for some reason. This is likely
     * caused by a downstream subscriber throwing an exception from {@code onNext}, which it should never do.
     *
     * <p>Here we try our best to fail all of the entries in the queue, so that no callers have "stuck" futures.
     *
     * @param cause The exception that interrupted queue processing.
     */
    private void panicAndDie(Throwable cause) {
        try {
            // Create exception here instead of in supplier to preserve a more-useful stack trace.
            RuntimeException failure = new IllegalStateException("Encountered fatal error in publisher", cause);
            failureMessage.trySet(() -> failure);
            subscriber.onError(cause instanceof Error ? cause : failure);

            while (true) {
                QueueEntry<T> entry = standardPriorityQueue.poll();
                if (entry == null) {
                    break;
                }
                entry.resultFuture.completeExceptionally(failure);
            }
        } catch (Throwable t) {
            t.addSuppressed(cause);
            log.error(() -> "Failed while processing a failure. This could result in stuck futures.", t);
        }
    }

    /**
     * The subscription passed to the first {@link #subscriber} that subscribes to this publisher. This allows the downstream
     * subscriber to request for more {@code onNext} calls or to {@code cancel} the stream of messages.
     */
    private class SubscriptionImpl implements Subscription {
        @Override
        public void request(long n) {
            log.trace(() -> "Received request() with " + n);
            if (n <= 0) {
                // Create exception here instead of in supplier to preserve a more-useful stack trace.
                IllegalArgumentException failure = new IllegalArgumentException("A downstream publisher requested an invalid "
                                                                                + "amount of data: " + n);
                highPriorityQueue.add(new OnErrorQueueEntry<>(failure));
                processEventQueue();
            } else {
                // Saturate at Long.MAX_VALUE instead of overflowing when the subscriber requests a huge amount.
                long newDemand = outstandingDemand.updateAndGet(current -> {
                    if (Long.MAX_VALUE - current < n) {
                        return Long.MAX_VALUE;
                    }

                    return current + n;
                });
                log.trace(() -> "Increased demand to " + newDemand);
                processEventQueue();
            }
        }

        @Override
        public void cancel() {
            log.trace(() -> "Received cancel() from " + subscriber);

            // Cancellation goes on the high-priority queue so that it is handled ahead of any pending standard events.
            highPriorityQueue.add(new CancelQueueEntry<>());
            processEventQueue();
        }
    }

    /**
     * A lazily-initialized failure message for future events sent to this publisher after a terminal event has
     * occurred.
     */
    private static final class FailureMessage {
        private Supplier<Throwable> failureMessageSupplier;
        private Throwable failureMessage;

        /**
         * Record the supplier of the failure, unless one has already been recorded. The first supplier wins.
         */
        private void trySet(Supplier<Throwable> supplier) {
            if (failureMessageSupplier == null) {
                failureMessageSupplier = supplier;
            }
        }

        /**
         * Whether a failure has been recorded via {@link #trySet(Supplier)}.
         */
        private boolean isSet() {
            return failureMessageSupplier != null;
        }

        /**
         * Retrieve the failure, invoking the supplier on first use and caching the result. Must only be called when
         * {@link #isSet()} is true.
         */
        private Throwable get() {
            if (failureMessage == null) {
                failureMessage = failureMessageSupplier.get();
            }
            return failureMessage;
        }
    }

    /**
     * An entry in the {@link #standardPriorityQueue} or {@link #highPriorityQueue}.
     */
    abstract static class QueueEntry<T> {
        /**
         * The future that was returned to a {@link #send(Object)}, {@link #complete()} or {@link #error(Throwable)} message.
         */
        protected final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

        /**
         * Retrieve the type of this queue entry.
         */
        protected abstract Type type();

        protected enum Type {
            ON_NEXT,
            ON_COMPLETE,
            ON_ERROR,
            CANCEL
        }
    }

    /**
     * An entry added when we get a {@link #send(Object)} call.
     */
    private static final class OnNextQueueEntry<T> extends QueueEntry<T> {
        private final T value;

        private OnNextQueueEntry(T value) {
            this.value = value;
        }

        @Override
        protected Type type() {
            return ON_NEXT;
        }
    }

    /**
     * An entry added when we get a {@link #complete()} call.
     */
    private static final class OnCompleteQueueEntry<T> extends QueueEntry<T> {
        @Override
        protected Type type() {
            return ON_COMPLETE;
        }
    }

    /**
     * An entry added when we get an {@link #error(Throwable)} call.
     */
    private static final class OnErrorQueueEntry<T> extends QueueEntry<T> {
        private final Throwable failure;

        private OnErrorQueueEntry(Throwable failure) {
            this.failure = failure;
        }

        @Override
        protected Type type() {
            return ON_ERROR;
        }
    }

    /**
     * An entry added when we get a {@link SubscriptionImpl#cancel()} call.
     */
    private static final class CancelQueueEntry<T> extends QueueEntry<T> {
        @Override
        protected Type type() {
            return CANCEL;
        }
    }

    /**
     * A subscription that does nothing. This is used for signaling {@code onError} to subscribers that subscribe to this
     * publisher for the second time. Only one subscriber is supported.
     */
    private static final class NoOpSubscription implements Subscription {
        @Override
        public void request(long n) {
        }

        @Override
        public void cancel() {
        }
    }
}