package software.amazon.awssdk.crt.http; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.CrtRuntimeException; import software.amazon.awssdk.crt.io.ClientBootstrap; import software.amazon.awssdk.crt.io.SocketOptions; import software.amazon.awssdk.crt.io.TlsConnectionOptions; import software.amazon.awssdk.crt.AsyncCallback; import software.amazon.awssdk.crt.io.TlsContext; import java.util.concurrent.CompletableFuture; import java.net.URI; import java.nio.charset.Charset; /** * Manages a Pool of HTTP/2 Streams. Creates and manages HTTP/2 connections * under the hood. */ public class Http2StreamManager extends CrtResource { private static final String HTTP = "http"; private static final String HTTPS = "https"; private static final int DEFAULT_HTTP_PORT = 80; private static final int DEFAULT_HTTPS_PORT = 443; private final static Charset UTF8 = java.nio.charset.StandardCharsets.UTF_8; private final URI uri; private final int port; private final int maxConnections; private final int idealConcurrentStreamsPerConnection; private final int maxConcurrentStreamsPerConnection; private final CompletableFuture shutdownComplete = new CompletableFuture<>(); /** * Factory function for Http2StreamManager instances * * @param options configuration options * @return a new instance of an Http2StreamManager */ public static Http2StreamManager create(Http2StreamManagerOptions options) { return new Http2StreamManager(options); } private Http2StreamManager(Http2StreamManagerOptions options) { options.validateOptions(); HttpClientConnectionManagerOptions connectionManagerOptions = options.getConnectionManagerOptions(); URI uri = connectionManagerOptions.getUri(); ClientBootstrap clientBootstrap = connectionManagerOptions.getClientBootstrap(); SocketOptions socketOptions = connectionManagerOptions.getSocketOptions(); boolean useTls = HTTPS.equals(uri.getScheme()); TlsContext tlsContext = connectionManagerOptions.getTlsContext(); TlsConnectionOptions tlsConnectionOptions = connectionManagerOptions.getTlsConnectionOptions(); int maxConnections = connectionManagerOptions.getMaxConnections(); int port = connectionManagerOptions.getPort(); if (port == -1) { port = uri.getPort(); /* Pick a default port based on the scheme if one wasn't set */ if (port == -1) { if (HTTP.equals(uri.getScheme())) { port = DEFAULT_HTTP_PORT; } if (HTTPS.equals(uri.getScheme())) { port = DEFAULT_HTTPS_PORT; } } } int maxConcurrentStreamsPerConnection = options.getMaxConcurrentStreamsPerConnection(); int idealConcurrentStreamsPerConnection = options.getIdealConcurrentStreamsPerConnection(); this.uri = uri; this.port = port; this.maxConnections = maxConnections; this.idealConcurrentStreamsPerConnection = idealConcurrentStreamsPerConnection; this.maxConcurrentStreamsPerConnection = maxConcurrentStreamsPerConnection; int proxyConnectionType = 0; String proxyHost = null; int proxyPort = 0; TlsContext proxyTlsContext = null; int proxyAuthorizationType = 0; String proxyAuthorizationUsername = null; String proxyAuthorizationPassword = null; HttpProxyOptions proxyOptions = connectionManagerOptions.getProxyOptions(); if (proxyOptions != null) { proxyConnectionType = proxyOptions.getConnectionType().getValue(); proxyHost = proxyOptions.getHost(); proxyPort = proxyOptions.getPort(); proxyTlsContext = proxyOptions.getTlsContext(); proxyAuthorizationType = proxyOptions.getAuthorizationType().getValue(); proxyAuthorizationUsername = proxyOptions.getAuthorizationUsername(); proxyAuthorizationPassword = proxyOptions.getAuthorizationPassword(); } HttpMonitoringOptions monitoringOptions = connectionManagerOptions.getMonitoringOptions(); long monitoringThroughputThresholdInBytesPerSecond = 0; int monitoringFailureIntervalInSeconds = 0; if (monitoringOptions != null) { monitoringThroughputThresholdInBytesPerSecond = monitoringOptions.getMinThroughputBytesPerSecond(); monitoringFailureIntervalInSeconds = monitoringOptions.getAllowableThroughputFailureIntervalSeconds(); } acquireNativeHandle(http2StreamManagerNew(this, clientBootstrap.getNativeHandle(), socketOptions.getNativeHandle(), useTls && tlsContext!=null ? tlsContext.getNativeHandle() : 0, useTls && tlsConnectionOptions!=null ? tlsConnectionOptions.getNativeHandle() : 0, Http2ConnectionSetting.marshallSettingsForJNI(options.getInitialSettingsList()), uri.getHost().getBytes(UTF8), port, proxyConnectionType, proxyHost != null ? proxyHost.getBytes(UTF8) : null, proxyPort, proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0, proxyAuthorizationType, proxyAuthorizationUsername != null ? proxyAuthorizationUsername.getBytes(UTF8) : null, proxyAuthorizationPassword != null ? proxyAuthorizationPassword.getBytes(UTF8) : null, connectionManagerOptions.isManualWindowManagement(), monitoringThroughputThresholdInBytesPerSecond, monitoringFailureIntervalInSeconds, maxConnections, idealConcurrentStreamsPerConnection, maxConcurrentStreamsPerConnection, options.hasPriorKnowledge(), options.shouldCloseConnectionOnServerError(), options.getConnectionPingPeriodMs(), options.getConnectionPingTimeoutMs())); /* * we don't need to add a reference to socketOptions since it's copied during * connection manager construction */ addReferenceTo(clientBootstrap); if (useTls) { addReferenceTo(tlsContext); } } /** * Request a Http2Stream from StreamManager. * * @param request The Request to make to the Server. * @param streamHandler The Stream Handler to be called from the Native * EventLoop * @return A future for a Http2Stream that will be completed when the stream is * acquired. */ public CompletableFuture acquireStream(Http2Request request, HttpStreamBaseResponseHandler streamHandler) { return this.acquireStream((HttpRequestBase) request, streamHandler); } public CompletableFuture acquireStream(HttpRequest request, HttpStreamBaseResponseHandler streamHandler) { return this.acquireStream((HttpRequestBase) request, streamHandler); } private CompletableFuture acquireStream(HttpRequestBase request, HttpStreamBaseResponseHandler streamHandler) { CompletableFuture completionFuture = new CompletableFuture<>(); AsyncCallback acquireStreamCompleted = AsyncCallback.wrapFuture(completionFuture, null); if (isNull()) { completionFuture.completeExceptionally(new IllegalStateException( "Http2StreamManager has been closed, can't acquire new streams")); return completionFuture; } try { http2StreamManagerAcquireStream(this.getNativeHandle(), request.marshalForJni(), request.getBodyStream(), new HttpStreamResponseHandlerNativeAdapter(streamHandler), acquireStreamCompleted); } catch (CrtRuntimeException ex) { completionFuture.completeExceptionally(ex); } return completionFuture; } /** * @return maximum number of connections this connection manager will pool */ public int getMaxConnections() { return maxConnections; } /** * @return concurrency metrics for the current manager */ public HttpManagerMetrics getManagerMetrics() { if (isNull()) { throw new IllegalStateException("HttpClientConnectionManager has been closed, can't fetch metrics"); } return http2StreamManagerFetchMetrics(getNativeHandle()); } /** * Called from Native when all Streams from this Stream manager have finished * and underlying resources like connections opened under the hood has been * cleaned up * begin releasing Native Resources that Http2StreamManager depends on. */ private void onShutdownComplete() { releaseReferences(); this.shutdownComplete.complete(null); } /** * Determines whether a resource releases its dependencies at the same time the * native handle is released or if it waits. * Resources that wait are responsible for calling releaseReferences() manually. */ @Override protected boolean canReleaseReferencesImmediately() { return false; } /** * Closes this Connection Pool and any pending Connection Acquisitions */ @Override protected void releaseNativeHandle() { if (!isNull()) { /* * Release our Native pointer and schedule tasks on the Native Event Loop to * start sending HTTP/TLS/TCP * connection shutdown messages to peers for any open Connections. */ http2StreamManagerRelease(getNativeHandle()); } } public CompletableFuture getShutdownCompleteFuture() { return shutdownComplete; } /******************************************************************************* * Native methods ******************************************************************************/ private static native long http2StreamManagerNew(Http2StreamManager thisObj, long client_bootstrap, long socketOptions, long tlsContext, long tlsConnectionOptions, long[] marshalledSettings, byte[] endpoint, int port, int proxyConnectionType, byte[] proxyHost, int proxyPort, long proxyTlsContext, int proxyAuthorizationType, byte[] proxyAuthorizationUsername, byte[] proxyAuthorizationPassword, boolean isManualWindowManagement, long monitoringThroughputThresholdInBytesPerSecond, int monitoringFailureIntervalInSeconds, int maxConns, int ideal_concurrent_streams_per_connection, int max_concurrent_streams_per_connection, boolean priorKnowledge, boolean closeConnectionOnServerError, int connectionPingPeriodMs, int connectionPingTimeoutMs) throws CrtRuntimeException; private static native void http2StreamManagerRelease(long stream_manager) throws CrtRuntimeException; private static native void http2StreamManagerAcquireStream(long stream_manager, byte[] marshalledRequest, HttpRequestBodyStream bodyStream, HttpStreamResponseHandlerNativeAdapter responseHandler, AsyncCallback completedCallback) throws CrtRuntimeException; private static native HttpManagerMetrics http2StreamManagerFetchMetrics(long stream_manager) throws CrtRuntimeException; }