1 package software.amazon.awssdk.crt.eventstream; 2 3 import software.amazon.awssdk.crt.CRT; 4 import software.amazon.awssdk.crt.CrtResource; 5 import software.amazon.awssdk.crt.CrtRuntimeException; 6 7 import java.util.List; 8 import java.util.concurrent.CompletableFuture; 9 10 /** 11 * Wrapper around event-stream-rpc-server-connection. Note this class is AutoClosable. 12 * By default the ServerConnectionHandler::onClosed callback calls close(). 13 */ 14 public class ServerConnection extends CrtResource { 15 CompletableFuture<Integer> closedFuture = new CompletableFuture<>(); 16 17 /** 18 * Invoked from JNI. 19 */ ServerConnection(long connectionPtr)20 ServerConnection(long connectionPtr) { 21 // tell c-land we're acquiring 22 acquire(connectionPtr); 23 acquireNativeHandle(connectionPtr); 24 } 25 26 /** 27 * @return true if the connection is open. False otherwise. 28 */ isConnectionOpen()29 public boolean isConnectionOpen() { 30 if (isNull()) { 31 return false; 32 } 33 return isOpen(getNativeHandle()); 34 } 35 36 /** 37 * Closes the connection with shutdownError 38 * @param shutdownError error code to shutdown the connection with. If 39 * shutting down cleanly, use 0. 40 */ closeConnection(int shutdownError)41 public void closeConnection(int shutdownError) { 42 if (isNull()) { 43 throw new IllegalStateException("close() has already been called on this object."); 44 } 45 closeConnection(getNativeHandle(), shutdownError); 46 } 47 48 /** 49 * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message 50 * flushing to the underlying transport. 51 * @param headers List of event-stream headers. Can be null. 52 * @param payload Payload to send for the message. Can be null. 53 * @param messageType Message type for the rpc message. 54 * @param messageFlags Union of message flags from MessageFlags.getByteValue() 55 * @return completable future for synchronizing on the message flushing to the underlying transport. 56 */ sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)57 public CompletableFuture<Void> sendProtocolMessage(final List<Header> headers, final byte[] payload, 58 final MessageType messageType, int messageFlags) { 59 if (isNull()) { 60 throw new IllegalStateException("close() has already been called on this object."); 61 } 62 CompletableFuture<Void> messageFlush = new CompletableFuture<>(); 63 64 sendProtocolMessage(headers, payload, messageType, messageFlags, new MessageFlushCallback() { 65 @Override 66 public void onCallbackInvoked(int errorCode) { 67 if (errorCode == 0) { 68 messageFlush.complete(null); 69 } else { 70 messageFlush.completeExceptionally(new CrtRuntimeException(errorCode)); 71 } 72 } 73 }); 74 75 return messageFlush; 76 } 77 78 /** 79 * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message 80 * flushing to the underlying transport. 81 * @param headers List of event-stream headers. Can be null. 82 * @param payload Payload to send for the message. Can be null. 83 * @param messageType Message type for the rpc message. 84 * @param messageFlags Union of message flags from MessageFlags.getByteValue() 85 * @param callback invoked upon the message flushing to the underlying transport. 86 */ sendProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)87 public void sendProtocolMessage(final List<Header> headers, final byte[] payload, 88 final MessageType messageType, int messageFlags, MessageFlushCallback callback) { 89 if (isNull()) { 90 throw new IllegalStateException("close() has already been called on this object."); 91 } 92 byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null; 93 94 int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback); 95 96 if (result != 0) { 97 int errorCode = CRT.awsLastError(); 98 throw new CrtRuntimeException(errorCode); 99 } 100 } 101 102 /** 103 * @return a future which completes upon the connection closing 104 */ getClosedFuture()105 public CompletableFuture<Integer> getClosedFuture() { 106 return closedFuture; 107 } 108 109 @Override releaseNativeHandle()110 protected void releaseNativeHandle() { 111 if (!isNull()) { 112 release(getNativeHandle()); 113 } 114 } 115 116 @Override canReleaseReferencesImmediately()117 protected boolean canReleaseReferencesImmediately() { 118 return true; 119 } 120 acquire(long connectionPtr)121 private static native void acquire(long connectionPtr); release(long connectionPtr)122 private static native void release(long connectionPtr); closeConnection(long connectionPtr, int shutdownError)123 private static native void closeConnection(long connectionPtr, int shutdownError); isOpen(long connectionPtr)124 private static native boolean isOpen(long connectionPtr); sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)125 private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback); 126 } 127