1 package software.amazon.awssdk.crt.eventstream; 2 3 import java.nio.charset.StandardCharsets; 4 import java.util.ArrayList; 5 import java.util.List; 6 import java.util.Map; 7 import java.util.concurrent.CompletableFuture; 8 import java.util.function.Function; 9 10 /** 11 * ServerConnectionHandler implementation that routes continuations to specific 12 * operation specific message handlers 13 */ 14 public class OperationRoutingServerConnectionHandler extends ServerConnectionHandler { 15 private final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMap; 16 17 /** 18 * binds an operation handler mapping to a server connection 19 * @param serverConnection connection to route messages for 20 * @param operationMapping mapping of operation names to message handlers. 21 */ OperationRoutingServerConnectionHandler(final ServerConnection serverConnection, final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMapping)22 public OperationRoutingServerConnectionHandler(final ServerConnection serverConnection, 23 final Map<String, Function<ServerConnectionContinuation, ServerConnectionContinuationHandler>> operationMapping) { 24 super(serverConnection); 25 this.operationMap = operationMapping; 26 } 27 28 /** 29 * By default, automatically responds to pings when received, and routes connect requests. 30 * 31 * Feel free to override this function if you want different behavior. 32 */ 33 @Override onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags)34 protected void onProtocolMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) { 35 if (messageType == MessageType.Ping) { 36 int responseMessageFlag = 0; 37 MessageType responseMessageType = MessageType.PingResponse; 38 39 connection.sendProtocolMessage(null, null, responseMessageType, responseMessageFlag); 40 } else if (messageType == MessageType.Connect) { 41 onConnectRequest(headers, payload); 42 } else if (messageType != MessageType.PingResponse){ 43 int responseMessageFlag = 0; 44 MessageType responseMessageType = MessageType.ServerError; 45 46 String responsePayload = 47 "{ \"error\": \"Unrecognized Message Type\" }" + 48 "\"message\": \" message type value: " + messageType.getEnumValue() + " is not recognized as a valid request path.\" }"; 49 50 Header contentTypeHeader = Header.createHeader(":content-type", "application/json"); 51 List<Header> responseHeaders = new ArrayList<>(); 52 responseHeaders.add(contentTypeHeader); 53 CompletableFuture<Void> voidCompletableFuture = connection.sendProtocolMessage(responseHeaders, responsePayload.getBytes(StandardCharsets.UTF_8), responseMessageType, responseMessageFlag); 54 voidCompletableFuture.thenAccept(result -> {connection.closeConnection(0); this.close();}); 55 } 56 } 57 58 /** 59 * To customize how the connect request is handled, override this function. 60 * You're responsible for sending the response. By default, it accepts all incoming 61 * connections. 62 * @param headers list of headers received in the message 63 * @param payload payload received in the message 64 */ onConnectRequest(List<Header> headers, byte[] payload)65 protected void onConnectRequest(List<Header> headers, byte[] payload) { 66 int responseMessageFlag = MessageFlags.ConnectionAccepted.getByteValue(); 67 MessageType acceptResponseType = MessageType.ConnectAck; 68 69 connection.sendProtocolMessage(null, null, acceptResponseType, responseMessageFlag); 70 } 71 72 /** 73 * When a new stream continuation is received, it routes to the configured operation name to 74 * handler mapping. If no such mapping exists, an Unsupported Operation message of type ApplicationError 75 * is sent to the peer. 76 * 77 * You can't override this function, because well, if you do, you might as well not use this 78 * class in the first place. 79 * 80 * @param continuation continuation representing the new incoming stream 81 * @param operationName operation name for the new incoming stream 82 */ 83 @Override onIncomingStream(ServerConnectionContinuation continuation, String operationName)84 protected final ServerConnectionContinuationHandler onIncomingStream(ServerConnectionContinuation continuation, String operationName) { 85 Function<ServerConnectionContinuation, ServerConnectionContinuationHandler> registeredOperationHandlerFn = operationMap.get(operationName); 86 87 if (registeredOperationHandlerFn != null) { 88 return registeredOperationHandlerFn.apply(continuation); 89 } else { 90 return new ServerConnectionContinuationHandler(continuation) { 91 @Override 92 protected void onContinuationClosed() { 93 close(); 94 } 95 96 @Override 97 protected void onContinuationMessage(List<Header> headers, byte[] payload, MessageType messageType, int messageFlags) { 98 int responseMessageFlag = MessageFlags.TerminateStream.getByteValue(); 99 MessageType responseMessageType = MessageType.ApplicationError; 100 101 String responsePayload = 102 "{ \"error\": \"Unsupported Operation\", " + 103 "\"message\": \"" + operationName + " is an unsupported operation.\" }"; 104 105 Header contentTypeHeader = Header.createHeader(":content-type", "application/json"); 106 List<Header> responseHeaders = new ArrayList<>(); 107 responseHeaders.add(contentTypeHeader); 108 109 continuation.sendMessage(responseHeaders, responsePayload.getBytes(StandardCharsets.UTF_8), responseMessageType, responseMessageFlag); 110 } 111 }; 112 } 113 } 114 } 115