package software.amazon.awssdk.crt.eventstream;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.io.ClientBootstrap;
import software.amazon.awssdk.crt.io.ClientTlsContext;
import software.amazon.awssdk.crt.io.SocketOptions;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Wrapper around an event stream rpc client initiated connection.
 */
public class ClientConnection extends CrtResource {
    /**
     * Completed with the close reason when the handler shim installed by
     * {@link #connect} receives {@code onConnectionClosed}.
     */
    CompletableFuture<Integer> closeFuture = new CompletableFuture<>();

    /**
     * Only for internal usage. This is invoked from JNI to create a new java wrapper for the connection.
     *
     * @param clientConnection native handle of the connection to wrap.
     */
    ClientConnection(long clientConnection) {
        acquireNativeHandle(clientConnection);
        acquireClientConnection(clientConnection);
    }

    /**
     * Closes the connection if it hasn't been closed already.
     *
     * @param shutdownErrorCode aws-c-* error code to shutdown with. Specify 0 for success.
     * @throws IllegalStateException if this object no longer holds a native handle.
     */
    public void closeConnection(int shutdownErrorCode) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
        closeClientConnection(getNativeHandle(), shutdownErrorCode);
    }

    /**
     * @return true if the connection is open, false otherwise (including when this object no
     *         longer holds a native handle).
     */
    public boolean isOpen() {
        if (isNull()) {
            return false;
        }
        return isClientConnectionOpen(getNativeHandle());
    }

    /**
     * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message
     * flushing to the underlying transport.
     *
     * @param headers List of event-stream headers. Can be null.
     * @param payload Payload to send for the message. Can be null.
     * @param messageType Message type for the rpc message. Must not be null.
     * @param messageFlags Union of message flags from MessageFlags.getByteValue()
     * @return completable future for synchronizing on the message flushing to the underlying transport. It
     *         completes exceptionally with a {@link CrtRuntimeException} when the flush callback reports a
     *         non-zero error code.
     * @throws IllegalStateException if this object no longer holds a native handle.
     * @throws NullPointerException if {@code messageType} is null.
     * @throws CrtRuntimeException if the native send call returns a non-zero result.
     */
    public CompletableFuture<Void> sendProtocolMessage(final List<Header> headers, final byte[] payload,
                                                       final MessageType messageType, int messageFlags) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }

        CompletableFuture<Void> messageFlush = new CompletableFuture<>();

        sendProtocolMessage(headers, payload, messageType, messageFlags, errorCode -> {
            if (errorCode == 0) {
                messageFlush.complete(null);
            } else {
                messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
            }
        });

        return messageFlush;
    }

    /**
     * Sends a protocol message on the connection. Callback will be invoked upon the message flushing to the
     * underlying transport.
     *
     * @param headers List of event-stream headers. Can be null.
     * @param payload Payload to send for the message. Can be null.
     * @param messageType Message type for the rpc message. Must not be null.
     * @param messageFlags Union of message flags from MessageFlags.getByteValue()
     * @param callback will be invoked upon the message flushing to the underlying transport
     * @throws IllegalStateException if this object no longer holds a native handle.
     * @throws NullPointerException if {@code messageType} is null.
     * @throws CrtRuntimeException if the native send call returns a non-zero result.
     */
    public void sendProtocolMessage(final List<Header> headers, final byte[] payload,
                                    final MessageType messageType, int messageFlags,
                                    MessageFlushCallback callback) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
        // Validate before marshalling so a null type fails with a descriptive message.
        Objects.requireNonNull(messageType, "messageType must not be null");

        byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;

        int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload,
                messageType.getEnumValue(), messageFlags, callback);

        if (result != 0) {
            int errorCode = CRT.awsLastError();
            throw new CrtRuntimeException(errorCode);
        }
    }

    /**
     * Create a new stream. Activate() must be called on the stream for it to actually initiate the new stream.
     *
     * @param continuationHandler handler to process continuation messages and state changes. Must not be null.
     * @return The new continuation object.
     * @throws IllegalStateException if this object no longer holds a native handle.
     * @throws NullPointerException if {@code continuationHandler} is null.
     * @throws CrtRuntimeException if the native call does not return a stream handle.
     */
    public ClientConnectionContinuation newStream(final ClientConnectionContinuationHandler continuationHandler) {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
        // Check before the native call: the handler is dereferenced after the stream exists, and
        // failing at that point would strand the freshly created native stream handle.
        Objects.requireNonNull(continuationHandler, "continuationHandler must not be null");

        long continuationHandle = newClientStream(getNativeHandle(), continuationHandler);

        if (continuationHandle == 0) {
            int lastError = CRT.awsLastError();
            throw new CrtRuntimeException(lastError);
        }

        ClientConnectionContinuation connectionContinuation = new ClientConnectionContinuation(continuationHandle);
        continuationHandler.continuation = connectionContinuation;

        return connectionContinuation;
    }

    /**
     * Initiates a new outgoing event-stream-rpc connection. The future will be completed once the connection either
     * succeeds or fails.
     *
     * @param hostName hostname to connect to, this can be an IPv4 address, IPv6 address, a local socket address, or a
     *                 dns name.
     * @param port port to connect to hostName with. For local socket address, this value is ignored.
     *             For 32bit values exceeding Integer.MAX_VALUE use two's complement (i.e. -1 == 0xFFFFFFFF).
     * @param socketOptions socketOptions to use.
     * @param tlsContext (optional) tls context to use for using SSL/TLS in the connection.
     * @param bootstrap clientBootstrap object to run the connection on.
     * @param connectionHandler handler to process connection messages and state changes.
     * @return The future will be completed once the connection either succeeds or fails.
     * @throws NullPointerException if {@code hostName}, {@code socketOptions}, {@code bootstrap} or
     *                              {@code connectionHandler} is null.
     * @throws CrtRuntimeException if the native connect call returns a non-zero result.
     */
    public static CompletableFuture<Void> connect(final String hostName, int port, final SocketOptions socketOptions,
                                                  final ClientTlsContext tlsContext, final ClientBootstrap bootstrap,
                                                  final ClientConnectionHandler connectionHandler) {
        // Fail fast on the calling thread. Without this, a null handler is only discovered inside the
        // setup callback, where the failure leaves the returned future uncompleted.
        Objects.requireNonNull(hostName, "hostName must not be null");
        Objects.requireNonNull(socketOptions, "socketOptions must not be null");
        Objects.requireNonNull(bootstrap, "bootstrap must not be null");
        Objects.requireNonNull(connectionHandler, "connectionHandler must not be null");

        long tlsContextHandle = tlsContext != null ? tlsContext.getNativeHandle() : 0;

        CompletableFuture<Void> future = new CompletableFuture<>();
        // The shim forwards every event to the caller's handler and additionally drives the
        // connect future and the connection's close future.
        ClientConnectionHandler handlerShim = new ClientConnectionHandler() {
            @Override
            protected void onConnectionSetup(ClientConnection connection, int errorCode) {
                connectionHandler.clientConnection = connection;
                try {
                    connectionHandler.onConnectionSetup(connection, errorCode);
                } finally {
                    // Complete the future even when the user handler throws, so waiters never hang.
                    if (errorCode == 0) {
                        future.complete(null);
                    } else {
                        future.completeExceptionally(new CrtRuntimeException(errorCode));
                    }
                }
            }

            @Override
            protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType,
                                             int messageFlags) {
                connectionHandler.onProtocolMessage(headers, payload, messageType, messageFlags);
            }

            @Override
            protected void onConnectionClosed(int closeReason) {
                // Guard against a close notification without a recorded connection so the
                // user's handler is still notified instead of being skipped by an NPE.
                if (connectionHandler.clientConnection != null) {
                    connectionHandler.clientConnection.closeFuture.complete(closeReason);
                }
                connectionHandler.onConnectionClosed(closeReason);
            }
        };

        int resultCode = clientConnect(hostName.getBytes(StandardCharsets.UTF_8),
                port, socketOptions.getNativeHandle(), tlsContextHandle, bootstrap.getNativeHandle(), handlerShim);

        if (resultCode != 0) {
            int lastError = CRT.awsLastError();
            throw new CrtRuntimeException(lastError);
        }

        return future;
    }

    /**
     * @return a future for syncing on Connection closed.
     */
    public CompletableFuture<Integer> getClosedFuture() {
        return closeFuture;
    }

    /**
     * Releases the native client connection if this object still holds a native handle.
     */
    @Override
    protected void releaseNativeHandle() {
        if (!isNull()) {
            releaseClientConnection(getNativeHandle());
        }
    }

    /**
     * @return always {@code true} for this resource type.
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return true;
    }

    private static native int clientConnect(byte[] hostName, int port, long socketOptions, long tlsContext, long bootstrap, ClientConnectionHandler connectionHandler);
    private static native boolean isClientConnectionOpen(long connection);
    private static native void closeClientConnection(long connection, int errorCode);
    private static native void acquireClientConnection(long connection);
    private static native void releaseClientConnection(long connection);
    private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
    private static native long newClientStream(long connectionPtr, ClientConnectionContinuationHandler continuationHandler);
}