• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
15 
16 package software.amazon.awssdk.core.internal.async;
17 
18 import static org.assertj.core.api.Assertions.assertThat;
19 import static org.assertj.core.api.Assertions.assertThatThrownBy;
20 import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.DELETE;
21 import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.LEAVE;
22 
23 import com.google.common.jimfs.Jimfs;
24 import io.reactivex.Flowable;
25 import java.io.IOException;
26 import java.nio.ByteBuffer;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.FileAlreadyExistsException;
29 import java.nio.file.FileSystem;
30 import java.nio.file.Files;
31 import java.nio.file.Path;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collection;
35 import java.util.List;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CompletableFuture;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import java.util.concurrent.Future;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import org.apache.commons.lang3.RandomStringUtils;
45 import org.junit.jupiter.api.AfterEach;
46 import org.junit.jupiter.api.BeforeEach;
47 import org.junit.jupiter.api.Test;
48 import org.junit.jupiter.params.ParameterizedTest;
49 import org.junit.jupiter.params.provider.MethodSource;
50 import org.reactivestreams.Subscription;
51 import software.amazon.awssdk.core.FileTransformerConfiguration;
52 import software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption;
53 import software.amazon.awssdk.core.async.SdkPublisher;
54 
/**
 * Tests for {@link FileAsyncResponseTransformer}.
 */
