1 package software.amazon.awssdk.crt.eventstream; 2 3 import software.amazon.awssdk.crt.CRT; 4 import software.amazon.awssdk.crt.CrtResource; 5 import software.amazon.awssdk.crt.CrtRuntimeException; 6 7 import java.util.List; 8 import java.util.concurrent.CompletableFuture; 9 10 /** 11 * Wrapper around aws-event-stream-rpc-server continuation. This class is marked AutoClosable. 12 * Note that by default ServerConnectionContinuationHandler will invoke close() in 13 * ServerConnectionContinuationHandler::onContinuationClosed(). 14 */ 15 public class ServerConnectionContinuation extends CrtResource { 16 17 /** 18 * Invoked from JNI 19 */ ServerConnectionContinuation(long continuationPtr)20 ServerConnectionContinuation(long continuationPtr) { 21 // tell c land we're acquiring 22 acquire(continuationPtr); 23 acquireNativeHandle(continuationPtr); 24 } 25 26 /** 27 * @return true if the continuation has been closed. False otherwise. 28 */ isClosed()29 public boolean isClosed() { 30 return isClosed(getNativeHandle()); 31 } 32 33 /** 34 * Sends message on the continuation 35 * @param headers list of additional event stream headers to include on the message. 36 * @param payload payload for the message 37 * @param messageType message type. Must be either ApplicationMessage or ApplicationError 38 * @param messageFlags message flags for the message, use TerminateStream to cause this message 39 * to close the continuation after sending. 40 * @return Future for syncing when the message is flushed to the transport or fails. 41 */ sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)42 public CompletableFuture<Void> sendMessage(final List<Header> headers, final byte[] payload, 43 final MessageType messageType, int messageFlags) { 44 CompletableFuture<Void> messageFlush = new CompletableFuture<>(); 45 46 sendMessage(headers, payload, messageType, messageFlags, errorCode -> { 47 if (errorCode == 0) { 48 messageFlush.complete(null); 49 } else { 50 messageFlush.completeExceptionally(new CrtRuntimeException(errorCode)); 51 } 52 }); 53 54 return messageFlush; 55 } 56 57 /** 58 * Sends message on the continuation 59 * @param headers list of additional event stream headers to include on the message. 60 * @param payload payload for the message 61 * @param messageType message type. Must be either ApplicationMessage or ApplicationError 62 * @param messageFlags message flags for the message, use TerminateStream to cause this message 63 * to close the continuation after sending. 64 * @param callback completion callback to be invoked when the message is synced to the underlying 65 * transport. 66 */ sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)67 public void sendMessage(final List<Header> headers, final byte[] payload, 68 final MessageType messageType, int messageFlags, MessageFlushCallback callback) { 69 byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers): null; 70 71 int result = sendContinuationMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback); 72 73 if (result != 0) { 74 int errorCode = CRT.awsLastError(); 75 throw new CrtRuntimeException(errorCode); 76 } 77 } 78 79 @Override releaseNativeHandle()80 protected void releaseNativeHandle() { 81 if (!isNull()) { 82 release(getNativeHandle()); 83 } 84 } 85 86 @Override canReleaseReferencesImmediately()87 protected boolean canReleaseReferencesImmediately() { 88 return true; 89 } 90 acquire(long continuationPtr)91 private static native void acquire(long continuationPtr); release(long continuationPtr)92 private static native void release(long continuationPtr); isClosed(long continuationPtr)93 private static native boolean isClosed(long continuationPtr); sendContinuationMessage(long continuation, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)94 private static native int sendContinuationMessage(long continuation, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback); 95 } 96