/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.future

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import java.util.concurrent.locks.*
import java.util.function.*
import kotlin.concurrent.*
import kotlin.coroutines.*
import kotlin.reflect.*

/**
 * Tests for the bridge between [CompletableFuture]/[CompletionStage] and coroutines:
 * the `future { ... }` builder, [CompletionStage.await] and [CompletionStage.asDeferred].
 *
 * The tests cover already-completed and not-yet-completed futures, normal and exceptional
 * completion, cancellation, and exception propagation to the parent coroutine.
 */
class FutureTest : TestBase() {
    @Before
    fun setup() {
        // NOTE(review): presumably excludes common-pool worker threads from the lost-thread check
        // performed by TestBase -- confirm against TestBase.
        ignoreLostThreads("ForkJoinPool.commonPool-worker-")
    }

    @Test
    fun testSimpleAwait() {
        val future = GlobalScope.future {
            CompletableFuture.supplyAsync {
                "O"
            }.await() + "K"
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testCompletedFuture() {
        // The awaited future is completed before the coroutine starts
        val toAwait = CompletableFuture<String>()
        toAwait.complete("O")
        val future = GlobalScope.future {
            toAwait.await() + "K"
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testCompletedCompletionStage() {
        // Same as testCompletedFuture, but awaits through the CompletionStage type
        val completable = CompletableFuture<String>()
        completable.complete("O")
        val toAwait: CompletionStage<String> = completable
        val future = GlobalScope.future {
            toAwait.await() + "K"
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testWaitForFuture() {
        val toAwait = CompletableFuture<String>()
        val future = GlobalScope.future {
            toAwait.await() + "K"
        }
        assertFalse(future.isDone)
        toAwait.complete("O")
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testWaitForCompletionStage() {
        val completable = CompletableFuture<String>()
        val toAwait: CompletionStage<String> = completable
        val future = GlobalScope.future {
            toAwait.await() + "K"
        }
        assertFalse(future.isDone)
        completable.complete("O")
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testCompletedFutureExceptionally() {
        val toAwait = CompletableFuture<String>()
        toAwait.completeExceptionally(TestException("O"))
        val future = GlobalScope.future {
            try {
                toAwait.await()
            } catch (e: TestException) {
                e.message!!
            } + "K"
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    // Test fast-path of CompletionStage.await() extension
    fun testCompletedCompletionStageExceptionally() {
        val completable = CompletableFuture<String>()
        val toAwait: CompletionStage<String> = completable
        completable.completeExceptionally(TestException("O"))
        val future = GlobalScope.future {
            try {
                toAwait.await()
            } catch (e: TestException) {
                e.message!!
            } + "K"
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    // Test slow-path of CompletionStage.await() extension
    fun testWaitForFutureWithException() = runTest {
        expect(1)
        val toAwait = CompletableFuture<String>()
        val future = future(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                toAwait.await() // will suspend (slow path)
            } catch (e: TestException) {
                expect(4)
                e.message!!
            } + "K"
        }
        expect(3)
        assertFalse(future.isDone)
        toAwait.completeExceptionally(TestException("O"))
        yield() // to future coroutine
        assertThat(future.get(), IsEqual("OK"))
        finish(5)
    }

    @Test
    fun testWaitForCompletionStageWithException() {
        val completable = CompletableFuture<String>()
        val toAwait: CompletionStage<String> = completable
        val future = GlobalScope.future {
            try {
                toAwait.await()
            } catch (e: TestException) {
                e.message!!
            } + "K"
        }
        assertFalse(future.isDone)
        completable.completeExceptionally(TestException("O"))
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testExceptionInsideCoroutine() {
        val future = GlobalScope.future {
            if (CompletableFuture.supplyAsync { true }.await()) {
                throw IllegalStateException("OK")
            }
            "fail"
        }
        try {
            future.get()
            fail("'get' should've throw an exception")
        } catch (e: ExecutionException) {
            // Only ExecutionException is caught, so the AssertionError from fail() propagates
            assertTrue(e.cause is IllegalStateException)
            assertThat(e.cause!!.message, IsEqual("OK"))
        }
    }

    @Test
    fun testCancellableAwaitFuture() = runBlocking {
        expect(1)
        val toAwait = CompletableFuture<String>()
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            try {
                toAwait.await() // suspends
            } catch (e: CancellationException) {
                expect(5) // should throw cancellation exception
                throw e
            }
        }
        expect(3)
        job.cancel() // cancel the job
        toAwait.complete("fail") // too late, the waiting job was already cancelled
        expect(4) // job processing of cancellation was scheduled, not executed yet
        yield() // yield main thread to job
        finish(6)
    }

    @Test
    fun testContinuationWrapped() {
        // Counts how many wrapped dispatches are currently in progress
        val depth = AtomicInteger()
        val future = GlobalScope.future(wrapContinuation {
            // Explicit calls instead of the synthetic `andIncrement`/`andDecrement` properties:
            // these are side-effecting operations, not property reads.
            depth.getAndIncrement()
            it()
            depth.getAndDecrement()
        }) {
            assertEquals("Part before first suspension must be wrapped", 1, depth.get())
            val result =
                CompletableFuture.supplyAsync {
                    // Spin until the wrapped dispatch that suspended the coroutine has returned
                    while (depth.get() > 0);
                    assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
                    "OK"
                }.await()
            assertEquals("Part after first suspension should be wrapped", 1, depth.get())
            CompletableFuture.supplyAsync {
                while (depth.get() > 0);
                assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
                "ignored"
            }.await()
            result
        }
        assertThat(future.get(), IsEqual("OK"))
    }

    @Test
    fun testCompletableFutureStageAsDeferred() = runBlocking {
        // The lock is held by the test thread so the supplier cannot complete until unlock()
        val lock = ReentrantLock().apply { lock() }

        val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
            lock.withLock { 42 }
        }.asDeferred()

        assertFalse(deferred.isCompleted)
        lock.unlock()

        assertEquals(42, deferred.await())
        assertTrue(deferred.isCompleted)
    }

    @Test
    fun testCompletedFutureAsDeferred() = runBlocking {
        val deferred: Deferred<Int> = CompletableFuture.completedFuture(42).asDeferred()
        assertEquals(42, deferred.await())
    }

    @Test
    fun testFailedFutureAsDeferred() = runBlocking {
        val future = CompletableFuture<Int>().apply {
            completeExceptionally(TestException("something went wrong"))
        }
        val deferred = future.asDeferred()

        assertTrue(deferred.isCancelled)
        val completionException = deferred.getCompletionExceptionOrNull()!!
        assertTrue(completionException is TestException)
        assertEquals("something went wrong", completionException.message)

        try {
            deferred.await()
            fail("deferred.await() should throw an exception")
        } catch (e: TestException) {
            // Catch only TestException: a broader `Throwable` handler would also catch the
            // AssertionError raised by fail() above and hide its message.
            assertEquals("something went wrong", e.message)
        }
    }

    @Test
    fun testCompletableFutureWithExceptionAsDeferred() = runBlocking {
        // The lock is held by the test thread so the supplier cannot fail until unlock()
        val lock = ReentrantLock().apply { lock() }

        val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
            lock.withLock { throw TestException("something went wrong") }
        }.asDeferred()

        assertFalse(deferred.isCompleted)
        lock.unlock()
        try {
            deferred.await()
            fail("deferred.await() should throw an exception")
        } catch (e: TestException) {
            assertTrue(deferred.isCancelled)
            assertEquals("something went wrong", e.message)
        }
    }

    private val threadLocal = ThreadLocal<String>()

    @Test
    fun testApiBridge() = runTest {
        val result = newSingleThreadContext("ctx").use {
            // The supplier and the coroutine both run on the single thread of "ctx",
            // so the thread-local value set by the supplier is visible to the coroutine.
            val future = CompletableFuture.supplyAsync(Supplier { threadLocal.set("value") }, it.executor)
            val job = async(it) {
                future.await()
                threadLocal.get()
            }

            job.await()
        }

        assertEquals("value", result)
    }

    @Test
    fun testFutureCancellation() = runTest {
        val future = awaitFutureWithCancel(true)
        assertTrue(future.isCompletedExceptionally)
        assertFailsWith<CancellationException> { future.get() }
        finish(4)
    }

    @Test
    fun testNoFutureCancellation() = runTest {
        val future = awaitFutureWithCancel(false)
        assertFalse(future.isCompletedExceptionally)
        assertEquals(239, future.get())
        finish(4)
    }

    /**
     * Starts a future that blocks on a latch and then produces `239`, awaits it from an `async`
     * coroutine, cancels that coroutine, and only then releases the latch.
     *
     * @param cancellable when `true` the coroutine awaits the future directly via `await()`;
     * when `false` it awaits through `asDeferred().await()`.
     * @return the underlying future, so the caller can check whether it was cancelled.
     */
    private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): CompletableFuture<Int> {
        val latch = CountDownLatch(1)
        val future = CompletableFuture.supplyAsync {
            latch.await()
            239
        }

        val deferred = async {
            expect(2)
            if (cancellable) future.await()
            else future.asDeferred().await()
        }
        expect(1)
        yield()
        deferred.cancel()
        expect(3)
        latch.countDown()
        return future
    }

    @Test
    fun testStructuredException() = runTest(
        expected = { it is TestException } // exception propagates to parent with structured concurrency
    ) {
        val result = future<Int>(Dispatchers.Unconfined) {
            throw TestException("FAIL")
        }
        result.checkFutureException<TestException>()
    }

    @Test
    fun testChildException() = runTest(
        expected = { it is TestException } // exception propagates to parent with structured concurrency
    ) {
        val result = future(Dispatchers.Unconfined) {
            // child crashes
            launch { throw TestException("FAIL") }
            42
        }
        result.checkFutureException<TestException>()
    }

    @Test
    fun testExceptionAggregation() = runTest(
        expected = { it is TestException } // exception propagates to parent with structured concurrency
    ) {
        val result = future(Dispatchers.Unconfined) {
            // child crashes
            launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
            launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
            throw TestException()
        }
        result.checkFutureException<TestException>(TestException1::class, TestException2::class)
        finish(1)
    }

    @Test
    fun testExternalCompletion() = runTest {
        expect(1)
        val result = future(Dispatchers.Unconfined) {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                expect(2)
            }
        }

        // Completing the future from outside must run the coroutine's finally block before finish(3)
        result.complete(Unit)
        finish(3)
    }

    @Test
    fun testExceptionOnExternalCompletion() = runTest(
        expected = { it is TestException } // exception propagates to parent with structured concurrency
    ) {
        expect(1)
        val result = future(Dispatchers.Unconfined) {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                expect(2)
                throw TestException()
            }
        }
        result.complete(Unit)
        finish(3)
    }

    @Test
    fun testUnhandledExceptionOnExternalCompletion() = runTest(
        unhandled = listOf(
            { it -> it is TestException } // exception is unhandled because there is no parent
        )
    ) {
        expect(1)
        // No parent here (NonCancellable), so nowhere to propagate exception
        val result = future(NonCancellable + Dispatchers.Unconfined) {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                expect(2)
                throw TestException() // this exception cannot be handled
            }
        }
        result.complete(Unit)
        finish(3)
    }

    /**
     * See [https://github.com/Kotlin/kotlinx.coroutines/issues/892]
     */
    @Test
    fun testTimeoutCancellationFailRace() {
        repeat(10 * stressTestMultiplier) {
            runBlocking {
                withTimeoutOrNull(10) {
                    while (true) {
                        var caught = false
                        try {
                            CompletableFuture.supplyAsync {
                                throw TestException()
                            }.await()
                        } catch (ignored: TestException) {
                            caught = true
                        }
                        assertTrue(caught) // should have caught TestException or timed out
                    }
                }
            }
        }
    }

    /**
     * Tests that both [CompletionStage.await] and [CompletionStage.asDeferred] consistently unwrap
     * [CompletionException] both in their slow and fast paths.
     * See [issue #1479](https://github.com/Kotlin/kotlinx.coroutines/issues/1479).
     */
    @Test
    fun testConsistentExceptionUnwrapping() = runTest {
        expect(1)
        // Check the fast path
        val fFast = CompletableFuture.supplyAsync {
            expect(2)
            throw TestException()
        }
        fFast.checkFutureException<TestException>() // wait until it completes
        // Fast path in await and asDeferred.await() shall produce TestException
        expect(3)
        val dFast = fFast.asDeferred()
        assertFailsWith<TestException> { fFast.await() }
        assertFailsWith<TestException> { dFast.await() }
        // Same test, but future has not completed yet, check the slow path
        expect(4)
        val barrier = CyclicBarrier(2)
        val fSlow = CompletableFuture.supplyAsync {
            barrier.await()
            expect(6)
            throw TestException()
        }
        val dSlow = fSlow.asDeferred()
        launch(start = CoroutineStart.UNDISPATCHED) {
            expect(5)
            // Slow path on await shall produce TestException, too
            assertFailsWith<TestException> { fSlow.await() } // will suspend here
            assertFailsWith<TestException> { dSlow.await() }
            finish(7)
        }
        barrier.await()
        fSlow.checkFutureException<TestException>() // now wait until it completes
    }

    /**
     * Blocks on [CompletableFuture.get] and asserts that it fails with an [ExecutionException]
     * whose cause is an instance of [T].
     *
     * @param suppressed classes expected for the cause's suppressed exceptions; the class at
     * position `i` is checked against `cause.suppressed[i]`.
     */
    private inline fun <reified T: Throwable> CompletableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
        val e = assertFailsWith<ExecutionException> { get() }
        val cause = e.cause!!
        assertTrue(cause is T)
        for ((index, clazz) in suppressed.withIndex()) {
            assertTrue(clazz.isInstance(cause.suppressed[index]))
        }
    }

    /**
     * Creates a dispatcher that runs every dispatched block synchronously on the calling thread,
     * inside [wrapper].
     *
     * @param wrapper receives a function that runs the dispatched block when invoked.
     */
    private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            wrapper {
                block.run()
            }
        }
    }
}