• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.utils.async;
17 
18 import java.io.IOException;
19 import java.io.UncheckedIOException;
20 import java.util.Optional;
21 import java.util.Queue;
22 import java.util.concurrent.ConcurrentLinkedQueue;
23 import org.reactivestreams.Subscriber;
24 import org.reactivestreams.Subscription;
25 import software.amazon.awssdk.annotations.SdkProtectedApi;
26 import software.amazon.awssdk.utils.Validate;
27 
28 /**
29  * An implementation of {@link Subscriber} that stores the events it receives for retrieval.
30  *
31  * <p>Events can be observed via {@link #peek()} and {@link #poll()}. The number of events stored is limited by the
32  * {@code maxElements} configured at construction.
33  */
34 @SdkProtectedApi
35 public class StoringSubscriber<T> implements Subscriber<T> {
36     /**
37      * The maximum number of events that can be stored in this subscriber. The number of events in {@link #events} may be
38      * slightly higher once {@link #onComplete()} and {@link #onError(Throwable)} events are added.
39      */
40     private final int maxEvents;
41 
42     /**
43      * The events stored in this subscriber. The maximum size of this queue is approximately {@link #maxEvents}.
44      */
45     private final Queue<Event<T>> events;
46 
47     /**
48      * The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked.
49      */
50     private Subscription subscription;
51 
52     /**
53      * Create a subscriber that stores up to {@code maxElements} events for retrieval.
54      */
StoringSubscriber(int maxEvents)55     public StoringSubscriber(int maxEvents) {
56         Validate.isPositive(maxEvents, "Max elements must be positive.");
57         this.maxEvents = maxEvents;
58         this.events = new ConcurrentLinkedQueue<>();
59     }
60 
61     /**
62      * Check the first event stored in this subscriber.
63      *
64      * <p>This will return empty if no events are currently available (outstanding demand has not yet
65      * been filled).
66      */
peek()67     public Optional<Event<T>> peek() {
68         return Optional.ofNullable(events.peek());
69     }
70 
71     /**
72      * Remove and return the first event stored in this subscriber.
73      *
74      * <p>This will return empty if no events are currently available (outstanding demand has not yet
75      * been filled).
76      */
poll()77     public Optional<Event<T>> poll() {
78         Event<T> result = events.poll();
79         if (result != null) {
80             subscription.request(1);
81             return Optional.of(result);
82         }
83         return Optional.empty();
84     }
85 
86     @Override
onSubscribe(Subscription subscription)87     public void onSubscribe(Subscription subscription) {
88         if (this.subscription != null) {
89             subscription.cancel();
90         }
91 
92         this.subscription = subscription;
93         subscription.request(maxEvents);
94     }
95 
96     @Override
onNext(T t)97     public void onNext(T t) {
98         Validate.notNull(t, "onNext(null) is not allowed.");
99 
100         try {
101             events.add(Event.value(t));
102         } catch (RuntimeException e) {
103             subscription.cancel();
104             onError(new IllegalStateException("Failed to store element.", e));
105         }
106     }
107 
108     @Override
onComplete()109     public void onComplete() {
110         events.add(Event.complete());
111     }
112 
113     @Override
onError(Throwable throwable)114     public void onError(Throwable throwable) {
115         events.add(Event.error(throwable));
116     }
117 
118     /**
119      * An event stored for later retrieval by this subscriber.
120      *
121      * <p>Stored events are one of the follow {@link #type()}s:
122      * <ul>
123      *     <li>{@code VALUE} - A value received by {@link #onNext(Object)}, available via {@link #value()}.</li>
124      *     <li>{@code COMPLETE} - Indicating {@link #onComplete()} was called.</li>
125      *     <li>{@code ERROR} - Indicating {@link #onError(Throwable)} was called. The exception is available via
126      *     {@link #runtimeError()}</li>
127      *     <li>{@code EMPTY} - Indicating that no events remain in the queue (but more from upstream may be given later).</li>
128      * </ul>
129      */
130     public static final class Event<T> {
131         private final EventType type;
132         private final T value;
133         private final Throwable error;
134 
Event(EventType type, T value, Throwable error)135         private Event(EventType type, T value, Throwable error) {
136             this.type = type;
137             this.value = value;
138             this.error = error;
139         }
140 
complete()141         private static <T> Event<T> complete() {
142             return new Event<>(EventType.ON_COMPLETE, null, null);
143         }
144 
error(Throwable error)145         private static <T> Event<T> error(Throwable error) {
146             return new Event<>(EventType.ON_ERROR, null, error);
147         }
148 
value(T value)149         private static <T> Event<T> value(T value) {
150             return new Event<>(EventType.ON_NEXT, value, null);
151         }
152 
153         /**
154          * Retrieve the {@link EventType} of this event.
155          */
type()156         public EventType type() {
157             return type;
158         }
159 
160         /**
161          * The value stored in this {@code VALUE} type. Null for all other event types.
162          */
value()163         public T value() {
164             return value;
165         }
166 
167         /**
168          * The error stored in this {@code ERROR} type. Null for all other event types. If a checked exception was received via
169          * {@link #onError(Throwable)}, this will return a {@code RuntimeException} with the checked exception as its cause.
170          */
runtimeError()171         public RuntimeException runtimeError() {
172             if (type != EventType.ON_ERROR) {
173                 return null;
174             }
175 
176             if (error instanceof RuntimeException) {
177                 return (RuntimeException) error;
178             }
179 
180             if (error instanceof IOException) {
181                 return new UncheckedIOException((IOException) error);
182             }
183 
184             return new RuntimeException(error);
185         }
186     }
187 
188     public enum EventType {
189         ON_NEXT,
190         ON_COMPLETE,
191         ON_ERROR
192     }
193 }
194