1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.utils.async; 17 18 import java.io.IOException; 19 import java.io.UncheckedIOException; 20 import java.util.Optional; 21 import java.util.Queue; 22 import java.util.concurrent.ConcurrentLinkedQueue; 23 import org.reactivestreams.Subscriber; 24 import org.reactivestreams.Subscription; 25 import software.amazon.awssdk.annotations.SdkProtectedApi; 26 import software.amazon.awssdk.utils.Validate; 27 28 /** 29 * An implementation of {@link Subscriber} that stores the events it receives for retrieval. 30 * 31 * <p>Events can be observed via {@link #peek()} and {@link #poll()}. The number of events stored is limited by the 32 * {@code maxElements} configured at construction. 33 */ 34 @SdkProtectedApi 35 public class StoringSubscriber<T> implements Subscriber<T> { 36 /** 37 * The maximum number of events that can be stored in this subscriber. The number of events in {@link #events} may be 38 * slightly higher once {@link #onComplete()} and {@link #onError(Throwable)} events are added. 39 */ 40 private final int maxEvents; 41 42 /** 43 * The events stored in this subscriber. The maximum size of this queue is approximately {@link #maxEvents}. 44 */ 45 private final Queue<Event<T>> events; 46 47 /** 48 * The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked. 49 */ 50 private Subscription subscription; 51 52 /** 53 * Create a subscriber that stores up to {@code maxElements} events for retrieval. 54 */ StoringSubscriber(int maxEvents)55 public StoringSubscriber(int maxEvents) { 56 Validate.isPositive(maxEvents, "Max elements must be positive."); 57 this.maxEvents = maxEvents; 58 this.events = new ConcurrentLinkedQueue<>(); 59 } 60 61 /** 62 * Check the first event stored in this subscriber. 63 * 64 * <p>This will return empty if no events are currently available (outstanding demand has not yet 65 * been filled). 66 */ peek()67 public Optional<Event<T>> peek() { 68 return Optional.ofNullable(events.peek()); 69 } 70 71 /** 72 * Remove and return the first event stored in this subscriber. 73 * 74 * <p>This will return empty if no events are currently available (outstanding demand has not yet 75 * been filled). 76 */ poll()77 public Optional<Event<T>> poll() { 78 Event<T> result = events.poll(); 79 if (result != null) { 80 subscription.request(1); 81 return Optional.of(result); 82 } 83 return Optional.empty(); 84 } 85 86 @Override onSubscribe(Subscription subscription)87 public void onSubscribe(Subscription subscription) { 88 if (this.subscription != null) { 89 subscription.cancel(); 90 } 91 92 this.subscription = subscription; 93 subscription.request(maxEvents); 94 } 95 96 @Override onNext(T t)97 public void onNext(T t) { 98 Validate.notNull(t, "onNext(null) is not allowed."); 99 100 try { 101 events.add(Event.value(t)); 102 } catch (RuntimeException e) { 103 subscription.cancel(); 104 onError(new IllegalStateException("Failed to store element.", e)); 105 } 106 } 107 108 @Override onComplete()109 public void onComplete() { 110 events.add(Event.complete()); 111 } 112 113 @Override onError(Throwable throwable)114 public void onError(Throwable throwable) { 115 events.add(Event.error(throwable)); 116 } 117 118 /** 119 * An event stored for later retrieval by this subscriber. 120 * 121 * <p>Stored events are one of the follow {@link #type()}s: 122 * <ul> 123 * <li>{@code VALUE} - A value received by {@link #onNext(Object)}, available via {@link #value()}.</li> 124 * <li>{@code COMPLETE} - Indicating {@link #onComplete()} was called.</li> 125 * <li>{@code ERROR} - Indicating {@link #onError(Throwable)} was called. The exception is available via 126 * {@link #runtimeError()}</li> 127 * <li>{@code EMPTY} - Indicating that no events remain in the queue (but more from upstream may be given later).</li> 128 * </ul> 129 */ 130 public static final class Event<T> { 131 private final EventType type; 132 private final T value; 133 private final Throwable error; 134 Event(EventType type, T value, Throwable error)135 private Event(EventType type, T value, Throwable error) { 136 this.type = type; 137 this.value = value; 138 this.error = error; 139 } 140 complete()141 private static <T> Event<T> complete() { 142 return new Event<>(EventType.ON_COMPLETE, null, null); 143 } 144 error(Throwable error)145 private static <T> Event<T> error(Throwable error) { 146 return new Event<>(EventType.ON_ERROR, null, error); 147 } 148 value(T value)149 private static <T> Event<T> value(T value) { 150 return new Event<>(EventType.ON_NEXT, value, null); 151 } 152 153 /** 154 * Retrieve the {@link EventType} of this event. 155 */ type()156 public EventType type() { 157 return type; 158 } 159 160 /** 161 * The value stored in this {@code VALUE} type. Null for all other event types. 162 */ value()163 public T value() { 164 return value; 165 } 166 167 /** 168 * The error stored in this {@code ERROR} type. Null for all other event types. If a checked exception was received via 169 * {@link #onError(Throwable)}, this will return a {@code RuntimeException} with the checked exception as its cause. 170 */ runtimeError()171 public RuntimeException runtimeError() { 172 if (type != EventType.ON_ERROR) { 173 return null; 174 } 175 176 if (error instanceof RuntimeException) { 177 return (RuntimeException) error; 178 } 179 180 if (error instanceof IOException) { 181 return new UncheckedIOException((IOException) error); 182 } 183 184 return new RuntimeException(error); 185 } 186 } 187 188 public enum EventType { 189 ON_NEXT, 190 ON_COMPLETE, 191 ON_ERROR 192 } 193 } 194