package software.amazon.awssdk.crt.eventstream;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.ClientTlsContext;
import software.amazon.awssdk.crt.io.SocketOptions;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Wrapper around an event stream rpc client initiated connection.
 */
public class ClientConnection extends CrtResource {
    /**
     * Completed with the close reason when the handler shim installed by
     * {@link #connect} receives {@code onConnectionClosed}.
     */
    CompletableFuture<Integer> closeFuture = new CompletableFuture<>();

    /**
     * Only for internal usage. This is invoked from JNI to create a new java wrapper for the connection.
     *
     * @param clientConnection native connection handle; it is registered with this resource and then
     *                         passed to the native acquire call.
     */
    ClientConnection(long clientConnection) {
        acquireNativeHandle(clientConnection);
        acquireClientConnection(clientConnection);
    }

    /**
     * Closes the connection if it hasn't been closed already.
     *
     * @param shutdownErrorCode aws-c-* error code to shutdown with. Specify 0 for success.
     * @throws IllegalStateException if this wrapper no longer holds a native handle.
     */
    public void closeConnection(int shutdownErrorCode) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
        closeClientConnection(getNativeHandle(), shutdownErrorCode);
    }

    /**
     * @return true if the connection is open, false otherwise (including when this wrapper no longer
     *         holds a native handle).
     */
    public boolean isOpen() {
        if (isNull()) {
            return false;
        }
        return isClientConnectionOpen(getNativeHandle());
    }

    /**
     * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message
     * flushing to the underlying transport.
     *
     * @param headers List of event-stream headers. Can be null.
     * @param payload Payload to send for the message. Can be null.
     * @param messageType Message type for the rpc message.
     * @param messageFlags Union of message flags from MessageFlags.getByteValue()
     * @return completable future for synchronizing on the message flushing to the underlying transport. It
     *         completes with {@code null} when the flush callback reports error code 0, and exceptionally
     *         with a {@link CrtRuntimeException} otherwise.
     * @throws IllegalStateException if this wrapper no longer holds a native handle.
     * @throws CrtRuntimeException if the native send call reports a failure synchronously.
     */
    public CompletableFuture<Void> sendProtocolMessage(final List<Header> headers, final byte[] payload,
                                                       final MessageType messageType, int messageFlags) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }

        CompletableFuture<Void> messageFlush = new CompletableFuture<>();

        // Bridge the callback-style overload onto the future.
        sendProtocolMessage(headers, payload, messageType, messageFlags, errorCode -> {
            if (errorCode == 0) {
                messageFlush.complete(null);
            } else {
                messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
            }
        });

        return messageFlush;
    }

    /**
     * Sends a protocol message on the connection. Callback will be invoked upon the message flushing to the
     * underlying transport.
     *
     * @param headers List of event-stream headers. Can be null.
     * @param payload Payload to send for the message. Can be null.
     * @param messageType Message type for the rpc message.
     * @param messageFlags Union of message flags from MessageFlags.getByteValue()
     * @param callback will be invoked upon the message flushing to the underlying transport
     * @throws IllegalStateException if this wrapper no longer holds a native handle.
     * @throws CrtRuntimeException if the native send call returns a non-zero result; the exception carries
     *                             the value of {@code CRT.awsLastError()}.
     */
    public void sendProtocolMessage(final List<Header> headers, final byte[] payload,
                                    final MessageType messageType, int messageFlags,
                                    MessageFlushCallback callback) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }

        // Headers cross the JNI boundary as a single marshalled byte array (or null when absent).
        byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;

        int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload,
                messageType.getEnumValue(), messageFlags, callback);

        if (result != 0) {
            int errorCode = CRT.awsLastError();
            throw new CrtRuntimeException(errorCode);
        }
    }

    /**
     * Create a new stream. Activate() must be called on the stream for it to actually initiate the new stream.
     *
     * @param continuationHandler handler to process continuation messages and state changes. Must not be null.
     * @return The new continuation object.
     * @throws IllegalStateException if this wrapper no longer holds a native handle.
     * @throws NullPointerException if {@code continuationHandler} is null.
     * @throws CrtRuntimeException if the native call returns a zero stream handle.
     */
    public ClientConnectionContinuation newStream(final ClientConnectionContinuationHandler continuationHandler) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
        // Fail before touching native code: the handler is dereferenced after the native stream has
        // been created, so a null here would otherwise surface only once that stream already exists.
        if (continuationHandler == null) {
            throw new NullPointerException("continuationHandler must not be null");
        }

        long continuationHandle = newClientStream(getNativeHandle(), continuationHandler);

        if (continuationHandle == 0) {
            int lastError = CRT.awsLastError();
            throw new CrtRuntimeException(lastError);
        }

        ClientConnectionContinuation connectionContinuation = new ClientConnectionContinuation(continuationHandle);
        continuationHandler.continuation = connectionContinuation;

        return connectionContinuation;
    }

    /**
     * Initiates a new outgoing event-stream-rpc connection. The future will be completed once the connection
     * either succeeds or fails.
     *
     * @param hostName hostname to connect to, this can be an IPv4 address, IPv6 address, a local socket address,
     *                 or a dns name.
     * @param port port to connect to hostName with. For local socket address, this value is ignored.
     *             For 32bit values exceeding Integer.MAX_VALUE use two's complement (i.e. -1 == 0xFFFFFFFF).
     * @param socketOptions socketOptions to use.
     * @param tlsContext (optional) tls context to use for using SSL/TLS in the connection.
     * @param bootstrap clientBootstrap object to run the connection on.
     * @param connectionHandler handler to process connection messages and state changes. Must not be null.
     * @return The future will be completed once the connection either succeeds or fails. It completes with
     *         {@code null} on success and exceptionally with a {@link CrtRuntimeException} on failure, in
     *         both cases after {@code connectionHandler.onConnectionSetup} has returned.
     * @throws NullPointerException if {@code connectionHandler} is null.
     * @throws CrtRuntimeException if the native connect call returns a non-zero result.
     */
    public static CompletableFuture<Void> connect(final String hostName, int port,
                                                  final SocketOptions socketOptions,
                                                  final ClientTlsContext tlsContext,
                                                  final ClientBootstrap bootstrap,
                                                  final ClientConnectionHandler connectionHandler) {
        // Fail fast: a null handler would otherwise only blow up inside the setup callback,
        // leaving the returned future uncompleted.
        if (connectionHandler == null) {
            throw new NullPointerException("connectionHandler must not be null");
        }

        // A zero handle is what gets passed to native code when no TLS context is supplied.
        long tlsContextHandle = tlsContext != null ? tlsContext.getNativeHandle() : 0;

        CompletableFuture<Void> future = new CompletableFuture<>();

        // The shim is what native code calls back into; it records the connection on the user's
        // handler, forwards every event to it, and drives the setup and close futures.
        ClientConnectionHandler handlerShim = new ClientConnectionHandler() {
            @Override
            protected void onConnectionSetup(ClientConnection connection, int errorCode) {
                connectionHandler.clientConnection = connection;
                connectionHandler.onConnectionSetup(connection, errorCode);

                if (errorCode == 0) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(new CrtRuntimeException(errorCode));
                }
            }

            @Override
            protected void onProtocolMessage(List<Header> headers, byte[] payload,
                                             MessageType messageType, int messageFlags) {
                connectionHandler.onProtocolMessage(headers, payload, messageType, messageFlags);
            }

            @Override
            protected void onConnectionClosed(int closeReason) {
                // Complete the close future before notifying the user's handler.
                connectionHandler.clientConnection.closeFuture.complete(closeReason);
                connectionHandler.onConnectionClosed(closeReason);
            }
        };

        int resultCode = clientConnect(hostName.getBytes(StandardCharsets.UTF_8), port,
                socketOptions.getNativeHandle(), tlsContextHandle, bootstrap.getNativeHandle(), handlerShim);

        if (resultCode != 0) {
            int lastError = CRT.awsLastError();
            throw new CrtRuntimeException(lastError);
        }

        return future;
    }

    /**
     * @return a future for syncing on Connection closed. For connections established through
     *         {@link #connect}, it is completed with the close reason.
     */
    public CompletableFuture<Integer> getClosedFuture() {
        return closeFuture;
    }

    /**
     * Releases the native connection through the native release call, if a native handle is still held.
     */
    @Override
    protected void releaseNativeHandle() {
        if (!isNull()) {
            releaseClientConnection(getNativeHandle());
        }
    }

    /**
     * @return always {@code true}.
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return true;
    }

    // Native entry points. Names and parameter types are part of the JNI binding and must not change.
    private static native int clientConnect(byte[] hostName, int port, long socketOptions, long tlsContext,
                                            long bootstrap, ClientConnectionHandler connectionHandler);

    private static native boolean isClientConnectionOpen(long connection);

    private static native void closeClientConnection(long connection, int errorCode);

    private static native void acquireClientConnection(long connection);

    private static native void releaseClientConnection(long connection);

    private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload,
                                                  int message_type, int message_flags,
                                                  MessageFlushCallback callback);

    private static native long newClientStream(long connectionPtr,
                                               ClientConnectionContinuationHandler continuationHandler);
}