• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.utils.async;
17 
18 import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.CANCEL;
19 import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_COMPLETE;
20 import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_ERROR;
21 import static software.amazon.awssdk.utils.async.SimplePublisher.QueueEntry.Type.ON_NEXT;
22 
23 import java.util.Queue;
24 import java.util.concurrent.CancellationException;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28 import java.util.concurrent.atomic.AtomicLong;
29 import java.util.function.Supplier;
30 import org.reactivestreams.Publisher;
31 import org.reactivestreams.Subscriber;
32 import org.reactivestreams.Subscription;
33 import software.amazon.awssdk.annotations.SdkProtectedApi;
34 import software.amazon.awssdk.utils.Logger;
35 import software.amazon.awssdk.utils.Validate;
36 
37 /**
38  * A {@link Publisher} to which callers can {@link #send(Object)} messages, simplifying the process of implementing a publisher.
39  *
40  * <p><b>Operations</b>
41  *
42  * <p>The {@code SimplePublisher} supports three simplified operations:
43  * <ol>
44  *     <li>{@link #send(Object)} for sending messages</li>
45  *     <li>{@link #complete()} for indicating the successful end of messages</li>
46  *     <li>{@link #error(Throwable)} for indicating the unsuccessful end of messages</li>
47  * </ol>
48  *
49  * Each of these operations returns a {@link CompletableFuture} for indicating when the message has been successfully sent.
50  *
51  * <p>Callers are expected to invoke a series of {@link #send(Object)}s followed by a single {@link #complete()} or
52  * {@link #error(Throwable)}. See the documentation on each operation for more details.
53  *
54  * <p>This publisher will store an unbounded number of messages. It is recommended that callers limit the number of in-flight
55  * {@link #send(Object)} operations in order to bound the amount of memory used by this publisher.
56  */
57 @SdkProtectedApi
58 public final class SimplePublisher<T> implements Publisher<T> {
59     private static final Logger log = Logger.loggerFor(SimplePublisher.class);
60 
61     /**
62      * Track the amount of outstanding demand requested by the active subscriber.
63      */
64     private final AtomicLong outstandingDemand = new AtomicLong();
65 
66     /**
67      * The queue of events to be processed, in the order they should be processed. These events are lower priority than those
68      * in {@link #highPriorityQueue} and will be processed after that queue is empty.
69      *
70      * <p>All logic within this publisher is represented using events in this queue. This ensures proper ordering of events
71      * processing and simplified reasoning about thread safety.
72      */
73     private final Queue<QueueEntry<T>> standardPriorityQueue = new ConcurrentLinkedQueue<>();
74 
75     /**
76      * The queue of events to be processed, in the order they should be processed. These events are higher priority than those
77      * in {@link #standardPriorityQueue} and will be processed first.
78      *
79      * <p>Events are written to this queue to "skip the line" in processing, so it's typically reserved for terminal events,
80      * like subscription cancellation.
81      *
82      * <p>All logic within this publisher is represented using events in this queue. This ensures proper ordering of events
83      * processing and simplified reasoning about thread safety.
84      */
85     private final Queue<QueueEntry<T>> highPriorityQueue = new ConcurrentLinkedQueue<>();
86 
87     /**
88      * Whether the {@link #standardPriorityQueue} and {@link #highPriorityQueue}s are currently being processed. Only one thread
89      * may read events from the queues at a time.
90      */
91     private final AtomicBoolean processingQueue = new AtomicBoolean(false);
92 
93     /**
94      * The failure message that should be sent to future events.
95      */
96     private final FailureMessage failureMessage = new FailureMessage();
97 
98     /**
99      * The subscriber provided via {@link #subscribe(Subscriber)}. This publisher only supports a single subscriber.
100      */
101     private Subscriber<? super T> subscriber;
102 
103     /**
104      * Send a message using this publisher.
105      *
106      * <p>Messages sent using this publisher will eventually be sent to a downstream subscriber, in the order they were
107      * written. When the message is sent to the subscriber, the returned future will be completed successfully.
108      *
109      * <p>This method may be invoked concurrently when the order of messages is not important.
110      *
111      * <p>In the time between when this method is invoked and the returned future is not completed, this publisher stores the
112      * request message in memory. Callers are recommended to limit the number of sends in progress at a time to bound the
113      * amount of memory used by this publisher.
114      *
115      * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
116      * if the {@code send} call was performed after a {@link #complete()} or {@link #error(Throwable)} call.
117      *
118      * @param value The message to send. Must not be null.
119      * @return A future that is completed when the message is sent to the subscriber.
120      */
send(T value)121     public CompletableFuture<Void> send(T value) {
122         log.trace(() -> "Received send() with " + value);
123 
124         OnNextQueueEntry<T> entry = new OnNextQueueEntry<>(value);
125         try {
126             Validate.notNull(value, "Null cannot be written.");
127             standardPriorityQueue.add(entry);
128             processEventQueue();
129         } catch (RuntimeException t) {
130             entry.resultFuture.completeExceptionally(t);
131         }
132         return entry.resultFuture;
133     }
134 
135     /**
136      * Indicate that no more {@link #send(Object)} calls will be made, and that stream of messages is completed successfully.
137      *
138      * <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
139      * stream is treated as complete. The returned future will be completed successfully when the {@code complete} is sent to
140      * the downstream subscriber.
141      *
142      * <p>After this method is invoked, any future {@link #send(Object)}, {@code complete()} or {@link #error(Throwable)}
143      * calls will be completed exceptionally and not be processed.
144      *
145      * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
146      * if the {@code complete} call was performed after a {@code complete} or {@link #error(Throwable)} call.
147      *
148      * @return A future that is completed when the complete has been sent to the downstream subscriber.
149      */
complete()150     public CompletableFuture<Void> complete() {
151         log.trace(() -> "Received complete()");
152 
153         OnCompleteQueueEntry<T> entry = new OnCompleteQueueEntry<>();
154 
155         try {
156             standardPriorityQueue.add(entry);
157             processEventQueue();
158         } catch (RuntimeException t) {
159             entry.resultFuture.completeExceptionally(t);
160         }
161         return entry.resultFuture;
162     }
163 
164     /**
165      * Indicate that no more {@link #send(Object)} calls will be made, and that streaming of messages has failed.
166      *
167      * <p>This can be called before any in-flight {@code send} calls are complete. Such messages will be processed before the
168      * stream is treated as being in-error. The returned future will be completed successfully when the {@code error} is
169      * sent to the downstream subscriber.
170      *
171      * <p>After this method is invoked, any future {@link #send(Object)}, {@link #complete()} or {@code #error(Throwable)}
172      * calls will be completed exceptionally and not be processed.
173      *
174      * <p>The returned future will be completed exceptionally if the downstream subscriber cancels the subscription, or
175      * if the {@code complete} call was performed after a {@link #complete()} or {@code error} call.
176      *
177      * @param error The error to send.
178      * @return A future that is completed when the exception has been sent to the downstream subscriber.
179      */
error(Throwable error)180     public CompletableFuture<Void> error(Throwable error) {
181         log.trace(() -> "Received error() with " + error, error);
182 
183         OnErrorQueueEntry<T> entry = new OnErrorQueueEntry<>(error);
184 
185         try {
186             standardPriorityQueue.add(entry);
187             processEventQueue();
188         } catch (RuntimeException t) {
189             entry.resultFuture.completeExceptionally(t);
190         }
191         return entry.resultFuture;
192     }
193 
194     /**
195      * A method called by the downstream subscriber in order to subscribe to the publisher.
196      */
197     @Override
subscribe(Subscriber<? super T> s)198     public void subscribe(Subscriber<? super T> s) {
199         if (subscriber != null) {
200             s.onSubscribe(new NoOpSubscription());
201             s.onError(new IllegalStateException("Only one subscription may be active at a time."));
202         }
203         this.subscriber = s;
204         s.onSubscribe(new SubscriptionImpl());
205         processEventQueue();
206     }
207 
208     /**
209      * Process the messages in the event queue. This is invoked after every operation on the publisher that changes the state
210      * of the event queue.
211      *
212      * <p>Internally, this method will only be executed by one thread at a time. Any calls to this method will another thread
213      * is processing the queue will return immediately. This ensures: (1) thread safety in queue processing, (2) mutual recursion
214      * between onSubscribe/onNext with {@link Subscription#request(long)} are impossible.
215      */
processEventQueue()216     private void processEventQueue() {
217         do {
218             if (!processingQueue.compareAndSet(false, true)) {
219                 // Some other thread is processing the queue, so we don't need to.
220                 return;
221             }
222 
223             try {
224                 doProcessQueue();
225             } catch (Throwable e) {
226                 panicAndDie(e);
227                 break;
228             } finally {
229                 processingQueue.set(false);
230             }
231 
232             // Once releasing the processing-queue flag, we need to double-check that the queue still doesn't need to be
233             // processed, because new messages might have come in since we decided to release the flag.
234         } while (shouldProcessQueueEntry(standardPriorityQueue.peek()) ||
235                  shouldProcessQueueEntry(highPriorityQueue.peek()));
236     }
237 
238     /**
239      * Pop events off of the queue and process them in the order they are given, returning when we can no longer process the
240      * event at the head of the queue.
241      *
242      * <p>Invoked only from within the {@link #processEventQueue()} method with the {@link #processingQueue} flag held.
243      */
doProcessQueue()244     private void doProcessQueue() {
245         while (true) {
246             QueueEntry<T> entry = highPriorityQueue.peek();
247             Queue<?> sourceQueue = highPriorityQueue;
248 
249             if (entry == null) {
250                 entry = standardPriorityQueue.peek();
251                 sourceQueue = standardPriorityQueue;
252             }
253 
254             if (!shouldProcessQueueEntry(entry)) {
255                 // We're done processing entries.
256                 return;
257             }
258 
259             if (failureMessage.isSet()) {
260                 entry.resultFuture.completeExceptionally(failureMessage.get());
261             } else {
262                 switch (entry.type()) {
263                     case ON_NEXT:
264                         OnNextQueueEntry<T> onNextEntry = (OnNextQueueEntry<T>) entry;
265 
266                         log.trace(() -> "Calling onNext() with " + onNextEntry.value);
267                         subscriber.onNext(onNextEntry.value);
268                         long newDemand = outstandingDemand.decrementAndGet();
269                         log.trace(() -> "Decreased demand to " + newDemand);
270                         break;
271                     case ON_COMPLETE:
272                         failureMessage.trySet(() -> new IllegalStateException("onComplete() was already invoked."));
273 
274                         log.trace(() -> "Calling onComplete()");
275                         subscriber.onComplete();
276                         break;
277                     case ON_ERROR:
278 
279                         OnErrorQueueEntry<T> onErrorEntry = (OnErrorQueueEntry<T>) entry;
280                         failureMessage.trySet(() -> new IllegalStateException("onError() was already invoked.",
281                                                                              onErrorEntry.failure));
282                         log.trace(() -> "Calling onError() with " + onErrorEntry.failure, onErrorEntry.failure);
283                         subscriber.onError(onErrorEntry.failure);
284                         break;
285                     case CANCEL:
286                         failureMessage.trySet(() -> new CancellationException("subscription has been cancelled."));
287 
288                         subscriber = null; // Allow subscriber to be garbage collected after cancellation.
289                         break;
290                     default:
291                         // Should never happen. Famous last words?
292                         throw new IllegalStateException("Unknown entry type: " + entry.type());
293                 }
294 
295                 entry.resultFuture.complete(null);
296             }
297 
298             sourceQueue.remove();
299         }
300     }
301 
302     /**
303      * Return true if we should process the provided queue entry.
304      */
shouldProcessQueueEntry(QueueEntry<T> entry)305     private boolean shouldProcessQueueEntry(QueueEntry<T> entry) {
306         if (entry == null) {
307             // The queue is empty.
308             return false;
309         }
310 
311         if (failureMessage.isSet()) {
312             return true;
313         }
314 
315         if (subscriber == null) {
316             // We don't have a subscriber yet.
317             return false;
318         }
319 
320         if (entry.type() != ON_NEXT) {
321             // This event isn't an on-next event, so we don't need subscriber demand in order to process it.
322             return true;
323         }
324 
325         // This is an on-next event and we're not failing on-next events, so make sure we have demand available before
326         // processing it.
327         return outstandingDemand.get() > 0;
328     }
329 
330     /**
331      * Invoked from within {@link #processEventQueue()} when we can't process the queue for some reason. This is likely
332      * caused by a downstream subscriber throwing an exception from {@code onNext}, which it should never do.
333      *
334      * <p>Here we try our best to fail all of the entries in the queue, so that no callers have "stuck" futures.
335      */
panicAndDie(Throwable cause)336     private void panicAndDie(Throwable cause) {
337         try {
338             // Create exception here instead of in supplier to preserve a more-useful stack trace.
339             RuntimeException failure = new IllegalStateException("Encountered fatal error in publisher", cause);
340             failureMessage.trySet(() -> failure);
341             subscriber.onError(cause instanceof Error ? cause : failure);
342 
343             while (true) {
344                 QueueEntry<T> entry = standardPriorityQueue.poll();
345                 if (entry == null) {
346                     break;
347                 }
348                 entry.resultFuture.completeExceptionally(failure);
349             }
350         } catch (Throwable t) {
351             t.addSuppressed(cause);
352             log.error(() -> "Failed while processing a failure. This could result in stuck futures.", t);
353         }
354     }
355 
356     /**
357      * The subscription passed to the first {@link #subscriber} that subscribes to this publisher. This allows the downstream
358      * subscriber to request for more {@code onNext} calls or to {@code cancel} the stream of messages.
359      */
360     private class SubscriptionImpl implements Subscription {
361         @Override
request(long n)362         public void request(long n) {
363             log.trace(() -> "Received request() with " + n);
364             if (n <= 0) {
365                 // Create exception here instead of in supplier to preserve a more-useful stack trace.
366                 IllegalArgumentException failure = new IllegalArgumentException("A downstream publisher requested an invalid "
367                                                                                 + "amount of data: " + n);
368                 highPriorityQueue.add(new OnErrorQueueEntry<>(failure));
369                 processEventQueue();
370             } else {
371                 long newDemand = outstandingDemand.updateAndGet(current -> {
372                     if (Long.MAX_VALUE - current < n) {
373                         return Long.MAX_VALUE;
374                     }
375 
376                     return current + n;
377                 });
378                 log.trace(() -> "Increased demand to " + newDemand);
379                 processEventQueue();
380             }
381         }
382 
383         @Override
cancel()384         public void cancel() {
385             log.trace(() -> "Received cancel() from " + subscriber);
386 
387             // Create exception here instead of in supplier to preserve a more-useful stack trace.
388             highPriorityQueue.add(new CancelQueueEntry<>());
389             processEventQueue();
390         }
391     }
392 
393     /**
394      * A lazily-initialized failure message for future events sent to this publisher after a terminal event has
395      * occurred.
396      */
397     private static final class FailureMessage {
398         private Supplier<Throwable> failureMessageSupplier;
399         private Throwable failureMessage;
400 
trySet(Supplier<Throwable> supplier)401         private void trySet(Supplier<Throwable> supplier) {
402             if (failureMessageSupplier == null) {
403                 failureMessageSupplier = supplier;
404             }
405         }
406 
isSet()407         private boolean isSet() {
408             return failureMessageSupplier != null;
409         }
410 
get()411         private Throwable get() {
412             if (failureMessage == null) {
413                 failureMessage = failureMessageSupplier.get();
414             }
415             return failureMessage;
416         }
417     }
418 
419     /**
420      * An entry in the {@link #standardPriorityQueue}.
421      */
422     abstract static class QueueEntry<T> {
423         /**
424          * The future that was returned to a {@link #send(Object)}, {@link #complete()} or {@link #error(Throwable)} message.
425          */
426         protected final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
427 
428         /**
429          * Retrieve the type of this queue entry.
430          */
type()431         protected abstract Type type();
432 
433         protected enum Type {
434             ON_NEXT,
435             ON_COMPLETE,
436             ON_ERROR,
437             CANCEL
438         }
439     }
440 
441     /**
442      * An entry added when we get a {@link #send(Object)} call.
443      */
444     private static final class OnNextQueueEntry<T> extends QueueEntry<T> {
445         private final T value;
446 
OnNextQueueEntry(T value)447         private OnNextQueueEntry(T value) {
448             this.value = value;
449         }
450 
451         @Override
type()452         protected Type type() {
453             return ON_NEXT;
454         }
455     }
456 
457     /**
458      * An entry added when we get a {@link #complete()} call.
459      */
460     private static final class OnCompleteQueueEntry<T> extends QueueEntry<T> {
461         @Override
type()462         protected Type type() {
463             return ON_COMPLETE;
464         }
465     }
466 
467     /**
468      * An entry added when we get an {@link #error(Throwable)} call.
469      */
470     private static final class OnErrorQueueEntry<T> extends QueueEntry<T> {
471         private final Throwable failure;
472 
OnErrorQueueEntry(Throwable failure)473         private OnErrorQueueEntry(Throwable failure) {
474             this.failure = failure;
475         }
476 
477         @Override
type()478         protected Type type() {
479             return ON_ERROR;
480         }
481     }
482 
483     /**
484      * An entry added when we get a {@link SubscriptionImpl#cancel()} call.
485      */
486     private static final class CancelQueueEntry<T> extends QueueEntry<T> {
487         @Override
type()488         protected Type type() {
489             return CANCEL;
490         }
491     }
492 
493     /**
494      * A subscription that does nothing. This is used for signaling {@code onError} to subscribers that subscribe to this
495      * publisher for the second time. Only one subscriber is supported.
496      */
497     private static final class NoOpSubscription implements Subscription {
498         @Override
request(long n)499         public void request(long n) {
500         }
501 
502         @Override
cancel()503         public void cancel() {
504         }
505     }
506 }
507