• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import software.amazon.awssdk.crt.CRT;
4 import software.amazon.awssdk.crt.CrtResource;
5 import software.amazon.awssdk.crt.CrtRuntimeException;
6 import software.amazon.awssdk.crt.io.ClientBootstrap;
7 import software.amazon.awssdk.crt.io.ClientTlsContext;
8 import software.amazon.awssdk.crt.io.SocketOptions;
9 
10 import java.nio.charset.StandardCharsets;
11 import java.util.List;
12 import java.util.concurrent.CompletableFuture;
13 
14 /**
15  * Wrapper around an event stream rpc client initiated connection.
16  */
17 public class ClientConnection extends CrtResource {
18     CompletableFuture<Integer> closeFuture = new CompletableFuture<>();
19 
20     /**
21      * Only for internal usage. This is invoked from JNI to create a new java wrapper for the connection.
22      */
ClientConnection(long clientConnection)23     ClientConnection(long clientConnection) {
24         acquireNativeHandle(clientConnection);
25         acquireClientConnection(clientConnection);
26     }
27 
28     /**
29      * Closes the connection if it hasn't been closed already.
30      * @param shutdownErrorCode aws-c-* error code to shutdown with. Specify 0 for success.
31      */
closeConnection(int shutdownErrorCode)32     public void closeConnection(int shutdownErrorCode) {
33         if (isNull()) {
34             throw new IllegalStateException("close() has already been called on this object.");
35         }
36         closeClientConnection(getNativeHandle(), shutdownErrorCode);
37     }
38 
39     /**
40      * @return true if the connection is open, false otherwise.
41      */
isOpen()42     public boolean isOpen() {
43         if (isNull()) {
44             return false;
45         }
46         return isClientConnectionOpen(getNativeHandle());
47     }
48 
49     /**
50      * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message
51      * flushing to the underlying transport.
52      * @param headers List of event-stream headers. Can be null.
53      * @param payload Payload to send for the message. Can be null.
54      * @param messageType Message type for the rpc message.
55      * @param messageFlags Union of message flags from MessageFlags.getByteValue()
56      * @return completable future for synchronizing on the message flushing to the underlying transport.
57      */
sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)58     public CompletableFuture<Void> sendProtocolMessage(final List<Header> headers, final byte[] payload,
59                                                        final MessageType messageType, int messageFlags) {
60         if (isNull()) {
61             throw new IllegalStateException("close() has already been called on this object.");
62         }
63 
64         CompletableFuture<Void> messageFlush = new CompletableFuture<>();
65 
66         sendProtocolMessage(headers, payload, messageType, messageFlags, errorCode -> {
67             if (errorCode == 0) {
68                 messageFlush.complete(null);
69             } else {
70                 messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
71             }
72         });
73 
74         return messageFlush;
75     }
76 
77     /**
78      * Sends a protocol message on the connection. Callback will be invoked upon the message flushing to the underlying
79      * transport
80      *
81      * @param headers List of event-stream headers. Can be null.
82      * @param payload Payload to send for the message. Can be null.
83      * @param messageType Message type for the rpc message.
84      * @param messageFlags Union of message flags from MessageFlags.getByteValue()
85      * @param callback will be invoked upon the message flushing to the underlying transport
86      */
sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)87     public void sendProtocolMessage(final List<Header> headers, final byte[] payload,
88                                     final MessageType messageType, int messageFlags, MessageFlushCallback callback) {
89         if (isNull()) {
90             throw new IllegalStateException("close() has already been called on this object.");
91         }
92 
93         byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;
94 
95         int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);
96 
97         if (result != 0) {
98             int errorCode = CRT.awsLastError();
99             throw new CrtRuntimeException(errorCode);
100         }
101     }
102 
103     /**
104      * Create a new stream. Activate() must be called on the stream for it to actually initiate the new stream.
105      * @param continuationHandler handler to process continuation messages and state changes.
106      * @return The new continuation object.
107      */
newStream(final ClientConnectionContinuationHandler continuationHandler)108     public ClientConnectionContinuation newStream(final ClientConnectionContinuationHandler continuationHandler) {
109         if (isNull()) {
110             throw new IllegalStateException("close() has already been called on this object.");
111         }
112 
113         long continuationHandle = newClientStream(getNativeHandle(), continuationHandler);
114 
115         if (continuationHandle == 0) {
116             int lastError = CRT.awsLastError();
117             throw new CrtRuntimeException(lastError);
118         }
119 
120         ClientConnectionContinuation connectionContinuation = new ClientConnectionContinuation(continuationHandle);
121         continuationHandler.continuation = connectionContinuation;
122 
123         return connectionContinuation;
124     }
125 
126     /**
127      * Initiates a new outgoing event-stream-rpc connection. The future will be completed once the connection either
128      * succeeds or fails.
129      * @param hostName hostname to connect to, this can be an IPv4 address, IPv6 address, a local socket address, or a
130      *                 dns name.
131      * @param port port to connect to hostName with. For local socket address, this value is ignored.
132      *             For 32bit values exceeding Integer.MAX_VALUE use two's complement (i.e. -1 == 0xFFFFFFFF).
133      * @param socketOptions socketOptions to use.
134      * @param tlsContext (optional) tls context to use for using SSL/TLS in the connection.
135      * @param bootstrap clientBootstrap object to run the connection on.
136      * @param connectionHandler handler to process connection messages and state changes.
137      * @return The future will be completed once the connection either succeeds or fails.
138      */
connect(final String hostName, int port, final SocketOptions socketOptions, final ClientTlsContext tlsContext, final ClientBootstrap bootstrap, final ClientConnectionHandler connectionHandler)139     public static CompletableFuture<Void> connect(final String hostName, int port, final SocketOptions socketOptions,
140                                            final ClientTlsContext tlsContext, final ClientBootstrap bootstrap,
141                                            final ClientConnectionHandler connectionHandler) {
142         long tlsContextHandle = tlsContext != null ? tlsContext.getNativeHandle() : 0;
143 
144         CompletableFuture<Void> future = new CompletableFuture<>();
145         ClientConnectionHandler handlerShim = new ClientConnectionHandler() {
146             @Override
147             protected void onConnectionSetup(ClientConnection connection, int errorCode) {
148                 connectionHandler.clientConnection = connection;
149                 connectionHandler.onConnectionSetup(connection, errorCode);
150 
151                 if (errorCode == 0) {
152                     future.complete(null);
153                 } else {
154                     future.completeExceptionally(new CrtRuntimeException(errorCode));
155                 }
156             }
157 
158             @Override
159             protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
160                 connectionHandler.onProtocolMessage(headers, payload, messageType, messageFlags);
161             }
162 
163             @Override
164             protected void onConnectionClosed(int closeReason) {
165                 connectionHandler.clientConnection.closeFuture.complete(closeReason);
166                 connectionHandler.onConnectionClosed(closeReason);
167             }
168         };
169 
170         int resultCode = clientConnect(hostName.getBytes(StandardCharsets.UTF_8),
171                 port, socketOptions.getNativeHandle(), tlsContextHandle, bootstrap.getNativeHandle(), handlerShim);
172 
173         if (resultCode != 0) {
174             int lastError = CRT.awsLastError();
175             throw new CrtRuntimeException(lastError);
176         }
177 
178         return future;
179     }
180 
181     /**
182      * @return a future for syncing on Connection closed.
183      */
getClosedFuture()184     public CompletableFuture<Integer> getClosedFuture() {
185         return closeFuture;
186     }
187 
188     @Override
releaseNativeHandle()189     protected void releaseNativeHandle() {
190         if (!isNull()) {
191             releaseClientConnection(getNativeHandle());
192         }
193     }
194 
195     @Override
canReleaseReferencesImmediately()196     protected boolean canReleaseReferencesImmediately() {
197         return true;
198     }
199 
clientConnect(byte[] hostName, int port, long socketOptions, long tlsContext, long bootstrap, ClientConnectionHandler connectionHandler)200     private static native int clientConnect(byte[] hostName, int port, long socketOptions, long tlsContext, long bootstrap, ClientConnectionHandler connectionHandler);
isClientConnectionOpen(long connection)201     private static native boolean isClientConnectionOpen(long connection);
closeClientConnection(long connection, int errorCode)202     private static native void closeClientConnection(long connection, int errorCode);
acquireClientConnection(long connection)203     private static native void acquireClientConnection(long connection);
releaseClientConnection(long connection)204     private static native void releaseClientConnection(long connection);
sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)205     private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
newClientStream(long connectionPtr, ClientConnectionContinuationHandler continuationHandler)206     private static native long newClientStream(long connectionPtr, ClientConnectionContinuationHandler continuationHandler);
207 }
208