• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License").
5  * You may not use this file except in compliance with the License.
6  * A copy of the License is located at
7  *
8  *  http://aws.amazon.com/apache2.0
9  *
10  * or in the "license" file accompanying this file. This file is distributed
11  * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12  * express or implied. See the License for the specific language governing
13  * permissions and limitations under the License.
14  */
15 
16 package software.amazon.awssdk.http.crt.internal;
17 
18 import static software.amazon.awssdk.http.crt.internal.CrtUtils.reportMetrics;
19 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapConnectionFailureException;
20 import static software.amazon.awssdk.http.crt.internal.CrtUtils.wrapWithIoExceptionIfRetryable;
21 
22 import java.io.IOException;
23 import java.util.concurrent.CompletableFuture;
24 import software.amazon.awssdk.annotations.SdkInternalApi;
25 import software.amazon.awssdk.crt.CrtRuntimeException;
26 import software.amazon.awssdk.crt.http.HttpClientConnection;
27 import software.amazon.awssdk.crt.http.HttpException;
28 import software.amazon.awssdk.crt.http.HttpRequest;
29 import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
30 import software.amazon.awssdk.http.SdkCancellationException;
31 import software.amazon.awssdk.http.async.AsyncExecuteRequest;
32 import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
33 import software.amazon.awssdk.http.crt.internal.request.CrtRequestAdapter;
34 import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter;
35 import software.amazon.awssdk.metrics.MetricCollector;
36 import software.amazon.awssdk.metrics.NoOpMetricCollector;
37 import software.amazon.awssdk.utils.Logger;
38 
39 @SdkInternalApi
40 public final class CrtAsyncRequestExecutor {
41 
42     private static final Logger log = Logger.loggerFor(CrtAsyncRequestExecutor.class);
43 
execute(CrtAsyncRequestContext executionContext)44     public CompletableFuture<Void> execute(CrtAsyncRequestContext executionContext) {
45         // go ahead and get a reference to the metricCollector since multiple futures will
46         // need it regardless.
47         MetricCollector metricCollector = executionContext.metricCollector();
48         boolean shouldPublishMetrics = metricCollector != null && !(metricCollector instanceof NoOpMetricCollector);
49 
50         long acquireStartTime = 0;
51 
52         if (shouldPublishMetrics) {
53             // go ahead and get acquireStartTime for the concurrency timer as early as possible,
54             // so it's as accurate as possible, but only do it in a branch since clock_gettime()
55             // results in a full sys call barrier (multiple mutexes and a hw interrupt).
56             acquireStartTime = System.nanoTime();
57         }
58 
59         CompletableFuture<Void> requestFuture = createAsyncExecutionFuture(executionContext.sdkRequest());
60 
61         // When a Connection is ready from the Connection Pool, schedule the Request on the connection
62         CompletableFuture<HttpClientConnection> httpClientConnectionCompletableFuture =
63             executionContext.crtConnPool().acquireConnection();
64 
65         long finalAcquireStartTime = acquireStartTime;
66 
67         httpClientConnectionCompletableFuture.whenComplete((crtConn, throwable) -> {
68             AsyncExecuteRequest asyncRequest = executionContext.sdkRequest();
69 
70             if (shouldPublishMetrics) {
71                 reportMetrics(executionContext.crtConnPool(), metricCollector, finalAcquireStartTime);
72             }
73 
74             // If we didn't get a connection for some reason, fail the request
75             if (throwable != null) {
76                 Throwable toThrow = wrapConnectionFailureException(throwable);
77                 reportAsyncFailure(crtConn, toThrow, requestFuture, asyncRequest.responseHandler());
78                 return;
79             }
80 
81             executeRequest(executionContext, requestFuture, crtConn, asyncRequest);
82         });
83 
84         return requestFuture;
85     }
86 
executeRequest(CrtAsyncRequestContext executionContext, CompletableFuture<Void> requestFuture, HttpClientConnection crtConn, AsyncExecuteRequest asyncRequest)87     private void executeRequest(CrtAsyncRequestContext executionContext,
88                                 CompletableFuture<Void> requestFuture,
89                                 HttpClientConnection crtConn,
90                                 AsyncExecuteRequest asyncRequest) {
91         HttpRequest crtRequest = CrtRequestAdapter.toAsyncCrtRequest(executionContext);
92         HttpStreamResponseHandler crtResponseHandler =
93             CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());
94 
95         // Submit the request on the connection
96         try {
97             crtConn.makeRequest(crtRequest, crtResponseHandler).activate();
98         } catch (HttpException e) {
99             Throwable toThrow = wrapWithIoExceptionIfRetryable(e);
100             reportAsyncFailure(crtConn,
101                           toThrow,
102                           requestFuture,
103                           asyncRequest.responseHandler());
104         } catch (IllegalStateException | CrtRuntimeException e) {
105             // CRT throws IllegalStateException if the connection is closed
106             reportAsyncFailure(crtConn, new IOException("An exception occurred when making the request", e),
107                           requestFuture,
108                           asyncRequest.responseHandler());
109         } catch (Throwable throwable) {
110             reportAsyncFailure(crtConn, throwable,
111                                requestFuture,
112                                asyncRequest.responseHandler());
113         }
114     }
115 
116     /**
117      * Create the execution future and set up the cancellation logic.
118      * @return The created execution future.
119      */
createAsyncExecutionFuture(AsyncExecuteRequest request)120     private CompletableFuture<Void> createAsyncExecutionFuture(AsyncExecuteRequest request) {
121         CompletableFuture<Void> future = new CompletableFuture<>();
122 
123         future.whenComplete((r, t) -> {
124             if (t == null) {
125                 return;
126             }
127 
128             // TODO: Aborting request once it's supported in CRT
129             if (future.isCancelled()) {
130                 request.responseHandler().onError(new SdkCancellationException("The request was cancelled"));
131             }
132         });
133 
134         return future;
135     }
136 
137     /**
138      * Notify the provided response handler and future of the failure.
139      */
reportAsyncFailure(HttpClientConnection crtConn, Throwable cause, CompletableFuture<Void> executeFuture, SdkAsyncHttpResponseHandler responseHandler)140     private void reportAsyncFailure(HttpClientConnection crtConn,
141                                Throwable cause,
142                                CompletableFuture<Void> executeFuture,
143                                SdkAsyncHttpResponseHandler responseHandler) {
144         if (crtConn != null) {
145             crtConn.close();
146         }
147 
148         try {
149             responseHandler.onError(cause);
150         } catch (Exception e) {
151             log.error(() -> "SdkAsyncHttpResponseHandler " + responseHandler + " threw an exception in onError. It will be "
152                             + "ignored.", e);
153         }
154         executeFuture.completeExceptionally(cause);
155     }
156 }
157