package software.amazon.awssdk.crt.eventstream;

import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
 * Java wrapper for event-stream-rpc client continuation.
 */
public class ClientConnectionContinuation extends CrtResource {

    /**
     * Package private invoked from JNI. Do not call directly.
     */
    ClientConnectionContinuation(long ptr) {
        acquireNativeHandle(ptr);
    }

    /**
     * Initiates a new client stream. Sends new message for the new stream.
     * @param operationName name for the operation to be invoked by the peer endpoint. Must not be null;
     *                      it is sent to the native layer as UTF-8 bytes.
     * @param headers headers for the event-stream message, may be null or empty.
     * @param payload payload for the event-stream message, may be null or empty.
     * @param messageType messageType for the message. Must be ApplicationMessage or ApplicationError
     * @param messageFlags union of flags for MessageFlags.getByteValue()
     * @param callback callback to be invoked upon the message being flushed to the underlying transport.
     * @throws IllegalStateException if the native handle is null, i.e. close() has already been called.
     * @throws NullPointerException if operationName or messageType is null.
     * @throws CrtRuntimeException if the native activate call reports a failure (non-zero result).
     */
    public void activate(final String operationName,
                         final List<Header> headers, final byte[] payload,
                         final MessageType messageType, int messageFlags,
                         MessageFlushCallback callback) {
        requireOpenContinuation();
        // Fail fast with a descriptive message instead of an anonymous NPE after marshalling work.
        Objects.requireNonNull(operationName, "operationName must not be null");
        Objects.requireNonNull(messageType, "messageType must not be null");

        byte[] headersBuf = marshallHeadersOrNull(headers);

        int result = activateContinuation(getNativeHandle(), this,
                operationName.getBytes(StandardCharsets.UTF_8),
                headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);

        throwIfNativeCallFailed(result);
    }

    /**
     * Sends the initial message on a continuation, and begins the message flow for a stream.
     * @param operationName name of the operation to invoke on the server. Must not be null.
     * @param headers list of additional event stream headers to include on the message.
     * @param payload payload for the message
     * @param messageType message type. Must be either ApplicationMessage or ApplicationError
     * @param messageFlags message flags for the message.
     * @return CompletableFuture that completes normally when the flush callback reports error code 0,
     *         and completes exceptionally with a CrtRuntimeException for any other error code.
     * @throws IllegalStateException if the native handle is null, i.e. close() has already been called.
     * @throws NullPointerException if operationName or messageType is null.
     * @throws CrtRuntimeException if the native activate call reports a failure synchronously.
     */
    public CompletableFuture<Void> activate(final String operationName,
                                            final List<Header> headers, final byte[] payload,
                                            final MessageType messageType, int messageFlags) {
        CompletableFuture<Void> messageFlush = new CompletableFuture<>();
        activate(operationName, headers, payload, messageType, messageFlags,
                flushCallbackCompleting(messageFlush));
        return messageFlush;
    }

    /**
     * Sends message on the continuation
     * @param headers list of additional event stream headers to include on the message, may be null.
     * @param payload payload for the message
     * @param messageType message type. Must be either ApplicationMessage or ApplicationError
     * @param messageFlags message flags for the message, use TerminateStream to cause this message
     *                     to close the continuation after sending.
     * @param callback completion callback to be invoked when the message is synced to the underlying
     *                 transport.
     * @throws IllegalStateException if the native handle is null, i.e. close() has already been called.
     * @throws NullPointerException if messageType is null.
     * @throws CrtRuntimeException if the native send call reports a failure (non-zero result).
     */
    public void sendMessage(final List<Header> headers, final byte[] payload,
                            final MessageType messageType, int messageFlags,
                            MessageFlushCallback callback) {
        requireOpenContinuation();
        Objects.requireNonNull(messageType, "messageType must not be null");

        byte[] headersBuf = marshallHeadersOrNull(headers);

        int result = sendContinuationMessage(getNativeHandle(),
                headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);

        throwIfNativeCallFailed(result);
    }

    /**
     * Sends message on the continuation
     * @param headers list of additional event stream headers to include on the message, may be null.
     * @param payload payload for the message
     * @param messageType message type. Must be either ApplicationMessage or ApplicationError
     * @param messageFlags message flags for the message, use TerminateStream to cause this message
     *                     to close the continuation after sending.
     * @return Future for syncing when the message is flushed to the transport or fails.
     * @throws IllegalStateException if the native handle is null, i.e. close() has already been called.
     * @throws NullPointerException if messageType is null.
     * @throws CrtRuntimeException if the native send call reports a failure synchronously.
     */
    public CompletableFuture<Void> sendMessage(final List<Header> headers, final byte[] payload,
                                               final MessageType messageType, int messageFlags) {
        // The callback overload performs the closed-handle check before doing anything else,
        // so no separate check is needed here.
        CompletableFuture<Void> messageFlush = new CompletableFuture<>();
        sendMessage(headers, payload, messageType, messageFlags, flushCallbackCompleting(messageFlush));
        return messageFlush;
    }

    /**
     * Releases the native continuation if this object still holds a native handle.
     */
    @Override
    protected void releaseNativeHandle() {
        if (!isNull()) {
            releaseContinuation(getNativeHandle());
        }
    }

    /**
     * Always returns {@code true} for this resource type.
     */
    @Override
    protected boolean canReleaseReferencesImmediately() {
        return true;
    }

    /** Throws if the native handle is null, which the error message attributes to a prior close(). */
    private void requireOpenContinuation() {
        if (isNull()) {
            throw new IllegalStateException("close() has already been called on this object.");
        }
    }

    /** Marshalls the headers for the JNI call, passing a null list through as a null buffer. */
    private static byte[] marshallHeadersOrNull(final List<Header> headers) {
        return headers != null ? Header.marshallHeadersForJNI(headers) : null;
    }

    /** Converts a non-zero native result into a CrtRuntimeException carrying the last CRT error code. */
    private static void throwIfNativeCallFailed(int result) {
        if (result != 0) {
            // Only query the last error on failure, as the original code did.
            int errorCode = CRT.awsLastError();
            throw new CrtRuntimeException(errorCode);
        }
    }

    /** Builds a flush callback that completes the future on error code 0 and fails it otherwise. */
    private static MessageFlushCallback flushCallbackCompleting(final CompletableFuture<Void> future) {
        return new MessageFlushCallback() {
            @Override
            public void onCallbackInvoked(int errorCode) {
                if (errorCode == 0) {
                    future.complete(null);
                } else {
                    future.completeExceptionally(new CrtRuntimeException(errorCode));
                }
            }
        };
    }

    // Native entry points; a non-zero int return is treated as failure by the callers above.
    private static native int activateContinuation(long continuationPtr, ClientConnectionContinuation continuation, byte[] operationName, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
    private static native int sendContinuationMessage(long continuationPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
    private static native void releaseContinuation(long continuationPtr);
}