• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.http.crt.internal.response;
17 
18 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
19 
20 import java.nio.ByteBuffer;
21 import java.util.concurrent.CompletableFuture;
22 import software.amazon.awssdk.annotations.SdkInternalApi;
23 import software.amazon.awssdk.annotations.SdkTestInternalApi;
24 import software.amazon.awssdk.crt.CRT;
25 import software.amazon.awssdk.crt.http.HttpClientConnection;
26 import software.amazon.awssdk.crt.http.HttpException;
27 import software.amazon.awssdk.crt.http.HttpHeader;
28 import software.amazon.awssdk.crt.http.HttpHeaderBlock;
29 import software.amazon.awssdk.crt.http.HttpStream;
30 import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
31 import software.amazon.awssdk.http.SdkHttpResponse;
32 import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
33 import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
34 import software.amazon.awssdk.utils.Logger;
35 import software.amazon.awssdk.utils.Validate;
36 import software.amazon.awssdk.utils.async.SimplePublisher;
37 
38 /**
39  * Response handler adaptor for {@link AwsCrtAsyncHttpClient}.
40  * <p>
41  * Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
42  */
43 @SdkInternalApi
44 public final class CrtResponseAdapter implements HttpStreamResponseHandler {
45     private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class);
46 
47     private final HttpClientConnection connection;
48     private final CompletableFuture<Void> completionFuture;
49     private final SdkAsyncHttpResponseHandler responseHandler;
50     private final SimplePublisher<ByteBuffer> responsePublisher;
51 
52     private final SdkHttpResponse.Builder responseBuilder;
53     private final ResponseHandlerHelper responseHandlerHelper;
54 
CrtResponseAdapter(HttpClientConnection connection, CompletableFuture<Void> completionFuture, SdkAsyncHttpResponseHandler responseHandler)55     private CrtResponseAdapter(HttpClientConnection connection,
56                                CompletableFuture<Void> completionFuture,
57                                SdkAsyncHttpResponseHandler responseHandler) {
58         this(connection, completionFuture, responseHandler, new SimplePublisher<>());
59     }
60 
61 
62     @SdkTestInternalApi
CrtResponseAdapter(HttpClientConnection connection, CompletableFuture<Void> completionFuture, SdkAsyncHttpResponseHandler responseHandler, SimplePublisher<ByteBuffer> simplePublisher)63     public CrtResponseAdapter(HttpClientConnection connection,
64                                CompletableFuture<Void> completionFuture,
65                                SdkAsyncHttpResponseHandler responseHandler,
66                                SimplePublisher<ByteBuffer> simplePublisher) {
67         this.connection = Validate.paramNotNull(connection, "connection");
68         this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
69         this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
70         this.responseBuilder = SdkHttpResponse.builder();
71         this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection);
72         this.responsePublisher = simplePublisher;
73     }
74 
toCrtResponseHandler(HttpClientConnection crtConn, CompletableFuture<Void> requestFuture, SdkAsyncHttpResponseHandler responseHandler)75     public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
76                                                                  CompletableFuture<Void> requestFuture,
77                                                                  SdkAsyncHttpResponseHandler responseHandler) {
78         return new CrtResponseAdapter(crtConn, requestFuture, responseHandler);
79     }
80 
81     @Override
onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders)82     public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
83         responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders);
84     }
85 
86     @Override
onResponseHeadersDone(HttpStream stream, int headerType)87     public void onResponseHeadersDone(HttpStream stream, int headerType) {
88         if (headerType == HttpHeaderBlock.MAIN.getValue()) {
89             responseHandler.onHeaders(responseBuilder.build());
90             responseHandler.onStream(responsePublisher);
91         }
92     }
93 
94     @Override
onResponseBody(HttpStream stream, byte[] bodyBytesIn)95     public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
96         CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));
97 
98         if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
99             // Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
100             return bodyBytesIn.length;
101         }
102 
103         writeFuture.whenComplete((result, failure) -> {
104             if (failure != null) {
105                 handlePublisherError(stream, failure);
106                 return;
107             }
108 
109             responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length);
110         });
111 
112         return 0;
113     }
114 
115     @Override
onResponseComplete(HttpStream stream, int errorCode)116     public void onResponseComplete(HttpStream stream, int errorCode) {
117         if (errorCode == CRT.AWS_CRT_SUCCESS) {
118             onSuccessfulResponseComplete(stream);
119         } else {
120             onFailedResponseComplete(stream, new HttpException(errorCode));
121         }
122     }
123 
onSuccessfulResponseComplete(HttpStream stream)124     private void onSuccessfulResponseComplete(HttpStream stream) {
125         responsePublisher.complete().whenComplete((result, failure) -> {
126             if (failure != null) {
127                 handlePublisherError(stream, failure);
128                 return;
129             }
130             completionFuture.complete(null);
131         });
132 
133         responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream);
134     }
135 
handlePublisherError(HttpStream stream, Throwable failure)136     private void handlePublisherError(HttpStream stream, Throwable failure) {
137         failResponseHandlerAndFuture(stream, failure);
138         responseHandlerHelper.closeConnection(stream);
139     }
140 
onFailedResponseComplete(HttpStream stream, HttpException error)141     private void onFailedResponseComplete(HttpStream stream, HttpException error) {
142         log.debug(() -> "HTTP response encountered an error.", error);
143 
144         Throwable toThrow = wrapWithIoExceptionIfRetryable(error);;
145         responsePublisher.error(toThrow);
146         failResponseHandlerAndFuture(stream, toThrow);
147         responseHandlerHelper.closeConnection(stream);
148     }
149 
failResponseHandlerAndFuture(HttpStream stream, Throwable error)150     private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
151         callResponseHandlerOnError(error);
152         completionFuture.completeExceptionally(error);
153     }
154 
callResponseHandlerOnError(Throwable error)155     private void callResponseHandlerOnError(Throwable error) {
156         try {
157             responseHandler.onError(error);
158         } catch (RuntimeException e) {
159             log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
160         }
161     }
162 }
163