package software.amazon.awssdk.crt.eventstream; import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.crt.CrtRuntimeException; import java.util.List; import java.util.concurrent.CompletableFuture; /** * Wrapper around event-stream-rpc-server-connection. Note this class is AutoClosable. * By default the ServerConnectionHandler::onClosed callback calls close(). */ public class ServerConnection extends CrtResource { CompletableFuture closedFuture = new CompletableFuture<>(); /** * Invoked from JNI. */ ServerConnection(long connectionPtr) { // tell c-land we're acquiring acquire(connectionPtr); acquireNativeHandle(connectionPtr); } /** * @return true if the connection is open. False otherwise. */ public boolean isConnectionOpen() { if (isNull()) { return false; } return isOpen(getNativeHandle()); } /** * Closes the connection with shutdownError * @param shutdownError error code to shutdown the connection with. If * shutting down cleanly, use 0. */ public void closeConnection(int shutdownError) { if (isNull()) { throw new IllegalStateException("close() has already been called on this object."); } closeConnection(getNativeHandle(), shutdownError); } /** * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message * flushing to the underlying transport. * @param headers List of event-stream headers. Can be null. * @param payload Payload to send for the message. Can be null. * @param messageType Message type for the rpc message. * @param messageFlags Union of message flags from MessageFlags.getByteValue() * @return completable future for synchronizing on the message flushing to the underlying transport. */ public CompletableFuture sendProtocolMessage(final List
headers, final byte[] payload, final MessageType messageType, int messageFlags) { if (isNull()) { throw new IllegalStateException("close() has already been called on this object."); } CompletableFuture messageFlush = new CompletableFuture<>(); sendProtocolMessage(headers, payload, messageType, messageFlags, new MessageFlushCallback() { @Override public void onCallbackInvoked(int errorCode) { if (errorCode == 0) { messageFlush.complete(null); } else { messageFlush.completeExceptionally(new CrtRuntimeException(errorCode)); } } }); return messageFlush; } /** * Sends a protocol message on the connection. Returns a completable future for synchronizing on the message * flushing to the underlying transport. * @param headers List of event-stream headers. Can be null. * @param payload Payload to send for the message. Can be null. * @param messageType Message type for the rpc message. * @param messageFlags Union of message flags from MessageFlags.getByteValue() * @param callback invoked upon the message flushing to the underlying transport. */ public void sendProtocolMessage(final List
headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback) { if (isNull()) { throw new IllegalStateException("close() has already been called on this object."); } byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null; int result = sendProtocolMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback); if (result != 0) { int errorCode = CRT.awsLastError(); throw new CrtRuntimeException(errorCode); } } /** * @return a future which completes upon the connection closing */ public CompletableFuture getClosedFuture() { return closedFuture; } @Override protected void releaseNativeHandle() { if (!isNull()) { release(getNativeHandle()); } } @Override protected boolean canReleaseReferencesImmediately() { return true; } private static native void acquire(long connectionPtr); private static native void release(long connectionPtr); private static native void closeConnection(long connectionPtr, int shutdownError); private static native boolean isOpen(long connectionPtr); private static native int sendProtocolMessage(long connectionPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback); }