1 package software.amazon.awssdk.crt.eventstream; 2 3 import java.nio.ByteBuffer; 4 import java.util.ArrayList; 5 import java.util.List; 6 import java.util.concurrent.CompletableFuture; 7 8 /** 9 * Handler interface for responding to continuation events. It's auto closable. 10 * By default, onContinuationClosed() releases the underlying resource. 11 */ 12 public abstract class ServerConnectionContinuationHandler implements AutoCloseable { 13 protected ServerConnectionContinuation continuation; 14 private CompletableFuture<Void> completableFuture = new CompletableFuture<>(); 15 16 /** 17 * Constructor invoked by your subclass. 18 * @param continuation continuation to back the handler. 19 */ ServerConnectionContinuationHandler(final ServerConnectionContinuation continuation)20 protected ServerConnectionContinuationHandler(final ServerConnectionContinuation continuation) { 21 this.continuation = continuation; 22 } 23 24 /** 25 * Implement to handle the onContinuationClosed event. By default, releases the underlying 26 * resource by calling close(). If you override this function, be sure to either call close() 27 * yourself or invoke super.onContinuationClosed(). 28 */ onContinuationClosed()29 protected void onContinuationClosed() { 30 this.close(); 31 } 32 33 /** 34 * Invoked when a message is received on a continuation. 35 * @param headers List of EventStream headers for the message received. 36 * @param payload Payload for the message received 37 * @param messageType message type for the message 38 * @param messageFlags message flags for the message 39 */ onContinuationMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)40 protected abstract void onContinuationMessage(final List<Header> headers, 41 final byte[] payload, final MessageType messageType, int messageFlags); 42 onContinuationMessageShim(final byte[] headersPayload, final byte[] payload, int messageType, int messageFlags)43 void onContinuationMessageShim(final byte[] headersPayload, final byte[] payload, 44 int messageType, int messageFlags) { 45 List<Header> headers = new ArrayList<>(); 46 47 ByteBuffer headersBuffer = ByteBuffer.wrap(headersPayload); 48 while (headersBuffer.hasRemaining()) { 49 Header header = Header.fromByteBuffer(headersBuffer); 50 headers.add(header); 51 } 52 53 onContinuationMessage(headers, payload, MessageType.fromEnumValue(messageType), messageFlags); 54 } 55 onContinuationClosedShim()56 void onContinuationClosedShim() { 57 onContinuationClosed(); 58 completableFuture.complete(null); 59 } 60 61 /** 62 * @return a future that will be completed upon the continuation being closed. 63 */ getContinuationClosedFuture()64 public CompletableFuture<Void> getContinuationClosedFuture() { 65 return completableFuture; 66 } 67 68 @Override close()69 public void close() { 70 if (continuation != null) { 71 continuation.decRef(); 72 continuation = null; 73 } 74 } 75 } 76