1 /* 2 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"). 5 * You may not use this file except in compliance with the License. 6 * A copy of the License is located at 7 * 8 * http://aws.amazon.com/apache2.0 9 * 10 * or in the "license" file accompanying this file. This file is distributed 11 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 12 * express or implied. See the License for the specific language governing 13 * permissions and limitations under the License. 14 */ 15 16 package software.amazon.awssdk.core.internal.async; 17 18 import static org.assertj.core.api.Assertions.assertThat; 19 import static org.assertj.core.api.Assertions.assertThatThrownBy; 20 import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.DELETE; 21 import static software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior.LEAVE; 22 23 import com.google.common.jimfs.Jimfs; 24 import io.reactivex.Flowable; 25 import java.io.IOException; 26 import java.nio.ByteBuffer; 27 import java.nio.charset.StandardCharsets; 28 import java.nio.file.FileAlreadyExistsException; 29 import java.nio.file.FileSystem; 30 import java.nio.file.Files; 31 import java.nio.file.Path; 32 import java.util.ArrayList; 33 import java.util.Arrays; 34 import java.util.Collection; 35 import java.util.List; 36 import java.util.concurrent.Callable; 37 import java.util.concurrent.CompletableFuture; 38 import java.util.concurrent.ExecutionException; 39 import java.util.concurrent.ExecutorService; 40 import java.util.concurrent.Executors; 41 import java.util.concurrent.Future; 42 import java.util.concurrent.TimeUnit; 43 import java.util.concurrent.TimeoutException; 44 import org.apache.commons.lang3.RandomStringUtils; 45 import org.junit.jupiter.api.AfterEach; 46 import org.junit.jupiter.api.BeforeEach; 47 import org.junit.jupiter.api.Test; 48 import org.junit.jupiter.params.ParameterizedTest; 49 import org.junit.jupiter.params.provider.MethodSource; 50 import org.reactivestreams.Subscription; 51 import software.amazon.awssdk.core.FileTransformerConfiguration; 52 import software.amazon.awssdk.core.FileTransformerConfiguration.FileWriteOption; 53 import software.amazon.awssdk.core.async.SdkPublisher; 54 55 /** 56 * Tests for {@link FileAsyncResponseTransformer}. 57 */ 58 class FileAsyncResponseTransformerTest { 59 private FileSystem testFs; 60 61 @BeforeEach setup()62 public void setup() { 63 testFs = Jimfs.newFileSystem(); 64 } 65 66 @AfterEach teardown()67 public void teardown() throws IOException { 68 testFs.close(); 69 } 70 71 @Test errorInStream_completesFuture()72 public void errorInStream_completesFuture() { 73 Path testPath = testFs.getPath("test_file.txt"); 74 FileAsyncResponseTransformer xformer = new FileAsyncResponseTransformer(testPath); 75 76 CompletableFuture prepareFuture = xformer.prepare(); 77 78 xformer.onResponse(new Object()); 79 xformer.onStream(subscriber -> { 80 subscriber.onSubscribe(new Subscription() { 81 @Override 82 public void request(long l) { 83 } 84 85 @Override 86 public void cancel() { 87 } 88 }); 89 90 subscriber.onError(new RuntimeException("Something went wrong")); 91 }); 92 93 assertThat(prepareFuture.isCompletedExceptionally()).isTrue(); 94 } 95 96 @Test synchronousPublisher_shouldNotHang()97 public void synchronousPublisher_shouldNotHang() throws Exception { 98 List<CompletableFuture> futures = new ArrayList<>(); 99 100 for (int i = 0; i < 10; i++) { 101 Path testPath = testFs.getPath(i + "test_file.txt"); 102 FileAsyncResponseTransformer transformer = new FileAsyncResponseTransformer(testPath); 103 104 CompletableFuture prepareFuture = transformer.prepare(); 105 106 transformer.onResponse(new Object()); 107 108 transformer.onStream(testPublisher(RandomStringUtils.randomAlphanumeric(30000))); 109 futures.add(prepareFuture); 110 } 111 112 CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 113 future.get(10, TimeUnit.SECONDS); 114 assertThat(future.isCompletedExceptionally()).isFalse(); 115 } 116 117 @Test noConfiguration_fileAlreadyExists_shouldThrowException()118 void noConfiguration_fileAlreadyExists_shouldThrowException() throws Exception { 119 Path testPath = testFs.getPath("test_file.txt"); 120 Files.write(testPath, RandomStringUtils.random(1000).getBytes(StandardCharsets.UTF_8)); 121 assertThat(testPath).exists(); 122 123 String content = RandomStringUtils.randomAlphanumeric(30000); 124 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath); 125 126 CompletableFuture<String> future = transformer.prepare(); 127 transformer.onResponse("foobar"); 128 assertThatThrownBy(() -> transformer.onStream(testPublisher(content))).hasRootCauseInstanceOf(FileAlreadyExistsException.class); 129 } 130 131 @Test createOrReplaceExisting_fileDoesNotExist_shouldCreateNewFile()132 void createOrReplaceExisting_fileDoesNotExist_shouldCreateNewFile() throws Exception { 133 Path testPath = testFs.getPath("test_file.txt"); 134 assertThat(testPath).doesNotExist(); 135 String newContent = RandomStringUtils.randomAlphanumeric(2000); 136 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, 137 FileTransformerConfiguration.defaultCreateOrReplaceExisting()); 138 139 stubSuccessfulStreaming(newContent, transformer); 140 assertThat(testPath).hasContent(newContent); 141 } 142 143 @Test createOrReplaceExisting_fileAlreadyExists_shouldReplaceExisting()144 void createOrReplaceExisting_fileAlreadyExists_shouldReplaceExisting() throws Exception { 145 Path testPath = testFs.getPath("test_file.txt"); 146 147 int existingBytesLength = 20; 148 String existingContent = RandomStringUtils.randomAlphanumeric(existingBytesLength); 149 byte[] existingContentBytes = existingContent.getBytes(StandardCharsets.UTF_8); 150 Files.write(testPath, existingContentBytes); 151 152 int newBytesLength = 10; 153 String newContent = RandomStringUtils.randomAlphanumeric(newBytesLength); 154 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, 155 FileTransformerConfiguration.defaultCreateOrReplaceExisting()); 156 stubSuccessfulStreaming(newContent, transformer); 157 assertThat(testPath).hasContent(newContent); 158 } 159 160 @Test createOrAppendExisting_fileDoesNotExist_shouldCreateNewFile()161 void createOrAppendExisting_fileDoesNotExist_shouldCreateNewFile() throws Exception { 162 Path testPath = testFs.getPath("test_file.txt"); 163 assertThat(testPath).doesNotExist(); 164 String newContent = RandomStringUtils.randomAlphanumeric(500); 165 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, 166 FileTransformerConfiguration.defaultCreateOrAppend()); 167 stubSuccessfulStreaming(newContent, transformer); 168 assertThat(testPath).hasContent(newContent); 169 } 170 171 @Test createOrAppendExisting_fileExists_shouldAppend()172 void createOrAppendExisting_fileExists_shouldAppend() throws Exception { 173 Path testPath = testFs.getPath("test_file.txt"); 174 String existingString = RandomStringUtils.randomAlphanumeric(10); 175 byte[] existingBytes = existingString.getBytes(StandardCharsets.UTF_8); 176 Files.write(testPath, existingBytes); 177 String content = RandomStringUtils.randomAlphanumeric(20); 178 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, 179 FileTransformerConfiguration.defaultCreateOrAppend()); 180 stubSuccessfulStreaming(content, transformer); 181 assertThat(testPath).hasContent(existingString + content); 182 } 183 184 @ParameterizedTest 185 @MethodSource("configurations") exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration)186 void exceptionOccurred_deleteFileBehavior(FileTransformerConfiguration configuration) throws Exception { 187 Path testPath = testFs.getPath("test_file.txt"); 188 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, 189 configuration); 190 stubException(RandomStringUtils.random(200), transformer); 191 if (configuration.failureBehavior() == LEAVE) { 192 assertThat(testPath).exists(); 193 } else { 194 assertThat(testPath).doesNotExist(); 195 } 196 } 197 configurations()198 private static List<FileTransformerConfiguration> configurations() { 199 return Arrays.asList( 200 FileTransformerConfiguration.defaultCreateNew(), 201 FileTransformerConfiguration.defaultCreateOrAppend(), 202 FileTransformerConfiguration.defaultCreateOrReplaceExisting(), 203 FileTransformerConfiguration.builder() 204 .fileWriteOption(FileWriteOption.CREATE_NEW) 205 .failureBehavior(LEAVE).build(), 206 FileTransformerConfiguration.builder() 207 .fileWriteOption(FileWriteOption.CREATE_NEW) 208 .failureBehavior(DELETE).build(), 209 FileTransformerConfiguration.builder() 210 .fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING) 211 .failureBehavior(DELETE).build(), 212 FileTransformerConfiguration.builder() 213 .fileWriteOption(FileWriteOption.CREATE_OR_APPEND_TO_EXISTING) 214 .failureBehavior(LEAVE).build(), 215 FileTransformerConfiguration.builder() 216 .fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING) 217 .failureBehavior(DELETE).build(), 218 FileTransformerConfiguration.builder() 219 .fileWriteOption(FileWriteOption.CREATE_OR_REPLACE_EXISTING) 220 .failureBehavior(LEAVE).build()); 221 } 222 223 @Test explicitExecutor_shouldUseExecutor()224 void explicitExecutor_shouldUseExecutor() throws Exception { 225 Path testPath = testFs.getPath("test_file.txt"); 226 assertThat(testPath).doesNotExist(); 227 String newContent = RandomStringUtils.randomAlphanumeric(2000); 228 229 ExecutorService executor = Executors.newSingleThreadExecutor(); 230 try { 231 SpyingExecutorService spyingExecutorService = new SpyingExecutorService(executor); 232 FileTransformerConfiguration configuration = FileTransformerConfiguration 233 .builder() 234 .fileWriteOption(FileWriteOption.CREATE_NEW) 235 .failureBehavior(DELETE) 236 .executorService(spyingExecutorService) 237 .build(); 238 FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration); 239 240 stubSuccessfulStreaming(newContent, transformer); 241 assertThat(testPath).hasContent(newContent); 242 assertThat(spyingExecutorService.hasReceivedTasks()).isTrue(); 243 } finally { 244 executor.shutdown(); 245 assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); 246 } 247 } 248 stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer)249 private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception { 250 CompletableFuture<String> future = transformer.prepare(); 251 transformer.onResponse("foobar"); 252 253 transformer.onStream(testPublisher(newContent)); 254 255 future.get(10, TimeUnit.SECONDS); 256 assertThat(future.isCompletedExceptionally()).isFalse(); 257 } 258 stubException(String newContent, FileAsyncResponseTransformer<String> transformer)259 private static void stubException(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception { 260 CompletableFuture<String> future = transformer.prepare(); 261 transformer.onResponse("foobar"); 262 263 RuntimeException runtimeException = new RuntimeException("oops"); 264 ByteBuffer content = ByteBuffer.wrap(newContent.getBytes(StandardCharsets.UTF_8)); 265 transformer.onStream(SdkPublisher.adapt(Flowable.just(content, content))); 266 transformer.exceptionOccurred(runtimeException); 267 268 assertThatThrownBy(() -> future.get(10, TimeUnit.SECONDS)) 269 .hasRootCause(runtimeException); 270 assertThat(future.isCompletedExceptionally()).isTrue(); 271 } 272 testPublisher(String content)273 private static SdkPublisher<ByteBuffer> testPublisher(String content) { 274 return SdkPublisher.adapt(Flowable.just(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)))); 275 } 276 277 private static final class SpyingExecutorService implements ExecutorService { 278 private final ExecutorService executorService; 279 private boolean receivedTasks = false; 280 SpyingExecutorService(ExecutorService executorService)281 private SpyingExecutorService(ExecutorService executorService) { 282 this.executorService = executorService; 283 } 284 hasReceivedTasks()285 public boolean hasReceivedTasks() { 286 return receivedTasks; 287 } 288 289 @Override shutdown()290 public void shutdown() { 291 executorService.shutdown(); 292 } 293 294 @Override shutdownNow()295 public List<Runnable> shutdownNow() { 296 return executorService.shutdownNow(); 297 } 298 299 @Override isShutdown()300 public boolean isShutdown() { 301 return executorService.isShutdown(); 302 } 303 304 @Override isTerminated()305 public boolean isTerminated() { 306 return executorService.isTerminated(); 307 } 308 309 @Override awaitTermination(long timeout, TimeUnit unit)310 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 311 return executorService.awaitTermination(timeout, unit); 312 } 313 314 @Override submit(Callable<T> task)315 public <T> Future<T> submit(Callable<T> task) { 316 receivedTasks = true; 317 return executorService.submit(task); 318 } 319 320 @Override submit(Runnable task, T result)321 public <T> Future<T> submit(Runnable task, T result) { 322 receivedTasks = true; 323 return executorService.submit(task, result); 324 } 325 326 @Override submit(Runnable task)327 public Future<?> submit(Runnable task) { 328 receivedTasks = true; 329 return executorService.submit(task); 330 } 331 332 @Override invokeAll(Collection<? extends Callable<T>> tasks)333 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { 334 receivedTasks = true; 335 return executorService.invokeAll(tasks); 336 } 337 338 @Override invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)339 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { 340 receivedTasks = true; 341 return executorService.invokeAll(tasks, timeout, unit); 342 } 343 344 @Override invokeAny(Collection<? extends Callable<T>> tasks)345 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { 346 receivedTasks = true; 347 return executorService.invokeAny(tasks); 348 } 349 350 @Override invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)351 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 352 receivedTasks = true; 353 return executorService.invokeAny(tasks, timeout, unit); 354 } 355 356 @Override execute(Runnable command)357 public void execute(Runnable command) { 358 receivedTasks = true; 359 executorService.execute(command); 360 } 361 } 362 }