• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
10 
11 /**
12  * Java wrapper for event-stream-rpc client continuation.
13  */
14 public class ClientConnectionContinuation extends CrtResource {
15 
16     /**
17      * Package private invoked from JNI. Do not call directly.
18      */
ClientConnectionContinuation(long ptr)19     ClientConnectionContinuation(long ptr) {
20         acquireNativeHandle(ptr);
21     }
22 
23     /**
24      * Initiates a new client stream. Sends new message for the new stream.
25      * @param operationName name for the operation to be invoked by the peer endpoint.
26      * @param headers headers for the event-stream message, may be null or empty.
27      * @param payload payload for the event-stream message, may be null or empty.
28      * @param messageType messageType for the message. Must be ApplicationMessage or ApplicationError
29      * @param messageFlags union of flags for MessageFlags.getByteValue()
30      * @param callback callback to be invoked upon the message being flushed to the underlying transport.
31      */
activate(final String operationName, final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)32     public void activate(final String operationName,
33                          final List<Header> headers, final byte[] payload,
34                          final MessageType messageType, int messageFlags,
35                          MessageFlushCallback callback) {
36         if (isNull()) {
37             throw new IllegalStateException("close() has already been called on this object.");
38         }
39 
40         byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;
41 
42         int result = activateContinuation(getNativeHandle(), this, operationName.getBytes(StandardCharsets.UTF_8),
43                 headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);
44 
45         if (result != 0) {
46             int errorCode = CRT.awsLastError();
47             throw new CrtRuntimeException(errorCode);
48         }
49     }
50 
51     /**
52      * Sends the initial message on a continuation, and begins the message flow for a stream.
53      * @param operationName name of the operation to invoke on the server.
54      * @param headers list of additional event stream headers to include on the message.
55      * @param payload payload for the message
56      * @param messageType message type. Must be either ApplicationMessage or ApplicationError
57      * @param messageFlags message flags for the message.
58      * @return Completeable future for syncing with the connection completing or failing.
59      */
activate(final String operationName, final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)60     public CompletableFuture<Void> activate(final String operationName,
61                                       final List<Header> headers, final byte[] payload,
62                                       final MessageType messageType, int messageFlags) {
63 
64         CompletableFuture<Void> messageFlush = new CompletableFuture<>();
65 
66         activate(operationName, headers, payload, messageType, messageFlags, new MessageFlushCallback() {
67             @Override
68             public void onCallbackInvoked(int errorCode) {
69                 if (errorCode == 0) {
70                     messageFlush.complete(null);
71                 } else {
72                     messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
73                 }
74             }
75         });
76 
77         return messageFlush;
78     }
79 
80     /**
81      * Sends message on the continuation
82      * @param headers list of additional event stream headers to include on the message.
83      * @param payload payload for the message
84      * @param messageType message type. Must be either ApplicationMessage or ApplicationError
85      * @param messageFlags message flags for the message, use TerminateStream to cause this message
86      *                     to close the continuation after sending.
87      * @param callback completion callback to be invoked when the message is synced to the underlying
88      *                 transport.
89      */
sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags, MessageFlushCallback callback)90     public void sendMessage(final List<Header> headers, final byte[] payload,
91                          final MessageType messageType, int messageFlags,
92                          MessageFlushCallback callback) {
93         if (isNull()) {
94             throw new IllegalStateException("close() has already been called on this object.");
95         }
96 
97         byte[] headersBuf = headers != null ? Header.marshallHeadersForJNI(headers) : null;
98 
99         int result = sendContinuationMessage(getNativeHandle(),
100                 headersBuf, payload, messageType.getEnumValue(), messageFlags, callback);
101 
102         if (result != 0) {
103             int errorCode = CRT.awsLastError();
104             throw new CrtRuntimeException(errorCode);
105         }
106     }
107 
108     /**
109      * Sends message on the continuation
110      * @param headers list of additional event stream headers to include on the message.
111      * @param payload payload for the message
112      * @param messageType message type. Must be either ApplicationMessage or ApplicationError
113      * @param messageFlags message flags for the message, use TerminateStream to cause this message
114      *                     to close the continuation after sending.
115      * @return Future for syncing when the message is flushed to the transport or fails.
116      */
sendMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)117     public CompletableFuture<Void> sendMessage(final List<Header> headers, final byte[] payload,
118                                             final MessageType messageType, int messageFlags) {
119         if (isNull()) {
120             throw new IllegalStateException("close() has already been called on this object.");
121         }
122 
123         CompletableFuture<Void> messageFlush = new CompletableFuture<>();
124 
125         sendMessage(headers, payload, messageType, messageFlags, new MessageFlushCallback() {
126             @Override
127             public void onCallbackInvoked(int errorCode) {
128                 if (errorCode == 0) {
129                     messageFlush.complete(null);
130                 } else {
131                     messageFlush.completeExceptionally(new CrtRuntimeException(errorCode));
132                 }
133             }
134         });
135 
136         return messageFlush;
137     }
138 
139     @Override
releaseNativeHandle()140     protected void releaseNativeHandle() {
141         if (!isNull()) {
142             releaseContinuation(getNativeHandle());
143         }
144     }
145 
146     @Override
canReleaseReferencesImmediately()147     protected boolean canReleaseReferencesImmediately() {
148         return true;
149     }
150 
activateContinuation(long continuationPtr, ClientConnectionContinuation continuation, byte[] operationName, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)151     private static native int activateContinuation(long continuationPtr, ClientConnectionContinuation continuation, byte[] operationName, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
sendContinuationMessage(long continuationPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback)152     private static native int sendContinuationMessage(long continuationPtr, byte[] serialized_headers, byte[] payload, int message_type, int message_flags, MessageFlushCallback callback);
releaseContinuation(long continuationPtr)153     private static native void releaseContinuation(long continuationPtr);
154 }
155