1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.http.crt.internal; 17 18 import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics; 19 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException; 20 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable; 21 22 import java.io.IOException; 23 import java.util.concurrent.CompletableFuture; 24 import software.amazon.awssdk.annotations.SdkInternalApi; 25 import software.amazon.awssdk.crt.CrtRuntimeException; 26 import software.amazon.awssdk.crt.http.HttpClientConnection; 27 import software.amazon.awssdk.crt.http.HttpException; 28 import software.amazon.awssdk.crt.http.HttpRequest; 29 import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; 30 import software.amazon.awssdk.http.SdkCancellationException; 31 import software.amazon.awssdk.http.async.AsyncExecuteRequest; 32 import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; 33 import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter; 34 import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; 35 import software.amazon.awssdk.metrics.MetricCollector; 36 import software.amazon.awssdk.metrics.NoOpMetricCollector; 37 import software.amazon.awssdk.utils.Logger; 38 39 @SdkInternalApi 40 public final class CrtAsyncRequestExecutor { 41 42 private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class); 43 execute(CrtAsyncRequestContext executionContext)44 public CompletableFuture<Void> execute(CrtAsyncRequestContext executionContext) { 45 // go ahead and get a reference to the metricCollector since multiple futures will 46 // need it regardless. 47 MetricCollector metricCollector = executionContext.metricCollector(); 48 boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector); 49 50 long acquireStartTime = 0; 51 52 if (shouldPublishMetrics) { 53 // go ahead and get acquireStartTime for the concurrency timer as early as possible, 54 // so it's as accurate as possible, but only do it in a branch since clock_gettime() 55 // results in a full sys call barrier (multiple mutexes and a hw interrupt). 56 acquireStartTime = System.nanoTime(); 57 } 58 59 CompletableFuture<Void> requestFuture = createAsyncExecutionFuture(executionContext.sdkRequest()); 60 61 // When a Connection is ready from the Connection Pool, schedule the Request on the connection 62 CompletableFuture<HttpClientConnection> httpClientConnectionCompletableFuture = 63 executionContext.crtConnPool().acquireConnection(); 64 65 long finalAcquireStartTime = acquireStartTime; 66 67 httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> { 68 AsyncExecuteRequest asyncRequest = executionContext.sdkRequest(); 69 70 if (shouldPublishMetrics) { 71 reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime); 72 } 73 74 // If we didn't get a connection for some reason, fail the request 75 if (throwable != null) { 76 Throwable toThrow = wrapConnectionFailureException(throwable); 77 reportAsyncFailure(crtConn, toThrow, requestFuture, asyncRequest.responseHandler()); 78 return; 79 } 80 81 executeRequest(executionContext, requestFuture, crtConn, asyncRequest); 82 }); 83 84 return requestFuture; 85 } 86 executeRequest(CrtAsyncRequestContext executionContext, CompletableFuture<Void> requestFuture, HttpClientConnection crtConn, AsyncExecuteRequest asyncRequest)87 private void executeRequest(CrtAsyncRequestContext executionContext, 88 CompletableFuture<Void> requestFuture, 89 HttpClientConnection crtConn, 90 AsyncExecuteRequest asyncRequest) { 91 HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext); 92 HttpStreamResponseHandler crtResponseHandler = 93 CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler()); 94 95 // Submit the request on the connection 96 try { 97 crtConn.makeRequest(crtRequest, crtResponseHandler).activate(); 98 } catch (HttpException e) { 99 Throwable toThrow = wrapWithIoExceptionIfRetryable(e); 100 reportAsyncFailure(crtConn, 101 toThrow, 102 requestFuture, 103 asyncRequest.responseHandler()); 104 } catch (IllegalStateException | CrtRuntimeException e) { 105 // CRT throws IllegalStateException if the connection is closed 106 reportAsyncFailure(crtConn, new IOException("An exception occurred when making the request", e), 107 requestFuture, 108 asyncRequest.responseHandler()); 109 } catch (Throwable throwable) { 110 reportAsyncFailure(crtConn, throwable, 111 requestFuture, 112 asyncRequest.responseHandler()); 113 } 114 } 115 116 /** 117 * Create the execution future and set up the cancellation logic. 118 * @return The created execution future. 119 */ createAsyncExecutionFuture(AsyncExecuteRequest request)120 private CompletableFuture<Void> createAsyncExecutionFuture(AsyncExecuteRequest request) { 121 CompletableFuture<Void> future = new CompletableFuture<>(); 122 123 future.whenComplete((r, t) -> { 124 if (t == null) { 125 return; 126 } 127 128 // TODO: Aborting request once it's supported in CRT 129 if (future.isCancelled()) { 130 request.responseHandler().onError(new SdkCancellationException("The request was cancelled")); 131 } 132 }); 133 134 return future; 135 } 136 137 /** 138 * Notify the provided response handler and future of the failure. 139 */ reportAsyncFailure(HttpClientConnection crtConn, Throwable cause, CompletableFuture<Void> executeFuture, SdkAsyncHttpResponseHandler responseHandler)140 private void reportAsyncFailure(HttpClientConnection crtConn, 141 Throwable cause, 142 CompletableFuture<Void> executeFuture, 143 SdkAsyncHttpResponseHandler responseHandler) { 144 if (crtConn != null) { 145 crtConn.close(); 146 } 147 148 try { 149 responseHandler.onError(cause); 150 } catch (Exception e) { 151 log.error(() -> "SdkAsyncHttpResponseHandler " + responseHandler + " threw an exception in onError. It will be " 152 + "ignored.", e); 153 } 154 executeFuture.completeExceptionally(cause); 155 } 156 } 157