• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import java.nio.ByteBuffer;
4 import java.nio.charset.StandardCharsets;
5 import java.util.ArrayList;
6 import java.util.List;
7 
8 /**
9  * Handler for EventStream ServerConnections. It's marked AutoClosable.
10  * By default onConnectionClosed, calls the close() function on this object.
11  */
12 public abstract class ServerConnectionHandler implements AutoCloseable {
13     protected ServerConnection connection;
14 
ServerConnectionHandler(final ServerConnection connection)15     protected ServerConnectionHandler(final ServerConnection connection) {
16         this.connection = connection;
17         // it wasn't really doable to have JNI invoke the function from the ServerConnectionHandler, The ServerListener
18         // completes this future, when it's completed, as a convenience go ahead and invoke our own callback which
19         // by default cleans up the resources.
20         this.connection.getClosedFuture().whenComplete((shutdownReason, ex) -> {
21             onConnectionClosed(shutdownReason);
22         });
23     }
24 
25     /**
26      * Invoked when a message is received on a connection.
27      * @param headers List of EventStream headers for the message received.
28      * @param payload Payload for the message received
29      * @param messageType message type for the message
30      * @param messageFlags message flags for the message
31      */
onProtocolMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)32     protected abstract void onProtocolMessage(final List<Header> headers,
33                            final byte[] payload, final MessageType messageType, int messageFlags);
34 
35     /**
36      * Invoked from JNI. Marshals the native data into java objects and calls
37      * onProtocolMessage()
38      */
onProtocolMessage(final byte[] headersPayload, final byte[] payload, int messageType, int messageFlags)39     private void onProtocolMessage(final byte[] headersPayload, final byte[] payload,
40                                int messageType, int messageFlags) {
41         List<Header> headers = new ArrayList<>();
42 
43         ByteBuffer headersBuffer = ByteBuffer.wrap(headersPayload);
44         while (headersBuffer.hasRemaining()) {
45             Header header = Header.fromByteBuffer(headersBuffer);
46             headers.add(header);
47         }
48 
49         onProtocolMessage(headers, payload, MessageType.fromEnumValue(messageType), messageFlags);
50     }
51 
52     /**
53      * Invoked upon an incoming stream from a client.
54      * @param continuation continuation object for sending continuation events to the client.
55      * @param operationName name of the operation the client wishes to invoke.
56      * @return a new instance of ServerConnectionContinuationHandler for handling continuation events.
57      */
onIncomingStream(final ServerConnectionContinuation continuation, String operationName)58     protected abstract ServerConnectionContinuationHandler onIncomingStream(final ServerConnectionContinuation continuation, String operationName);
59 
onIncomingStream(final ServerConnectionContinuation continuation, byte[] operationName)60     private ServerConnectionContinuationHandler onIncomingStream(final ServerConnectionContinuation continuation, byte[] operationName) {
61         String operationNameStr = new String(operationName, StandardCharsets.UTF_8);
62 
63         return onIncomingStream(continuation, operationNameStr);
64     }
65 
66     /**
67      * Invoked upon the connection closing. By default, calls close() on this object.
68      * @param shutdownReason reason for the shutdown. 0 means clean shutdown.
69      */
onConnectionClosed(int shutdownReason)70     protected void onConnectionClosed(int shutdownReason) {
71         this.close();
72     }
73 
74     @Override
close()75     public void close() {
76         if (connection.isConnectionOpen()) {
77             connection.closeConnection(0);
78         }
79         connection.decRef();
80         connection = null;
81     }
82 }
83