1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.async; 17 18 import java.io.File; 19 import java.io.InputStream; 20 import java.nio.ByteBuffer; 21 import java.nio.file.Path; 22 import java.util.concurrent.CompletableFuture; 23 import java.util.function.Consumer; 24 import software.amazon.awssdk.annotations.SdkPublicApi; 25 import software.amazon.awssdk.core.FileTransformerConfiguration; 26 import software.amazon.awssdk.core.ResponseBytes; 27 import software.amazon.awssdk.core.ResponseInputStream; 28 import software.amazon.awssdk.core.SdkResponse; 29 import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer; 30 import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; 31 import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer; 32 import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer; 33 import software.amazon.awssdk.utils.Validate; 34 35 /** 36 * Callback interface to handle a streaming asynchronous response. 37 * <p> 38 * <h2>Synchronization</h2> 39 * <p> 40 * All operations, including those called on the {@link org.reactivestreams.Subscriber} of the stream are guaranteed to be 41 * synchronized externally; i.e. no two methods on this interface or on the {@link org.reactivestreams.Subscriber} will be 42 * invoked concurrently. It is <b>not</b> guaranteed that the methods will being invoked by the same thread. 43 * <p> 44 * <h2>Invocation Order</h2> 45 * <p> 46 * The methods are called in the following order: 47 * <ul> 48 * <li> 49 * {@link #prepare()}: This method is always called first. Implementations should use this to setup or perform any 50 * cleanup necessary. <b>Note that this will be called upon each request attempt</b>. If the {@link CompletableFuture} 51 * returned from the previous invocation has already been completed, the implementation should return a new instance. 52 * </li> 53 * <li> 54 * {@link #onResponse}: If the response was received successfully, this method is called next. 55 * </li> 56 * <li> 57 * {@link #onStream(SdkPublisher)}: Called after {@code onResponse}. This is always invoked, even if the service 58 * operation response does not contain a body. If the response does not have a body, then the {@link SdkPublisher} will 59 * complete the subscription without signaling any elements. 60 * </li> 61 * <li> 62 * {@link #exceptionOccurred(Throwable)}: If there is an error sending the request. This method is called before {@link 63 * org.reactivestreams.Subscriber#onError(Throwable)}. 64 * </li> 65 * <li> 66 * {@link org.reactivestreams.Subscriber#onError(Throwable)}: If an error is encountered while the {@code Publisher} is 67 * publishing to a {@link org.reactivestreams.Subscriber}. 68 * </li> 69 * </ul> 70 * <p> 71 * <h2>Retries</h2> 72 * <p> 73 * The transformer has the ability to trigger retries at any time by completing the {@link CompletableFuture} with an 74 * exception that is deemed retryable by the configured {@link software.amazon.awssdk.core.retry.RetryPolicy}. 75 * 76 * @param <ResponseT> POJO response type. 77 * @param <ResultT> Type this response handler produces. I.E. the type you are transforming the response into. 78 */ 79 @SdkPublicApi 80 public interface AsyncResponseTransformer<ResponseT, ResultT> { 81 /** 82 * Initial call to enable any setup required before the response is handled. 83 * <p> 84 * Note that this will be called for each request attempt, up to the number of retries allowed by the configured {@link 85 * software.amazon.awssdk.core.retry.RetryPolicy}. 86 * <p> 87 * This method is guaranteed to be called before the request is executed, and before {@link #onResponse(Object)} is 88 * signaled. 89 * 90 * @return The future holding the transformed response. 91 */ prepare()92 CompletableFuture<ResultT> prepare(); 93 94 /** 95 * Called when the unmarshalled response object is ready. 96 * 97 * @param response The unmarshalled response. 98 */ onResponse(ResponseT response)99 void onResponse(ResponseT response); 100 101 /** 102 * Called when the response stream is ready. 103 * 104 * @param publisher The publisher. 105 */ onStream(SdkPublisher<ByteBuffer> publisher)106 void onStream(SdkPublisher<ByteBuffer> publisher); 107 108 /** 109 * Called when an error is encountered while making the request or receiving the response. 110 * Implementations should free up any resources in this method. This method may be called 111 * multiple times during the lifecycle of a request if automatic retries are enabled. 112 * 113 * @param error Error that occurred. 114 */ exceptionOccurred(Throwable error)115 void exceptionOccurred(Throwable error); 116 117 /** 118 * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, 119 * the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an 120 * exception will be thrown. 121 * 122 * @param path Path to file to write to. 123 * @param <ResponseT> Pojo Response type. 124 * @return AsyncResponseTransformer instance. 125 * @see #toFile(Path, FileTransformerConfiguration) 126 */ toFile(Path path)127 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path) { 128 return new FileAsyncResponseTransformer<>(path); 129 } 130 131 /** 132 * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link 133 * FileTransformerConfiguration}. 134 * 135 * @param path Path to file to write to. 136 * @param config configuration for the transformer 137 * @param <ResponseT> Pojo Response type. 138 * @return AsyncResponseTransformer instance. 139 * @see FileTransformerConfiguration 140 */ toFile(Path path, FileTransformerConfiguration config)141 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path, FileTransformerConfiguration config) { 142 return new FileAsyncResponseTransformer<>(path, config); 143 } 144 145 /** 146 * This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, 147 * avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}. 148 * 149 * @see #toFile(Path, FileTransformerConfiguration) 150 */ toFile( Path path, Consumer<FileTransformerConfiguration.Builder> config)151 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile( 152 Path path, Consumer<FileTransformerConfiguration.Builder> config) { 153 Validate.paramNotNull(config, "config"); 154 return new FileAsyncResponseTransformer<>(path, FileTransformerConfiguration.builder().applyMutation(config).build()); 155 } 156 157 /** 158 * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error, 159 * the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an 160 * exception will be thrown. 161 * 162 * @param file File to write to. 163 * @param <ResponseT> Pojo Response type. 164 * @return AsyncResponseTransformer instance. 165 */ toFile(File file)166 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File file) { 167 return toFile(file.toPath()); 168 } 169 170 /** 171 * Creates an {@link AsyncResponseTransformer} that writes all the content to the given file with the specified {@link 172 * FileTransformerConfiguration}. 173 * 174 * @param file File to write to. 175 * @param config configuration for the transformer 176 * @param <ResponseT> Pojo Response type. 177 * @return AsyncResponseTransformer instance. 178 * @see FileTransformerConfiguration 179 */ toFile(File file, FileTransformerConfiguration config)180 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(File file, FileTransformerConfiguration config) { 181 return new FileAsyncResponseTransformer<>(file.toPath(), config); 182 } 183 184 /** 185 * This is a convenience method that creates an instance of the {@link FileTransformerConfiguration} builder, 186 * avoiding the need to create one manually via {@link FileTransformerConfiguration#builder()}. 187 * 188 * @see #toFile(File, FileTransformerConfiguration) 189 */ toFile( File file, Consumer<FileTransformerConfiguration.Builder> config)190 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile( 191 File file, Consumer<FileTransformerConfiguration.Builder> config) { 192 Validate.paramNotNull(config, "config"); 193 return new FileAsyncResponseTransformer<>(file.toPath(), FileTransformerConfiguration.builder() 194 .applyMutation(config) 195 .build()); 196 } 197 198 /** 199 * Creates an {@link AsyncResponseTransformer} that writes all content to a byte array. 200 * 201 * @param <ResponseT> Pojo response type. 202 * @return AsyncResponseTransformer instance. 203 */ toBytes()204 static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> toBytes() { 205 return new ByteArrayAsyncResponseTransformer<>(); 206 } 207 208 /** 209 * Creates an {@link AsyncResponseTransformer} that publishes the response body content through a {@link ResponsePublisher}, 210 * which is an {@link SdkPublisher} that also contains a reference to the {@link SdkResponse} returned by the service. 211 * <p> 212 * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will be completed 213 * once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This behavior differs from some 214 * other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only have their {@link CompletableFuture} 215 * completed after the entire response body has finished streaming. 216 * <p> 217 * You are responsible for subscribing to this publisher and managing the associated back-pressure. Therefore, this 218 * transformer is only recommended for advanced use cases. 219 * <p> 220 * Example usage: 221 * <pre> 222 * {@code 223 * CompletableFuture<ResponsePublisher<GetObjectResponse>> responseFuture = 224 * s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toPublisher()); 225 * ResponsePublisher<GetObjectResponse> responsePublisher = responseFuture.join(); 226 * System.out.println(responsePublisher.response()); 227 * CompletableFuture<Void> drainPublisherFuture = responsePublisher.subscribe(System.out::println); 228 * drainPublisherFuture.join(); 229 * } 230 * </pre> 231 * 232 * @param <ResponseT> Pojo response type. 233 * @return AsyncResponseTransformer instance. 234 */ toPublisher()235 static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, ResponsePublisher<ResponseT>> toPublisher() { 236 return new PublisherAsyncResponseTransformer<>(); 237 } 238 239 /** 240 * Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an 241 * {@link InputStream}. 242 * <p> 243 * When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will 244 * be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This 245 * behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only 246 * have their {@link CompletableFuture} completed after the entire response body has finished streaming. 247 * <p> 248 * You are responsible for performing blocking reads from this input stream and closing the stream when you are 249 * finished. 250 * <p> 251 * Example usage: 252 * <pre> 253 * {@code 254 * CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture = 255 * s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream()); 256 * try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.join()) { 257 * responseStream.transferTo(System.out); // BLOCKS the calling thread 258 * } 259 * } 260 * </pre> 261 */ 262 static <ResponseT extends SdkResponse> toBlockingInputStream()263 AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() { 264 return new InputStreamResponseTransformer<>(); 265 } 266 } 267