• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.async;
17 
18 import java.util.List;
19 import java.util.concurrent.CompletableFuture;
20 import java.util.function.Consumer;
21 import java.util.function.Function;
22 import java.util.function.Predicate;
23 import java.util.function.Supplier;
24 import org.reactivestreams.Publisher;
25 import org.reactivestreams.Subscriber;
26 import org.reactivestreams.Subscription;
27 import software.amazon.awssdk.annotations.SdkPublicApi;
28 import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
29 import software.amazon.awssdk.utils.async.BufferingSubscriber;
30 import software.amazon.awssdk.utils.async.EventListeningSubscriber;
31 import software.amazon.awssdk.utils.async.FilteringSubscriber;
32 import software.amazon.awssdk.utils.async.FlatteningSubscriber;
33 import software.amazon.awssdk.utils.async.LimitingSubscriber;
34 import software.amazon.awssdk.utils.async.SequentialSubscriber;
35 import software.amazon.awssdk.utils.internal.MappingSubscriber;
36 
37 /**
38  * Interface that is implemented by the Async auto-paginated responses.
39  */
40 @SdkPublicApi
41 public interface SdkPublisher<T> extends Publisher<T> {
42 
43     /**
44      * Adapts a {@link Publisher} to {@link SdkPublisher}.
45      *
46      * @param toAdapt {@link Publisher} to adapt.
47      * @param <T> Type of object being published.
48      * @return SdkPublisher
49      */
adapt(Publisher<T> toAdapt)50     static <T> SdkPublisher<T> adapt(Publisher<T> toAdapt) {
51         return toAdapt::subscribe;
52     }
53 
54     /**
55      * Filters published events to just those that are instances of the given class. This changes the type of
56      * publisher to the type specified in the {@link Class}.
57      *
58      * @param clzz Class to filter to. Includes subtypes of the class.
59      * @param <U> Type of class to filter to.
60      * @return New publisher, filtered to the given class.
61      */
filter(Class<U> clzz)62     default <U extends T> SdkPublisher<U> filter(Class<U> clzz) {
63         return filter(clzz::isInstance).map(clzz::cast);
64     }
65 
66     /**
67      * Filters published events to just those that match the given predicate. Unlike {@link #filter(Class)}, this method
68      * does not change the type of the {@link Publisher}.
69      *
70      * @param predicate Predicate to match events.
71      * @return New publisher, filtered to just the events that match the predicate.
72      */
filter(Predicate<T> predicate)73     default SdkPublisher<T> filter(Predicate<T> predicate) {
74         return subscriber -> subscribe(new FilteringSubscriber<>(subscriber, predicate));
75     }
76 
77     /**
78      * Perform a mapping on the published events. Returns a new publisher of the mapped events. Typically this method will
79      * change the type of the publisher.
80      *
81      * @param mapper Mapping function to apply.
82      * @param <U> Type being mapped to.
83      * @return New publisher with events mapped according to the given function.
84      */
map(Function<T, U> mapper)85     default <U> SdkPublisher<U> map(Function<T, U> mapper) {
86         return subscriber -> subscribe(MappingSubscriber.create(subscriber, mapper));
87     }
88 
89     /**
90      * Performs a mapping on the published events and creates a new publisher that emits the mapped events one by one.
91      *
92      * @param mapper Mapping function that produces an {@link Iterable} of new events to be flattened.
93      * @param <U> Type of flattened event being mapped to.
94      * @return New publisher of flattened events.
95      */
flatMapIterable(Function<T, Iterable<U>> mapper)96     default <U> SdkPublisher<U> flatMapIterable(Function<T, Iterable<U>> mapper) {
97         return subscriber -> map(mapper).subscribe(new FlatteningSubscriber<>(subscriber));
98     }
99 
100     /**
101      * Buffers the events into lists of the given buffer size. Note that the last batch of events may be less than
102      * the buffer size.
103      *
104      * @param bufferSize Number of events to buffer before delivering downstream.
105      * @return New publisher of buffered events.
106      */
buffer(int bufferSize)107     default SdkPublisher<List<T>> buffer(int bufferSize) {
108         return subscriber -> subscribe(new BufferingSubscriber<>(subscriber, bufferSize));
109     }
110 
111     /**
112      * Limit the number of published events and cancel the subscription after that limit has been reached. The limit
113      * may never be reached if the downstream publisher doesn't have many events to publish. Once it reaches the limit,
114      * subsequent requests will be ignored.
115      *
116      * @param limit Number of events to publish.
117      * @return New publisher that will only publish up to the specified number of events.
118      */
limit(int limit)119     default SdkPublisher<T> limit(int limit) {
120         return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
121     }
122 
123 
124     /**
125      * Creates a new publisher that emits trailing events provided by {@code trailingDataSupplier} in addition to the
126      * published events.
127      *
128      * @param trailingDataSupplier supplier to provide the trailing data
129      * @return New publisher that will publish additional events
130      */
addTrailingData(Supplier<Iterable<T>> trailingDataSupplier)131     default SdkPublisher<T> addTrailingData(Supplier<Iterable<T>> trailingDataSupplier) {
132         return subscriber -> subscribe(new AddingTrailingDataSubscriber<T>(subscriber, trailingDataSupplier));
133     }
134 
135     /**
136      * Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
137      *
138      * @param afterOnComplete The logic that should be run immediately after onComplete.
139      * @return New publisher that invokes the requested callback.
140      */
doAfterOnComplete(Runnable afterOnComplete)141     default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
142         return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
143     }
144 
145     /**
146      * Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
147      *
148      * @param afterOnError The logic that should be run immediately after onError.
149      * @return New publisher that invokes the requested callback.
150      */
doAfterOnError(Consumer<Throwable> afterOnError)151     default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
152         return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
153     }
154 
155     /**
156      * Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
157      *
158      * @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
159      * @return New publisher that invokes the requested callback.
160      */
doAfterOnCancel(Runnable afterOnCancel)161     default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
162         return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
163     }
164 
165     /**
166      * Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
167      * published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more
168      * control over backpressure is required, consider using {@link #subscribe(Subscriber)}.
169      *
170      * @param consumer Consumer to process event.
171      * @return CompletableFuture that will be notified when all events have been consumed or if an error occurs.
172      */
subscribe(Consumer<T> consumer)173     default CompletableFuture<Void> subscribe(Consumer<T> consumer) {
174         CompletableFuture<Void> future = new CompletableFuture<>();
175         subscribe(new SequentialSubscriber<>(consumer, future));
176         return future;
177     }
178 }
179