package software.amazon.awssdk.crt.eventstream;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Handler for EventStream ServerConnections. It is {@link AutoCloseable}.
 * By default {@link #onConnectionClosed(int)} calls {@link #close()} on this object.
 */
public abstract class ServerConnectionHandler implements AutoCloseable {
    /** The connection this handler serves; set to {@code null} once {@link #close()} has run. */
    protected ServerConnection connection;

    /**
     * Binds this handler to {@code connection} and subscribes to its closed future so that
     * {@link #onConnectionClosed(int)} is invoked when that future completes.
     *
     * @param connection the connection this handler is attached to
     */
    protected ServerConnectionHandler(final ServerConnection connection) {
        this.connection = connection;
        // It wasn't really doable to have JNI invoke the function from the ServerConnectionHandler.
        // The ServerListener completes this future; when it's completed, as a convenience go ahead
        // and invoke our own callback, which by default cleans up the resources.
        this.connection.getClosedFuture().whenComplete((shutdownReason, ex) -> {
            onConnectionClosed(shutdownReason);
        });
    }

    /**
     * Invoked when a message is received on a connection.
     *
     * @param headers List of EventStream headers for the message received.
     * @param payload Payload for the message received
     * @param messageType message type for the message
     * @param messageFlags message flags for the message
     */
    protected abstract void onProtocolMessage(final List<Header> headers,
            final byte[] payload, final MessageType messageType, int messageFlags);

    /**
     * Invoked from JNI. Marshals the native data into java objects and calls
     * {@link #onProtocolMessage(List, byte[], MessageType, int)}.
     *
     * @param headersPayload serialized headers, decoded one {@link Header} at a time until the
     *                       buffer is exhausted
     * @param payload payload for the message received
     * @param messageType integer value converted via {@code MessageType.fromEnumValue}
     * @param messageFlags message flags for the message
     */
    private void onProtocolMessage(final byte[] headersPayload, final byte[] payload,
            int messageType, int messageFlags) {
        List<Header> headers = new ArrayList<>();

        ByteBuffer headersBuffer = ByteBuffer.wrap(headersPayload);
        while (headersBuffer.hasRemaining()) {
            headers.add(Header.fromByteBuffer(headersBuffer));
        }

        onProtocolMessage(headers, payload, MessageType.fromEnumValue(messageType), messageFlags);
    }

    /**
     * Invoked upon an incoming stream from a client.
     *
     * @param continuation continuation object for sending continuation events to the client.
     * @param operationName name of the operation the client wishes to invoke.
     * @return a new instance of ServerConnectionContinuationHandler for handling continuation events.
     */
    protected abstract ServerConnectionContinuationHandler onIncomingStream(
            final ServerConnectionContinuation continuation, String operationName);

    /**
     * Byte-array overload that decodes {@code operationName} as UTF-8 and delegates to
     * {@link #onIncomingStream(ServerConnectionContinuation, String)}.
     * NOTE(review): nothing in this class calls it; presumably invoked from JNI like the
     * private onProtocolMessage overload - confirm against the native side.
     *
     * @param continuation continuation object for the incoming stream
     * @param operationName UTF-8 encoded operation name
     * @return the handler returned by the String overload
     */
    private ServerConnectionContinuationHandler onIncomingStream(
            final ServerConnectionContinuation continuation, byte[] operationName) {
        String operationNameStr = new String(operationName, StandardCharsets.UTF_8);

        return onIncomingStream(continuation, operationNameStr);
    }

    /**
     * Invoked upon the connection closing. By default, calls close() on this object.
     *
     * @param shutdownReason reason for the shutdown. 0 means clean shutdown.
     */
    protected void onConnectionClosed(int shutdownReason) {
        this.close();
    }

    /**
     * Closes the connection if it is still open, releases this handler's reference to it and
     * clears {@link #connection}.
     *
     * <p>Calling this more than once is safe; every call after the first is a no-op. That matters
     * because the default {@link #onConnectionClosed(int)} also calls {@code close()}, so an
     * explicit {@code close()} (or try-with-resources) followed by the closed-future callback
     * would otherwise dereference the already-cleared field.
     */
    @Override
    public void close() {
        final ServerConnection conn = connection;
        if (conn == null) {
            // Already closed by an earlier call.
            return;
        }
        // Clear the field before tearing down so a nested close() triggered during shutdown
        // returns immediately and the reference is released exactly once.
        connection = null;

        if (conn.isConnectionOpen()) {
            conn.closeConnection(0);
        }
        conn.decRef();
    }
}