1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.internal.async; 17 18 import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; 19 20 import java.io.ByteArrayOutputStream; 21 import java.nio.ByteBuffer; 22 import java.util.concurrent.CompletableFuture; 23 import org.reactivestreams.Subscriber; 24 import org.reactivestreams.Subscription; 25 import software.amazon.awssdk.annotations.SdkInternalApi; 26 import software.amazon.awssdk.core.ResponseBytes; 27 import software.amazon.awssdk.core.async.AsyncResponseTransformer; 28 import software.amazon.awssdk.core.async.SdkPublisher; 29 import software.amazon.awssdk.utils.BinaryUtils; 30 31 /** 32 * Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further 33 * conversions into types, like strings. 34 * 35 * This can be created with static methods on {@link AsyncResponseTransformer}. 36 * 37 * @param <ResponseT> Pojo response type. 38 * @see AsyncResponseTransformer#toBytes() 39 */ 40 @SdkInternalApi 41 public final class ByteArrayAsyncResponseTransformer<ResponseT> implements 42 AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> { 43 44 private volatile CompletableFuture<byte[]> cf; 45 private volatile ResponseT response; 46 47 @Override prepare()48 public CompletableFuture<ResponseBytes<ResponseT>> prepare() { 49 cf = new CompletableFuture<>(); 50 return cf.thenApply(arr -> ResponseBytes.fromByteArray(response, arr)); 51 } 52 53 @Override onResponse(ResponseT response)54 public void onResponse(ResponseT response) { 55 this.response = response; 56 } 57 58 @Override onStream(SdkPublisher<ByteBuffer> publisher)59 public void onStream(SdkPublisher<ByteBuffer> publisher) { 60 publisher.subscribe(new BaosSubscriber(cf)); 61 } 62 63 @Override exceptionOccurred(Throwable throwable)64 public void exceptionOccurred(Throwable throwable) { 65 cf.completeExceptionally(throwable); 66 } 67 68 static class BaosSubscriber implements Subscriber<ByteBuffer> { 69 private final CompletableFuture<byte[]> resultFuture; 70 71 private ByteArrayOutputStream baos = new ByteArrayOutputStream(); 72 73 private Subscription subscription; 74 BaosSubscriber(CompletableFuture<byte[]> resultFuture)75 BaosSubscriber(CompletableFuture<byte[]> resultFuture) { 76 this.resultFuture = resultFuture; 77 } 78 79 @Override onSubscribe(Subscription s)80 public void onSubscribe(Subscription s) { 81 if (this.subscription != null) { 82 s.cancel(); 83 return; 84 } 85 this.subscription = s; 86 subscription.request(Long.MAX_VALUE); 87 } 88 89 @Override onNext(ByteBuffer byteBuffer)90 public void onNext(ByteBuffer byteBuffer) { 91 invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer))); 92 subscription.request(1); 93 } 94 95 @Override onError(Throwable throwable)96 public void onError(Throwable throwable) { 97 baos = null; 98 resultFuture.completeExceptionally(throwable); 99 } 100 101 @Override onComplete()102 public void onComplete() { 103 resultFuture.complete(baos.toByteArray()); 104 } 105 } 106 } 107