• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.core.internal.async;
17 
18 import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;
19 
20 import java.io.ByteArrayOutputStream;
21 import java.nio.ByteBuffer;
22 import java.util.concurrent.CompletableFuture;
23 import org.reactivestreams.Subscriber;
24 import org.reactivestreams.Subscription;
25 import software.amazon.awssdk.annotations.SdkInternalApi;
26 import software.amazon.awssdk.core.ResponseBytes;
27 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
28 import software.amazon.awssdk.core.async.SdkPublisher;
29 import software.amazon.awssdk.utils.BinaryUtils;
30 
31 /**
32  * Implementation of {@link AsyncResponseTransformer} that dumps content into a byte array and supports further
33  * conversions into types, like strings.
34  *
35  * This can be created with static methods on {@link AsyncResponseTransformer}.
36  *
37  * @param <ResponseT> Pojo response type.
38  * @see AsyncResponseTransformer#toBytes()
39  */
40 @SdkInternalApi
41 public final class ByteArrayAsyncResponseTransformer<ResponseT> implements
42         AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>> {
43 
44     private volatile CompletableFuture<byte[]> cf;
45     private volatile ResponseT response;
46 
47     @Override
prepare()48     public CompletableFuture<ResponseBytes<ResponseT>> prepare() {
49         cf = new CompletableFuture<>();
50         return cf.thenApply(arr -> ResponseBytes.fromByteArray(response, arr));
51     }
52 
53     @Override
onResponse(ResponseT response)54     public void onResponse(ResponseT response) {
55         this.response = response;
56     }
57 
58     @Override
onStream(SdkPublisher<ByteBuffer> publisher)59     public void onStream(SdkPublisher<ByteBuffer> publisher) {
60         publisher.subscribe(new BaosSubscriber(cf));
61     }
62 
63     @Override
exceptionOccurred(Throwable throwable)64     public void exceptionOccurred(Throwable throwable) {
65         cf.completeExceptionally(throwable);
66     }
67 
68     static class BaosSubscriber implements Subscriber<ByteBuffer> {
69         private final CompletableFuture<byte[]> resultFuture;
70 
71         private ByteArrayOutputStream baos = new ByteArrayOutputStream();
72 
73         private Subscription subscription;
74 
BaosSubscriber(CompletableFuture<byte[]> resultFuture)75         BaosSubscriber(CompletableFuture<byte[]> resultFuture) {
76             this.resultFuture = resultFuture;
77         }
78 
79         @Override
onSubscribe(Subscription s)80         public void onSubscribe(Subscription s) {
81             if (this.subscription != null) {
82                 s.cancel();
83                 return;
84             }
85             this.subscription = s;
86             subscription.request(Long.MAX_VALUE);
87         }
88 
89         @Override
onNext(ByteBuffer byteBuffer)90         public void onNext(ByteBuffer byteBuffer) {
91             invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
92             subscription.request(1);
93         }
94 
95         @Override
onError(Throwable throwable)96         public void onError(Throwable throwable) {
97             baos = null;
98             resultFuture.completeExceptionally(throwable);
99         }
100 
101         @Override
onComplete()102         public void onComplete() {
103             resultFuture.complete(baos.toByteArray());
104         }
105     }
106 }
107