• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import java.nio.ByteBuffer;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.concurrent.CompletableFuture;
7 
8 /**
9  * Handler interface for responding to continuation events. It's auto closable.
10  * By default, onContinuationClosed() releases the underlying resource.
11  */
12 public abstract class ServerConnectionContinuationHandler implements AutoCloseable {
13     protected ServerConnectionContinuation continuation;
14     private CompletableFuture<Void> completableFuture = new CompletableFuture<>();
15 
16     /**
17      * Constructor invoked by your subclass.
18      * @param continuation continuation to back the handler.
19      */
ServerConnectionContinuationHandler(final ServerConnectionContinuation continuation)20     protected ServerConnectionContinuationHandler(final ServerConnectionContinuation continuation) {
21         this.continuation = continuation;
22     }
23 
24     /**
25      * Implement to handle the onContinuationClosed event. By default, releases the underlying
26      * resource by calling close(). If you override this function, be sure to either call close()
27      * yourself or invoke super.onContinuationClosed().
28      */
onContinuationClosed()29     protected void onContinuationClosed() {
30         this.close();
31     }
32 
33     /**
34      * Invoked when a message is received on a continuation.
35      * @param headers List of EventStream headers for the message received.
36      * @param payload Payload for the message received
37      * @param messageType message type for the message
38      * @param messageFlags message flags for the message
39      */
onContinuationMessage(final List<Header> headers, final byte[] payload, final MessageType messageType, int messageFlags)40     protected abstract void onContinuationMessage(final List<Header> headers,
41                                              final byte[] payload, final MessageType messageType, int messageFlags);
42 
onContinuationMessageShim(final byte[] headersPayload, final byte[] payload, int messageType, int messageFlags)43     void onContinuationMessageShim(final byte[] headersPayload, final byte[] payload,
44                                    int messageType, int messageFlags) {
45         List<Header> headers = new ArrayList<>();
46 
47         ByteBuffer headersBuffer = ByteBuffer.wrap(headersPayload);
48         while (headersBuffer.hasRemaining()) {
49             Header header = Header.fromByteBuffer(headersBuffer);
50             headers.add(header);
51         }
52 
53         onContinuationMessage(headers, payload, MessageType.fromEnumValue(messageType), messageFlags);
54     }
55 
onContinuationClosedShim()56     void onContinuationClosedShim() {
57         onContinuationClosed();
58         completableFuture.complete(null);
59     }
60 
61     /**
62      * @return a future that will be completed upon the continuation being closed.
63      */
getContinuationClosedFuture()64     public CompletableFuture<Void> getContinuationClosedFuture() {
65         return completableFuture;
66     }
67 
68     @Override
close()69     public void close() {
70         if (continuation != null) {
71             continuation.decRef();
72             continuation = null;
73         }
74     }
75 }
76