1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.services.eventstreams; 17 18 import static org.assertj.core.api.Assertions.assertThat; 19 import static org.mockito.ArgumentMatchers.any; 20 import static org.mockito.Mockito.when; 21 22 import io.reactivex.Flowable; 23 import java.nio.ByteBuffer; 24 import java.util.ArrayList; 25 import java.util.List; 26 import java.util.concurrent.CompletableFuture; 27 import java.util.stream.Collectors; 28 import java.util.stream.Stream; 29 import org.junit.Before; 30 import org.junit.Test; 31 import org.junit.runner.RunWith; 32 import org.mockito.Mock; 33 import org.mockito.invocation.InvocationOnMock; 34 import org.mockito.junit.MockitoJUnitRunner; 35 import org.reactivestreams.Subscriber; 36 import org.reactivestreams.Subscription; 37 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; 38 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; 39 import software.amazon.awssdk.http.SdkHttpResponse; 40 import software.amazon.awssdk.http.async.AsyncExecuteRequest; 41 import software.amazon.awssdk.http.async.SdkAsyncHttpClient; 42 import software.amazon.awssdk.http.async.SdkHttpContentPublisher; 43 import software.amazon.awssdk.regions.Region; 44 import software.amazon.awssdk.services.eventstreamrestjson.EventStreamRestJsonAsyncClient; 45 import software.amazon.awssdk.services.eventstreamrestjson.model.EventStream; 46 import software.amazon.awssdk.services.eventstreamrestjson.model.EventStreamOperationRequest; 47 import software.amazon.awssdk.services.eventstreamrestjson.model.EventStreamOperationResponseHandler; 48 import software.amazon.awssdk.services.eventstreamrestjson.model.InputEventStream; 49 import software.amazon.eventstream.Message; 50 import software.amazon.eventstream.MessageDecoder; 51 52 @RunWith(MockitoJUnitRunner.class) 53 public class EventMarshallingTest { 54 @Mock 55 public SdkAsyncHttpClient mockHttpClient; 56 57 private EventStreamRestJsonAsyncClient client; 58 59 private List<Message> marshalledEvents; 60 61 private MessageDecoder chunkDecoder; 62 private MessageDecoder eventDecoder; 63 64 @Before setup()65 public void setup() { 66 when(mockHttpClient.execute(any(AsyncExecuteRequest.class))).thenAnswer(this::mockExecute); 67 client = EventStreamRestJsonAsyncClient.builder() 68 .region(Region.US_WEST_2) 69 .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("akid", "skid"))) 70 .httpClient(mockHttpClient) 71 .build(); 72 73 marshalledEvents = new ArrayList<>(); 74 75 chunkDecoder = new MessageDecoder(); 76 eventDecoder = new MessageDecoder(); 77 } 78 79 @Test testMarshalling_setsCorrectEventType()80 public void testMarshalling_setsCorrectEventType() { 81 List<InputEventStream> inputEvents = Stream.of( 82 InputEventStream.inputEventBuilder().build(), 83 InputEventStream.inputEventBBuilder().build(), 84 InputEventStream.inputEventTwoBuilder().build() 85 ).collect(Collectors.toList()); 86 87 Flowable<InputEventStream> inputStream = Flowable.fromIterable(inputEvents); 88 89 client.eventStreamOperation(EventStreamOperationRequest.builder().build(), inputStream, EventStreamOperationResponseHandler.builder() 90 .subscriber(() -> new Subscriber<EventStream>() { 91 @Override 92 public void onSubscribe(Subscription subscription) { 93 94 } 95 96 @Override 97 public void onNext(EventStream eventStream) { 98 99 } 100 101 @Override 102 public void onError(Throwable throwable) { 103 104 } 105 106 @Override 107 public void onComplete() { 108 109 } 110 }) 111 .build()).join(); 112 113 List<String> expectedTypes = Stream.of( 114 "InputEvent", 115 "InputEventB", 116 "InputEventTwo" 117 ).collect(Collectors.toList());; 118 119 assertThat(marshalledEvents).hasSize(inputEvents.size()); 120 121 for (int i = 0; i < marshalledEvents.size(); ++i) { 122 Message marshalledEvent = marshalledEvents.get(i); 123 String expectedType = expectedTypes.get(i); 124 assertThat(marshalledEvent.getHeaders().get(":event-type").getString()) 125 .isEqualTo(expectedType); 126 } 127 } 128 mockExecute(InvocationOnMock invocation)129 private CompletableFuture<Void> mockExecute(InvocationOnMock invocation) { 130 AsyncExecuteRequest request = invocation.getArgument(0, AsyncExecuteRequest.class); 131 SdkHttpContentPublisher content = request.requestContentPublisher(); 132 List<ByteBuffer> chunks = Flowable.fromPublisher(content).toList().blockingGet(); 133 134 for (ByteBuffer c : chunks) { 135 chunkDecoder.feed(c); 136 } 137 138 for (Message m : chunkDecoder.getDecodedMessages()) { 139 eventDecoder.feed(m.getPayload()); 140 } 141 142 marshalledEvents.addAll(eventDecoder.getDecodedMessages()); 143 144 request.responseHandler().onHeaders(SdkHttpResponse.builder().statusCode(200).build()); 145 request.responseHandler().onStream(Flowable.empty()); 146 147 return CompletableFuture.completedFuture(null); 148 } 149 } 150