• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package software.amazon.awssdk.crt.eventstream;
2 
3 import java.nio.charset.StandardCharsets;
4 import java.util.ArrayList;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.concurrent.CompletableFuture;
8 import java.util.function.Function;
9 
10 /**
11  * ServerConnectionHandler implementation that routes continuations to specific
12  * operation specific message handlers
13  */
14 public class OperationRoutingServerConnectionHandler extends ServerConnectionHandler {
15     private final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMap;
16 
17     /**
18      * binds an operation handler mapping to a server connection
19      * @param serverConnection connection to route messages for
20      * @param operationMapping mapping of operation names to message handlers.
21      */
OperationRoutingServerConnectionHandler(final ServerConnection serverConnection, final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMapping)22     public OperationRoutingServerConnectionHandler(final ServerConnection serverConnection,
23                                                    final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMapping) {
24         super(serverConnection);
25         this.operationMap = operationMapping;
26     }
27 
28     /**
29      * By default, automatically responds to pings when received, and routes connect requests.
30      *
31      * Feel free to override this function if you want different behavior.
32      */
33     @Override
onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags)34     protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
35         if (messageType == MessageType.Ping) {
36             int responseMessageFlag = 0;
37             MessageType responseMessageType = MessageType.PingResponse;
38 
39             connection.sendProtocolMessage(null, null, responseMessageType, responseMessageFlag);
40         } else if (messageType == MessageType.Connect) {
41             onConnectRequest(headers, payload);
42         } else if (messageType != MessageType.PingResponse){
43             int responseMessageFlag = 0;
44             MessageType responseMessageType = MessageType.ServerError;
45 
46             String responsePayload =
47                     "{ \"error\": \"Unrecognized Message Type\" }" +
48                     "\"message\": \" message type value: " + messageType.getEnumValue() + " is not recognized as a valid request path.\" }";
49 
50             Header contentTypeHeader = Header.createHeader(":content-type", "application/json");
51             List<Header> responseHeaders = new ArrayList<>();
52             responseHeaders.add(contentTypeHeader);
53             CompletableFuture<Void> voidCompletableFuture = connection.sendProtocolMessage(responseHeaders, responsePayload.getBytes(StandardCharsets.UTF_8), responseMessageType, responseMessageFlag);
54             voidCompletableFuture.thenAccept(result -> {connection.closeConnection(0); this.close();});
55         }
56     }
57 
58     /**
59      * To customize how the connect request is handled, override this function.
60      * You're responsible for sending the response. By default, it accepts all incoming
61      * connections.
62      * @param headers list of headers received in the message
63      * @param payload payload received in the message
64      */
onConnectRequest(List<Header> headers, byte[] payload)65     protected void onConnectRequest(List<Header> headers, byte[] payload) {
66         int responseMessageFlag = MessageFlags.ConnectionAccepted.getByteValue();
67         MessageType acceptResponseType = MessageType.ConnectAck;
68 
69         connection.sendProtocolMessage(null, null, acceptResponseType, responseMessageFlag);
70     }
71 
72     /**
73      * When a new stream continuation is received, it routes to the configured operation name to
74      * handler mapping. If no such mapping exists, an Unsupported Operation message of type ApplicationError
75      * is sent to the peer.
76      *
77      * You can't override this function, because well, if you do, you might as well not use this
78      * class in the first place.
79      *
80      * @param continuation continuation representing the new incoming stream
81      * @param operationName operation name for the new incoming stream
82      */
83     @Override
onIncomingStream(ServerConnectionContinuation continuation, String operationName)84     protected final ServerConnectionContinuationHandler onIncomingStream(ServerConnectionContinuation continuation, String operationName) {
85         Function<ServerConnectionContinuation, ServerConnectionContinuationHandler> registeredOperationHandlerFn = operationMap.get(operationName);
86 
87         if (registeredOperationHandlerFn != null) {
88             return registeredOperationHandlerFn.apply(continuation);
89         } else {
90             return new ServerConnectionContinuationHandler(continuation) {
91                 @Override
92                 protected void onContinuationClosed() {
93                     close();
94                 }
95 
96                 @Override
97                 protected void onContinuationMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) {
98                     int responseMessageFlag = MessageFlags.TerminateStream.getByteValue();
99                     MessageType responseMessageType = MessageType.ApplicationError;
100 
101                     String responsePayload =
102                             "{ \"error\": \"Unsupported Operation\", " +
103                               "\"message\": \"" + operationName + " is an unsupported operation.\" }";
104 
105                     Header contentTypeHeader = Header.createHeader(":content-type", "application/json");
106                     List<Header> responseHeaders = new ArrayList<>();
107                     responseHeaders.add(contentTypeHeader);
108 
109                     continuation.sendMessage(responseHeaders, responsePayload.getBytes(StandardCharsets.UTF_8), responseMessageType, responseMessageFlag);
110                 }
111             };
112         }
113     }
114 }
115