• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.async;
17 
18 import java.io.File;
19 import java.io.InputStream;
20 import java.nio.ByteBuffer;
21 import java.nio.file.Path;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.function.Consumer;
24 import software.amazon.awssdk.annotations.SdkPublicApi;
25 import software.amazon.awssdk.core.FileTransformerConfiguration;
26 import software.amazon.awssdk.core.ResponseBytes;
27 import software.amazon.awssdk.core.ResponseInputStream;
28 import software.amazon.awssdk.core.SdkResponse;
29 import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
30 import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
31 import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
32 import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
33 import software.amazon.awssdk.utils.Validate;
34 
35 /**
36  * Callback interface to handle a streaming asynchronous response.
37  * <p>
38  * <h2>Synchronization</h2>
39  * <p>
40  * All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be
41  * synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be
42  * invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread.
43  * <p>
44  * <h2>Invocation Order</h2>
45  * <p>
46  * The methods are called in the following order:
47  * <ul>
48  *     <li>
49  *     {@link #prepare()}: This method is always called first. Implementations should use this to setup or perform any
50  *     cleanup necessary. <b>Note that this will be called upon each request attempt</b>. If the {@link CompletableFuture}
51  *     returned from the previous invocation has already been completed, the implementation should return a new instance.
52  *     </li>
53  *     <li>
54  *     {@link #onResponse}: If the response was received successfully, this method is called next.
55  *     </li>
56  *     <li>
57  *     {@link #onStream(SdkPublisher)}: Called after {@code onResponse}. This is always invoked, even if the service
58  *     operation response does not contain a body. If the response does not have a body, then the {@link SdkPublisher} will
59  *     complete the subscription without signaling any elements.
60  *     </li>
61  *     <li>
62  *     {@link #exceptionOccurred(Throwable)}: If there is an error sending the request. This method is called before {@link
63  *     org.reactivestreams.Subscriber#onError(Throwable)}.
64  *     </li>
65  *     <li>
66  *     {@link org.reactivestreams.Subscriber#onError(Throwable)}: If an error is encountered while the {@code Publisher} is
67  *     publishing to a {@link org.reactivestreams.Subscriber}.
68  *     </li>
69  * </ul>
70  * <p>
71  * <h2>Retries</h2>
72  * <p>
73  * The transformer has the ability to trigger retries at any time by completing the {@link CompletableFuture} with an
74  * exception that is deemed retryable by the configured {@link software.amazon.awssdk.core.retry.RetryPolicy}.
75  *
76  * @param <ResponseT> POJO response type.
77  * @param <ResultT>   Type this response handler produces. I.E. the type you are transforming the response into.
78  */
79 @SdkPublicApi
80 public interface AsyncResponseTransformer<ResponseT, ResultT> {
81     /**
82      * Initial call to enable any setup required before the response is handled.
83      * <p>
84      * Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link
85      * software.amazon.awssdk.core.retry.RetryPolicy}.
86      * <p>
87      * This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is
88      * signaled.
89      *
90      * @return The future holding the transformed response.
91      */
prepare()92     CompletableFuture<ResultT> prepare();
93 
94     /**
95      * Called when the unmarshalled response object is ready.
96      *
97      * @param response The unmarshalled response.
98      */
onResponse(ResponseT response)99     void onResponse(ResponseT response);
100 
101     /**
102      * Called when the response stream is ready.
103      *
104      * @param publisher The publisher.
105      */
onStream(SdkPublisher<ByteBuffer> publisher)106     void onStream(SdkPublisher<ByteBuffer> publisher);
107 
108     /**
109      * Called when an error is encountered while making the request or receiving the response.
110      * Implementations should free up any resources in this method. This method may be called
111      * multiple times during the lifecycle of a request if automatic retries are enabled.
112      *
113      * @param error Error that occurred.
114      */
exceptionOccurred(Throwable error)115     void exceptionOccurred(Throwable error);
116 
117     /**
118      * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
119      * the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
120      * exception will be thrown.
121      *
122      * @param path        Path to file to write to.
123      * @param <ResponseT> Pojo Response type.
124      * @return AsyncResponseTransformer instance.
125      * @see #toFile(Path, FileTransformerConfiguration)
126      */
toFile(Path path)127     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path) {
128         return new FileAsyncResponseTransformer<>(path);
129     }
130 
131     /**
132      * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
133      * FileTransformerConfiguration}.
134      *
135      * @param path        Path to file to write to.
136      * @param config      configuration for the transformer
137      * @param <ResponseT> Pojo Response type.
138      * @return AsyncResponseTransformer instance.
139      * @see FileTransformerConfiguration
140      */
toFile(Path path, FileTransformerConfiguration config)141     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path, FileTransformerConfiguration config) {
142         return new FileAsyncResponseTransformer<>(path, config);
143     }
144 
145     /**
146      * This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
147      * avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
148      *
149      * @see #toFile(Path, FileTransformerConfiguration)
150      */
toFile( Path path, Consumer<FileTransformerConfiguration.Builder> config)151     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(
152         Path path, Consumer<FileTransformerConfiguration.Builder> config) {
153         Validate.paramNotNull(config, "config");
154         return new FileAsyncResponseTransformer<>(path, FileTransformerConfiguration.builder().applyMutation(config).build());
155     }
156 
157     /**
158      * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
159      * the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
160      * exception will be thrown.
161      *
162      * @param file        File to write to.
163      * @param <ResponseT> Pojo Response type.
164      * @return AsyncResponseTransformer instance.
165      */
toFile(File file)166     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File file) {
167         return toFile(file.toPath());
168     }
169 
170     /**
171      * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link
172      * FileTransformerConfiguration}.
173      *
174      * @param file        File to write to.
175      * @param config      configuration for the transformer
176      * @param <ResponseT> Pojo Response type.
177      * @return AsyncResponseTransformer instance.
178      * @see FileTransformerConfiguration
179      */
toFile(File file, FileTransformerConfiguration config)180     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File file, FileTransformerConfiguration config) {
181         return new FileAsyncResponseTransformer<>(file.toPath(), config);
182     }
183 
184     /**
185      * This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder,
186      * avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}.
187      *
188      * @see #toFile(File, FileTransformerConfiguration)
189      */
toFile( File file, Consumer<FileTransformerConfiguration.Builder> config)190     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(
191         File file, Consumer<FileTransformerConfiguration.Builder> config) {
192         Validate.paramNotNull(config, "config");
193         return new FileAsyncResponseTransformer<>(file.toPath(), FileTransformerConfiguration.builder()
194                                                                                              .applyMutation(config)
195                                                                                              .build());
196     }
197 
198     /**
199      * Creates an {@link AsyncResponseTransformer} that writes all content to a byte array.
200      *
201      * @param <ResponseT> Pojo response type.
202      * @return AsyncResponseTransformer instance.
203      */
toBytes()204     static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() {
205         return new ByteArrayAsyncResponseTransformer<>();
206     }
207 
208     /**
209      * Creates an {@link AsyncResponseTransformer} that publishes the response body content through a {@link ResponsePublisher},
210      * which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse} returned by the service.
211      * <p>
212      * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed
213      * once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some
214      * other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture}
215      * completed after the entire response body has finished streaming.
216      * <p>
217      * You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this
218      * transformer is only recommended for advanced use cases.
219      * <p>
220      * Example usage:
221      * <pre>
222      * {@code
223      *     CompletableFuture<ResponsePublisher<GetObjectResponse>> responseFuture =
224      *         s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toPublisher());
225      *     ResponsePublisher<GetObjectResponse> responsePublisher = responseFuture.join();
226      *     System.out.println(responsePublisher.response());
227      *     CompletableFuture<Void> drainPublisherFuture = responsePublisher.subscribe(System.out::println);
228      *     drainPublisherFuture.join();
229      * }
230      * </pre>
231      *
232      * @param <ResponseT> Pojo response type.
233      * @return AsyncResponseTransformer instance.
234      */
toPublisher()235     static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, ResponsePublisher<ResponseT>> toPublisher() {
236         return new PublisherAsyncResponseTransformer<>();
237     }
238 
239     /**
240      * Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an
241      * {@link InputStream}.
242      * <p>
243      * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will
244      * be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This
245      * behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only
246      * have their {@link CompletableFuture} completed after the entire response body has finished streaming.
247      * <p>
248      * You are responsible for performing blocking reads from this input stream and closing the stream when you are
249      * finished.
250      * <p>
251      * Example usage:
252      * <pre>
253      * {@code
254      *     CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture =
255      *         s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream());
256      *     try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.join()) {
257      *         responseStream.transferTo(System.out); // BLOCKS the calling thread
258      *     }
259      * }
260      * </pre>
261      */
262     static <ResponseT extends SdkResponse>
toBlockingInputStream()263             AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
264         return new InputStreamResponseTransformer<>();
265     }
266 }
267