1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.http.crt.internal.response; 17 18 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable; 19 20 import java.nio.ByteBuffer; 21 import java.util.concurrent.CompletableFuture; 22 import software.amazon.awssdk.annotations.SdkInternalApi; 23 import software.amazon.awssdk.annotations.SdkTestInternalApi; 24 import software.amazon.awssdk.crt.CRT; 25 import software.amazon.awssdk.crt.http.HttpClientConnection; 26 import software.amazon.awssdk.crt.http.HttpException; 27 import software.amazon.awssdk.crt.http.HttpHeader; 28 import software.amazon.awssdk.crt.http.HttpHeaderBlock; 29 import software.amazon.awssdk.crt.http.HttpStream; 30 import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; 31 import software.amazon.awssdk.http.SdkHttpResponse; 32 import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; 33 import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient; 34 import software.amazon.awssdk.utils.Logger; 35 import software.amazon.awssdk.utils.Validate; 36 import software.amazon.awssdk.utils.async.SimplePublisher; 37 38 /** 39 * Response handler adaptor for {@link AwsCrtAsyncHttpClient}. 40 * <p> 41 * Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods 42 */ 43 @SdkInternalApi 44 public final class CrtResponseAdapter implements HttpStreamResponseHandler { 45 private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class); 46 47 private final HttpClientConnection connection; 48 private final CompletableFuture<Void> completionFuture; 49 private final SdkAsyncHttpResponseHandler responseHandler; 50 private final SimplePublisher<ByteBuffer> responsePublisher; 51 52 private final SdkHttpResponse.Builder responseBuilder; 53 private final ResponseHandlerHelper responseHandlerHelper; 54 CrtResponseAdapter(HttpClientConnection connection, CompletableFuture<Void> completionFuture, SdkAsyncHttpResponseHandler responseHandler)55 private CrtResponseAdapter(HttpClientConnection connection, 56 CompletableFuture<Void> completionFuture, 57 SdkAsyncHttpResponseHandler responseHandler) { 58 this(connection, completionFuture, responseHandler, new SimplePublisher<>()); 59 } 60 61 62 @SdkTestInternalApi CrtResponseAdapter(HttpClientConnection connection, CompletableFuture<Void> completionFuture, SdkAsyncHttpResponseHandler responseHandler, SimplePublisher<ByteBuffer> simplePublisher)63 public CrtResponseAdapter(HttpClientConnection connection, 64 CompletableFuture<Void> completionFuture, 65 SdkAsyncHttpResponseHandler responseHandler, 66 SimplePublisher<ByteBuffer> simplePublisher) { 67 this.connection = Validate.paramNotNull(connection, "connection"); 68 this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture"); 69 this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler"); 70 this.responseBuilder = SdkHttpResponse.builder(); 71 this.responseHandlerHelper = new ResponseHandlerHelper(responseBuilder, connection); 72 this.responsePublisher = simplePublisher; 73 } 74 toCrtResponseHandler(HttpClientConnection crtConn, CompletableFuture<Void> requestFuture, SdkAsyncHttpResponseHandler responseHandler)75 public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn, 76 CompletableFuture<Void> requestFuture, 77 SdkAsyncHttpResponseHandler responseHandler) { 78 return new CrtResponseAdapter(crtConn, requestFuture, responseHandler); 79 } 80 81 @Override onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders)82 public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) { 83 responseHandlerHelper.onResponseHeaders(stream, responseStatusCode, blockType, nextHeaders); 84 } 85 86 @Override onResponseHeadersDone(HttpStream stream, int headerType)87 public void onResponseHeadersDone(HttpStream stream, int headerType) { 88 if (headerType == HttpHeaderBlock.MAIN.getValue()) { 89 responseHandler.onHeaders(responseBuilder.build()); 90 responseHandler.onStream(responsePublisher); 91 } 92 } 93 94 @Override onResponseBody(HttpStream stream, byte[] bodyBytesIn)95 public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { 96 CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn)); 97 98 if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) { 99 // Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT. 100 return bodyBytesIn.length; 101 } 102 103 writeFuture.whenComplete((result, failure) -> { 104 if (failure != null) { 105 handlePublisherError(stream, failure); 106 return; 107 } 108 109 responseHandlerHelper.incrementWindow(stream, bodyBytesIn.length); 110 }); 111 112 return 0; 113 } 114 115 @Override onResponseComplete(HttpStream stream, int errorCode)116 public void onResponseComplete(HttpStream stream, int errorCode) { 117 if (errorCode == CRT.AWS_CRT_SUCCESS) { 118 onSuccessfulResponseComplete(stream); 119 } else { 120 onFailedResponseComplete(stream, new HttpException(errorCode)); 121 } 122 } 123 onSuccessfulResponseComplete(HttpStream stream)124 private void onSuccessfulResponseComplete(HttpStream stream) { 125 responsePublisher.complete().whenComplete((result, failure) -> { 126 if (failure != null) { 127 handlePublisherError(stream, failure); 128 return; 129 } 130 completionFuture.complete(null); 131 }); 132 133 responseHandlerHelper.cleanUpConnectionBasedOnStatusCode(stream); 134 } 135 handlePublisherError(HttpStream stream, Throwable failure)136 private void handlePublisherError(HttpStream stream, Throwable failure) { 137 failResponseHandlerAndFuture(stream, failure); 138 responseHandlerHelper.closeConnection(stream); 139 } 140 onFailedResponseComplete(HttpStream stream, HttpException error)141 private void onFailedResponseComplete(HttpStream stream, HttpException error) { 142 log.debug(() -> "HTTP response encountered an error.", error); 143 144 Throwable toThrow = wrapWithIoExceptionIfRetryable(error);; 145 responsePublisher.error(toThrow); 146 failResponseHandlerAndFuture(stream, toThrow); 147 responseHandlerHelper.closeConnection(stream); 148 } 149 failResponseHandlerAndFuture(HttpStream stream, Throwable error)150 private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) { 151 callResponseHandlerOnError(error); 152 completionFuture.completeExceptionally(error); 153 } 154 callResponseHandlerOnError(Throwable error)155 private void callResponseHandlerOnError(Throwable error) { 156 try { 157 responseHandler.onError(error); 158 } catch (RuntimeException e) { 159 log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e); 160 } 161 } 162 } 163