58 class FileAsyncResponseTransformerTest {
59     private FileSystem testFs;
60 
61     @BeforeEach
setup()62     public void setup() {
63         testFs = Jimfs.newFileSystem();
64     }
65 
66     @AfterEach
teardown()67     public void teardown() throws IOException {
68         testFs.close();
69     }
70 
71     @Test
errorInStream_completesFuture()72     public void errorInStream_completesFuture() {
73         Path testPath = testFs.getPath("test_file.txt");
74         FileAsyncResponseTransformer xformer = new FileAsyncResponseTransformer(testPath);
75 
76         CompletableFuture prepareFuture = xformer.prepare();
77 
78         xformer.onResponse(new Object());
79         xformer.onStream(subscriber -> {
80             subscriber.onSubscribe(new Subscription() {
81                 @Override
82                 public void request(long l) {
83                 }
84 
85                 @Override
86                 public void cancel() {
87                 }
88             });
89 
90             subscriber.onError(new RuntimeException("Something went wrong"));
91         });
92 
93         assertThat(prepareFuture.isCompletedExceptionally()).isTrue();
94     }
95 
96     @Test
synchronousPublisher_shouldNotHang()97     public void synchronousPublisher_shouldNotHang() throws Exception {
98         List<CompletableFuture> futures = new ArrayList<>();
99 
100         for (int i = 0; i < 10; i++) {
101             Path testPath = testFs.getPath(i + "test_file.txt");
102             FileAsyncResponseTransformer transformer = new FileAsyncResponseTransformer(testPath);
103 
104             CompletableFuture prepareFuture = transformer.prepare();
105 
106             transformer.onResponse(new Object());
107 
108             transformer.onStream(testPublisher(RandomStringUtils.randomAlphanumeric(30000)));
109             futures.add(prepareFuture);
110         }
111 
112         CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
113         future.get(10, TimeUnit.SECONDS);
114         assertThat(future.isCompletedExceptionally()).isFalse();
115     }
116 
117     @Test
noConfiguration_fileAlreadyExists_shouldThrowException()118     void noConfiguration_fileAlreadyExists_shouldThrowException() throws Exception {
119         Path testPath = testFs.getPath("test_file.txt");
120         Files.write(testPath, RandomStringUtils.random(1000).getBytes(StandardCharsets.UTF_8));
121         assertThat(testPath).exists();
122 
123         String content = RandomStringUtils.randomAlphanumeric(30000);
124         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath);
125 
126         CompletableFuture<String> future = transformer.prepare();
127         transformer.onResponse("foobar");
128         assertThatThrownBy(() -> transformer.onStream(testPublisher(content))).hasRootCauseInstanceOf(FileAlreadyExistsException.class);
129     }
130 
131     @Test
createOrReplaceExisting_fileDoesNotExist_shouldCreateNewFile()132     void createOrReplaceExisting_fileDoesNotExist_shouldCreateNewFile() throws Exception {
133         Path testPath = testFs.getPath("test_file.txt");
134         assertThat(testPath).doesNotExist();
135         String newContent = RandomStringUtils.randomAlphanumeric(2000);
136         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
137                                                                                               FileTransformerConfiguration.defaultCreateOrReplaceExisting());
138 
139         stubSuccessfulStreaming(newContent, transformer);
140         assertThat(testPath).hasContent(newContent);
141     }
142 
143     @Test
createOrReplaceExisting_fileAlreadyExists_shouldReplaceExisting()144     void createOrReplaceExisting_fileAlreadyExists_shouldReplaceExisting() throws Exception {
145         Path testPath = testFs.getPath("test_file.txt");
146 
147         int existingBytesLength = 20;
148         String existingContent = RandomStringUtils.randomAlphanumeric(existingBytesLength);
149         byte[] existingContentBytes = existingContent.getBytes(StandardCharsets.UTF_8);
150         Files.write(testPath, existingContentBytes);
151 
152         int newBytesLength = 10;
153         String newContent = RandomStringUtils.randomAlphanumeric(newBytesLength);
154         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
155                                                                                               FileTransformerConfiguration.defaultCreateOrReplaceExisting());
156         stubSuccessfulStreaming(newContent, transformer);
157         assertThat(testPath).hasContent(newContent);
158     }
159 
160     @Test
createOrAppendExisting_fileDoesNotExist_shouldCreateNewFile()161     void createOrAppendExisting_fileDoesNotExist_shouldCreateNewFile() throws Exception {
162         Path testPath = testFs.getPath("test_file.txt");
163         assertThat(testPath).doesNotExist();
164         String newContent = RandomStringUtils.randomAlphanumeric(500);
165         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
166                                                                                               FileTransformerConfiguration.defaultCreateOrAppend());
167         stubSuccessfulStreaming(newContent, transformer);
168         assertThat(testPath).hasContent(newContent);
169     }
170 
171     @Test
createOrAppendExisting_fileExists_shouldAppend()172     void createOrAppendExisting_fileExists_shouldAppend() throws Exception {
173         Path testPath = testFs.getPath("test_file.txt");
174         String existingString = RandomStringUtils.randomAlphanumeric(10);
175         byte[] existingBytes = existingString.getBytes(StandardCharsets.UTF_8);
176         Files.write(testPath, existingBytes);
177         String content = RandomStringUtils.randomAlphanumeric(20);
178         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
179                                                                                               FileTransformerConfiguration.defaultCreateOrAppend());
180         stubSuccessfulStreaming(content, transformer);
181         assertThat(testPath).hasContent(existingString + content);
182     }
183 
184     @ParameterizedTest
185     @MethodSource("configurations")
exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration)186     void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration) throws Exception {
187         Path testPath = testFs.getPath("test_file.txt");
188         FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath,
189                                                                                               configuration);
190         stubException(RandomStringUtils.random(200), transformer);
191         if (configuration.failureBehavior() == LEAVE) {
192             assertThat(testPath).exists();
193         } else {
194             assertThat(testPath).doesNotExist();
195         }
196     }
197 
configurations()198     private static List<FileTransformerConfiguration> configurations() {
199         return Arrays.asList(
200             FileTransformerConfiguration.defaultCreateNew(),
201             FileTransformerConfiguration.defaultCreateOrAppend(),
202             FileTransformerConfiguration.defaultCreateOrReplaceExisting(),
203             FileTransformerConfiguration.builder()
204                                         .fileWriteOption(FileWriteOption.CREATE_NEW)
205                                         .failureBehavior(LEAVE).build(),
206             FileTransformerConfiguration.builder()
207                                         .fileWriteOption(FileWriteOption.CREATE_NEW)
208                                         .failureBehavior(DELETE).build(),
209             FileTransformerConfiguration.builder()
210                                         .fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
211                                         .failureBehavior(DELETE).build(),
212             FileTransformerConfiguration.builder()
213                                         .fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING)
214                                         .failureBehavior(LEAVE).build(),
215             FileTransformerConfiguration.builder()
216                                         .fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
217                                         .failureBehavior(DELETE).build(),
218             FileTransformerConfiguration.builder()
219                                         .fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING)
220                                         .failureBehavior(LEAVE).build());
221     }
222 
223     @Test
explicitExecutor_shouldUseExecutor()224     void explicitExecutor_shouldUseExecutor() throws Exception {
225         Path testPath = testFs.getPath("test_file.txt");
226         assertThat(testPath).doesNotExist();
227         String newContent = RandomStringUtils.randomAlphanumeric(2000);
228 
229         ExecutorService executor = Executors.newSingleThreadExecutor();
230         try {
231             SpyingExecutorService spyingExecutorService = new SpyingExecutorService(executor);
232             FileTransformerConfiguration configuration = FileTransformerConfiguration
233                 .builder()
234                 .fileWriteOption(FileWriteOption.CREATE_NEW)
235                 .failureBehavior(DELETE)
236                 .executorService(spyingExecutorService)
237                 .build();
238             FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration);
239 
240             stubSuccessfulStreaming(newContent, transformer);
241             assertThat(testPath).hasContent(newContent);
242             assertThat(spyingExecutorService.hasReceivedTasks()).isTrue();
243         } finally {
244             executor.shutdown();
245             assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
246         }
247     }
248 
stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer)249     private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
250         CompletableFuture<String> future = transformer.prepare();
251         transformer.onResponse("foobar");
252 
253         transformer.onStream(testPublisher(newContent));
254 
255         future.get(10, TimeUnit.SECONDS);
256         assertThat(future.isCompletedExceptionally()).isFalse();
257     }
258 
stubException(String newContent, FileAsyncResponseTransformer<String> transformer)259     private static void stubException(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
260         CompletableFuture<String> future = transformer.prepare();
261         transformer.onResponse("foobar");
262 
263         RuntimeException runtimeException = new RuntimeException("oops");
264         ByteBuffer content = ByteBuffer.wrap(newContent.getBytes(StandardCharsets.UTF_8));
265         transformer.onStream(SdkPublisher.adapt(Flowable.just(content, content)));
266         transformer.exceptionOccurred(runtimeException);
267 
268         assertThatThrownBy(() -> future.get(10, TimeUnit.SECONDS))
269             .hasRootCause(runtimeException);
270         assertThat(future.isCompletedExceptionally()).isTrue();
271     }
272 
testPublisher(String content)273     private static SdkPublisher<ByteBuffer> testPublisher(String content) {
274         return SdkPublisher.adapt(Flowable.just(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
275     }
276 
277     private static final class SpyingExecutorService implements ExecutorService {
278         private final ExecutorService executorService;
279         private boolean receivedTasks = false;
280 
SpyingExecutorService(ExecutorService executorService)281         private SpyingExecutorService(ExecutorService executorService) {
282             this.executorService = executorService;
283         }
284 
hasReceivedTasks()285         public boolean hasReceivedTasks() {
286             return receivedTasks;
287         }
288 
289         @Override
shutdown()290         public void shutdown() {
291             executorService.shutdown();
292         }
293 
294         @Override
shutdownNow()295         public List<Runnable> shutdownNow() {
296             return executorService.shutdownNow();
297         }
298 
299         @Override
isShutdown()300         public boolean isShutdown() {
301             return executorService.isShutdown();
302         }
303 
304         @Override
isTerminated()305         public boolean isTerminated() {
306             return executorService.isTerminated();
307         }
308 
309         @Override
awaitTermination(long timeout, TimeUnit unit)310         public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
311             return executorService.awaitTermination(timeout, unit);
312         }
313 
314         @Override
submit(Callable<T> task)315         public <T> Future<T> submit(Callable<T> task) {
316             receivedTasks = true;
317             return executorService.submit(task);
318         }
319 
320         @Override
submit(Runnable task, T result)321         public <T> Future<T> submit(Runnable task, T result) {
322             receivedTasks = true;
323             return executorService.submit(task, result);
324         }
325 
326         @Override
submit(Runnable task)327         public Future<?> submit(Runnable task) {
328             receivedTasks = true;
329             return executorService.submit(task);
330         }
331 
332         @Override
invokeAll(Collection<? extends Callable<T>> tasks)333         public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
334             receivedTasks = true;
335             return executorService.invokeAll(tasks);
336         }
337 
338         @Override
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)339         public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
340             receivedTasks = true;
341             return executorService.invokeAll(tasks, timeout, unit);
342         }
343 
344         @Override
invokeAny(Collection<? extends Callable<T>> tasks)345         public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
346             receivedTasks = true;
347             return executorService.invokeAny(tasks);
348         }
349 
350         @Override
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)351         public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
352             receivedTasks = true;
353             return executorService.invokeAny(tasks, timeout, unit);
354         }
355 
356         @Override
execute(Runnable command)357         public void execute(Runnable command) {
358             receivedTasks = true;
359             executorService.execute(command);
360         }
361     }
362 }