1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.async; 17 18 import java.util.List; 19 import java.util.concurrent.CompletableFuture; 20 import java.util.function.Consumer; 21 import java.util.function.Function; 22 import java.util.function.Predicate; 23 import java.util.function.Supplier; 24 import org.reactivestreams.Publisher; 25 import org.reactivestreams.Subscriber; 26 import org.reactivestreams.Subscription; 27 import software.amazon.awssdk.annotations.SdkPublicApi; 28 import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber; 29 import software.amazon.awssdk.utils.async.BufferingSubscriber; 30 import software.amazon.awssdk.utils.async.EventListeningSubscriber; 31 import software.amazon.awssdk.utils.async.FilteringSubscriber; 32 import software.amazon.awssdk.utils.async.FlatteningSubscriber; 33 import software.amazon.awssdk.utils.async.LimitingSubscriber; 34 import software.amazon.awssdk.utils.async.SequentialSubscriber; 35 import software.amazon.awssdk.utils.internal.MappingSubscriber; 36 37 /** 38 * Interface that is implemented by the Async auto-paginated responses. 39 */ 40 @SdkPublicApi 41 public interface SdkPublisher<T> extends Publisher<T> { 42 43 /** 44 * Adapts a {@link Publisher} to {@link SdkPublisher}. 45 * 46 * @param toAdapt {@link Publisher} to adapt. 47 * @param <T> Type of object being published. 48 * @return SdkPublisher 49 */ adapt(Publisher<T> toAdapt)50 static <T> SdkPublisher<T> adapt(Publisher<T> toAdapt) { 51 return toAdapt::subscribe; 52 } 53 54 /** 55 * Filters published events to just those that are instances of the given class. This changes the type of 56 * publisher to the type specified in the {@link Class}. 57 * 58 * @param clzz Class to filter to. Includes subtypes of the class. 59 * @param <U> Type of class to filter to. 60 * @return New publisher, filtered to the given class. 61 */ filter(Class<U> clzz)62 default <U extends T> SdkPublisher<U> filter(Class<U> clzz) { 63 return filter(clzz::isInstance).map(clzz::cast); 64 } 65 66 /** 67 * Filters published events to just those that match the given predicate. Unlike {@link #filter(Class)}, this method 68 * does not change the type of the {@link Publisher}. 69 * 70 * @param predicate Predicate to match events. 71 * @return New publisher, filtered to just the events that match the predicate. 72 */ filter(Predicate<T> predicate)73 default SdkPublisher<T> filter(Predicate<T> predicate) { 74 return subscriber -> subscribe(new FilteringSubscriber<>(subscriber, predicate)); 75 } 76 77 /** 78 * Perform a mapping on the published events. Returns a new publisher of the mapped events. Typically this method will 79 * change the type of the publisher. 80 * 81 * @param mapper Mapping function to apply. 82 * @param <U> Type being mapped to. 83 * @return New publisher with events mapped according to the given function. 84 */ map(Function<T, U> mapper)85 default <U> SdkPublisher<U> map(Function<T, U> mapper) { 86 return subscriber -> subscribe(MappingSubscriber.create(subscriber, mapper)); 87 } 88 89 /** 90 * Performs a mapping on the published events and creates a new publisher that emits the mapped events one by one. 91 * 92 * @param mapper Mapping function that produces an {@link Iterable} of new events to be flattened. 93 * @param <U> Type of flattened event being mapped to. 94 * @return New publisher of flattened events. 95 */ flatMapIterable(Function<T, Iterable<U>> mapper)96 default <U> SdkPublisher<U> flatMapIterable(Function<T, Iterable<U>> mapper) { 97 return subscriber -> map(mapper).subscribe(new FlatteningSubscriber<>(subscriber)); 98 } 99 100 /** 101 * Buffers the events into lists of the given buffer size. Note that the last batch of events may be less than 102 * the buffer size. 103 * 104 * @param bufferSize Number of events to buffer before delivering downstream. 105 * @return New publisher of buffered events. 106 */ buffer(int bufferSize)107 default SdkPublisher<List<T>> buffer(int bufferSize) { 108 return subscriber -> subscribe(new BufferingSubscriber<>(subscriber, bufferSize)); 109 } 110 111 /** 112 * Limit the number of published events and cancel the subscription after that limit has been reached. The limit 113 * may never be reached if the downstream publisher doesn't have many events to publish. Once it reaches the limit, 114 * subsequent requests will be ignored. 115 * 116 * @param limit Number of events to publish. 117 * @return New publisher that will only publish up to the specified number of events. 118 */ limit(int limit)119 default SdkPublisher<T> limit(int limit) { 120 return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit)); 121 } 122 123 124 /** 125 * Creates a new publisher that emits trailing events provided by {@code trailingDataSupplier} in addition to the 126 * published events. 127 * 128 * @param trailingDataSupplier supplier to provide the trailing data 129 * @return New publisher that will publish additional events 130 */ addTrailingData(Supplier<Iterable<T>> trailingDataSupplier)131 default SdkPublisher<T> addTrailingData(Supplier<Iterable<T>> trailingDataSupplier) { 132 return subscriber -> subscribe(new AddingTrailingDataSubscriber<T>(subscriber, trailingDataSupplier)); 133 } 134 135 /** 136 * Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}. 137 * 138 * @param afterOnComplete The logic that should be run immediately after onComplete. 139 * @return New publisher that invokes the requested callback. 140 */ doAfterOnComplete(Runnable afterOnComplete)141 default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) { 142 return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null)); 143 } 144 145 /** 146 * Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}. 147 * 148 * @param afterOnError The logic that should be run immediately after onError. 149 * @return New publisher that invokes the requested callback. 150 */ doAfterOnError(Consumer<Throwable> afterOnError)151 default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) { 152 return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null)); 153 } 154 155 /** 156 * Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}. 157 * 158 * @param afterOnCancel The logic that should be run immediately after cancellation of the subscription. 159 * @return New publisher that invokes the requested callback. 160 */ doAfterOnCancel(Runnable afterOnCancel)161 default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) { 162 return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel)); 163 } 164 165 /** 166 * Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event 167 * published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more 168 * control over backpressure is required, consider using {@link #subscribe(Subscriber)}. 169 * 170 * @param consumer Consumer to process event. 171 * @return CompletableFuture that will be notified when all events have been consumed or if an error occurs. 172 */ subscribe(Consumer<T> consumer)173 default CompletableFuture<Void> subscribe(Consumer<T> consumer) { 174 CompletableFuture<Void> future = new CompletableFuture<>(); 175 subscribe(new SequentialSubscriber<>(consumer, future)); 176 return future; 177 } 178 } 179