• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import software.amazon.awssdk.crt.CRT;
4 import software.amazon.awssdk.crt.CrtResource;
5 import software.amazon.awssdk.crt.CrtRuntimeException;
6 
7 import java.util.List;
8 import java.util.concurrent.CompletableFuture;
9 
10 /**
11  * Wrapper around event-stream-rpc-server-connection. Note this class is AutoClosable.
12  * By default the ServerConnectionHandler::onClosed callback calls close().
13  */
14 public class ServerConnection extends CrtResource {
15     CompletableFuture<Integer> closedFuture = new CompletableFuture<>();
16 
17     /**
18      * Invoked from JNI.
19      */
ServerConnection(long connectionPtr)20     ServerConnection(long connectionPtr) {
21         // tell c-land we're acquiring
22         acquire(connectionPtr);
23         acquireNativeHandle(connectionPtr);
24     }
25 
26     /**
27      * @return true if the connection is open. False otherwise.
28      */
isConnectionOpen()29     public boolean isConnectionOpen() {
30         if (isNull()) {
31             return false;
32         }
33         return isOpen(getNativeHandle());
34     }
35 
36     /**
37      * Closes the connection with shutdownError
38      * @param shutdownError error code to shutdown the connection with. If
39      *                      shutting down cleanly, use 0.
40      */
closeConnection(int shutdownError)41     public void closeConnection(int shutdownError) {
42         if (isNull()) {
43             throw new IllegalStateException("close() has already been called on this object.");
44         }
45         closeConnection(getNativeHandle(), shutdownError);
46     }
47 
48     /**
49      * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message
50      * flushing to the underlying transport.
51      * @param headers List of event-stream headers. Can be null.
52      * @param payload Payload to send for the message. Can be null.
53      * @param messageType Message type for the rpc message.
54      * @param messageFlags Union of message flags from MessageFlags.getByteValue()
55      * @return completable future for synchronizing on the message flushing to the underlying transport.
56      */
sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)57     public CompletableFuture<Void> sendProtocolMessage(final List<Header> headers, final byte[] payload,
58                                                        final MessageType messageType, int messageFlags) {
59         if (isNull()) {
60             throw new IllegalStateException("close() has already been called on this object.");
61         }
62         CompletableFuture<Void> messageFlush = new CompletableFuture<>();
63 
64         sendProtocolMessage(headers, payload, messageType, messageFlags, new MessageFlushCallback() {
65             @Override
66             public void onCallbackInvoked(int errorCode) {
67                 if (errorCode == 0) {
68                     messageFlush.complete(null);
69                 } else {
70                     messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
71                 }
72             }
73         });
74 
75         return messageFlush;
76     }
77 
78     /**
79      * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message
80      * flushing to the underlying transport.
81      * @param headers List of event-stream headers. Can be null.
82      * @param payload Payload to send for the message. Can be null.
83      * @param messageType Message type for the rpc message.
84      * @param messageFlags Union of message flags from MessageFlags.getByteValue()
85      * @param callback invoked upon the message flushing to the underlying transport.
86      */
sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)87     public void sendProtocolMessage(final List<Header> headers, final byte[] payload,
88                                     final MessageType messageType, int messageFlags, MessageFlushCallback callback) {
89         if (isNull()) {
90             throw new IllegalStateException("close() has already been called on this object.");
91         }
92         byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;
93 
94         int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);
95 
96         if (result != 0) {
97             int errorCode = CRT.awsLastError();
98             throw new CrtRuntimeException(errorCode);
99         }
100     }
101 
102     /**
103      * @return a future which completes upon the connection closing
104      */
getClosedFuture()105     public CompletableFuture<Integer> getClosedFuture() {
106         return closedFuture;
107     }
108 
109     @Override
releaseNativeHandle()110     protected void releaseNativeHandle() {
111         if (!isNull()) {
112             release(getNativeHandle());
113         }
114     }
115 
116     @Override
canReleaseReferencesImmediately()117     protected boolean canReleaseReferencesImmediately() {
118         return true;
119     }
120 
acquire(long connectionPtr)121     private static native void acquire(long connectionPtr);
release(long connectionPtr)122     private static native void release(long connectionPtr);
closeConnection(long connectionPtr, int shutdownError)123     private static native void closeConnection(long connectionPtr, int shutdownError);
isOpen(long connectionPtr)124     private static native boolean isOpen(long connectionPtr);
sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)125     private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
126 }
127