• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import software.amazon.awssdk.crt.CRT;
4 import software.amazon.awssdk.crt.CrtResource;
5 import software.amazon.awssdk.crt.CrtRuntimeException;
6 
7 import java.util.List;
8 import java.util.concurrent.CompletableFuture;
9 
10 /**
11  * Wrapper around aws-event-stream-rpc-server continuation. This class is marked AutoClosable.
12  * Note that by default ServerConnectionContinuationHandler will invoke close() in
13  * ServerConnectionContinuationHandler::onContinuationClosed().
14  */
15 public class ServerConnectionContinuation extends CrtResource {
16 
17     /**
18      * Invoked from JNI
19      */
ServerConnectionContinuation(long continuationPtr)20     ServerConnectionContinuation(long continuationPtr) {
21         // tell c land we're acquiring
22         acquire(continuationPtr);
23         acquireNativeHandle(continuationPtr);
24     }
25 
26     /**
27      * @return true if the continuation has been closed. False otherwise.
28      */
isClosed()29     public boolean isClosed() {
30         return isClosed(getNativeHandle());
31     }
32 
33     /**
34      * Sends message on the continuation
35      * @param headers list of additional event stream headers to include on the message.
36      * @param payload payload for the message
37      * @param messageType message type. Must be either ApplicationMessage or ApplicationError
38      * @param messageFlags message flags for the message, use TerminateStream to cause this message
39      *                     to close the continuation after sending.
40      * @return Future for syncing when the message is flushed to the transport or fails.
41      */
sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)42     public CompletableFuture<Void> sendMessage(final List<Header> headers, final byte[] payload,
43                                                        final MessageType messageType, int messageFlags) {
44         CompletableFuture<Void> messageFlush = new CompletableFuture<>();
45 
46         sendMessage(headers, payload, messageType, messageFlags, errorCode -> {
47             if (errorCode == 0) {
48                 messageFlush.complete(null);
49             } else {
50                 messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
51             }
52         });
53 
54         return messageFlush;
55     }
56 
57     /**
58      * Sends message on the continuation
59      * @param headers list of additional event stream headers to include on the message.
60      * @param payload payload for the message
61      * @param messageType message type. Must be either ApplicationMessage or ApplicationError
62      * @param messageFlags message flags for the message, use TerminateStream to cause this message
63      *                     to close the continuation after sending.
64      * @param callback completion callback to be invoked when the message is synced to the underlying
65      *                 transport.
66      */
sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)67     public void sendMessage(final List<Header> headers, final byte[] payload,
68                                     final MessageType messageType, int messageFlags, MessageFlushCallback callback) {
69         byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers): null;
70 
71         int result = sendContinuationMessage(getNativeHandle(), headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);
72 
73         if (result != 0) {
74             int errorCode = CRT.awsLastError();
75             throw new CrtRuntimeException(errorCode);
76         }
77     }
78 
79     @Override
releaseNativeHandle()80     protected void releaseNativeHandle() {
81         if (!isNull()) {
82             release(getNativeHandle());
83         }
84     }
85 
86     @Override
canReleaseReferencesImmediately()87     protected boolean canReleaseReferencesImmediately() {
88         return true;
89     }
90 
acquire(long continuationPtr)91     private static native void acquire(long continuationPtr);
release(long continuationPtr)92     private static native void release(long continuationPtr);
isClosed(long continuationPtr)93     private static native boolean isClosed(long continuationPtr);
sendContinuationMessage(long continuation, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)94     private static native int sendContinuationMessage(long continuation, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
95 }
96