• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.future
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import org.hamcrest.core.*
10 import org.junit.*
11 import org.junit.Assert.*
12 import java.util.concurrent.*
13 import java.util.concurrent.atomic.*
14 import java.util.concurrent.locks.*
15 import java.util.function.*
16 import kotlin.concurrent.*
17 import kotlin.coroutines.*
18 import kotlin.reflect.*
19 
20 class FutureTest : TestBase() {
21     @Before
22     fun setup() {
23         ignoreLostThreads("ForkJoinPool.commonPool-worker-")
24     }
25 
26     @Test
27     fun testSimpleAwait() {
28         val future = GlobalScope.future {
29             CompletableFuture.supplyAsync {
30                 "O"
31             }.await() + "K"
32         }
33         assertThat(future.get(), IsEqual("OK"))
34     }
35 
36     @Test
37     fun testCompletedFuture() {
38         val toAwait = CompletableFuture<String>()
39         toAwait.complete("O")
40         val future = GlobalScope.future {
41             toAwait.await() + "K"
42         }
43         assertThat(future.get(), IsEqual("OK"))
44     }
45 
46     @Test
47     fun testCompletedCompletionStage() {
48         val completable = CompletableFuture<String>()
49         completable.complete("O")
50         val toAwait: CompletionStage<String> = completable
51         val future = GlobalScope.future {
52             toAwait.await() + "K"
53         }
54         assertThat(future.get(), IsEqual("OK"))
55     }
56 
57     @Test
58     fun testWaitForFuture() {
59         val toAwait = CompletableFuture<String>()
60         val future = GlobalScope.future {
61             toAwait.await() + "K"
62         }
63         assertFalse(future.isDone)
64         toAwait.complete("O")
65         assertThat(future.get(), IsEqual("OK"))
66     }
67 
68     @Test
69     fun testWaitForCompletionStage() {
70         val completable = CompletableFuture<String>()
71         val toAwait: CompletionStage<String> = completable
72         val future = GlobalScope.future {
73             toAwait.await() + "K"
74         }
75         assertFalse(future.isDone)
76         completable.complete("O")
77         assertThat(future.get(), IsEqual("OK"))
78     }
79 
80     @Test
81     fun testCompletedFutureExceptionally() {
82         val toAwait = CompletableFuture<String>()
83         toAwait.completeExceptionally(TestException("O"))
84         val future = GlobalScope.future {
85             try {
86                 toAwait.await()
87             } catch (e: TestException) {
88                 e.message!!
89             } + "K"
90         }
91         assertThat(future.get(), IsEqual("OK"))
92     }
93 
94     @Test
95     // Test fast-path of CompletionStage.await() extension
96     fun testCompletedCompletionStageExceptionally() {
97         val completable = CompletableFuture<String>()
98         val toAwait: CompletionStage<String> = completable
99         completable.completeExceptionally(TestException("O"))
100         val future = GlobalScope.future {
101             try {
102                 toAwait.await()
103             } catch (e: TestException) {
104                 e.message!!
105             } + "K"
106         }
107         assertThat(future.get(), IsEqual("OK"))
108     }
109 
110     @Test
111     // Test slow-path of CompletionStage.await() extension
112     fun testWaitForFutureWithException() = runTest {
113         expect(1)
114         val toAwait = CompletableFuture<String>()
115         val future = future(start = CoroutineStart.UNDISPATCHED) {
116             try {
117                 expect(2)
118                 toAwait.await() // will suspend (slow path)
119             } catch (e: TestException) {
120                 expect(4)
121                 e.message!!
122             } + "K"
123         }
124         expect(3)
125         assertFalse(future.isDone)
126         toAwait.completeExceptionally(TestException("O"))
127         yield() // to future coroutine
128         assertThat(future.get(), IsEqual("OK"))
129         finish(5)
130     }
131 
132     @Test
133     fun testWaitForCompletionStageWithException() {
134         val completable = CompletableFuture<String>()
135         val toAwait: CompletionStage<String> = completable
136         val future = GlobalScope.future {
137             try {
138                 toAwait.await()
139             } catch (e: TestException) {
140                 e.message!!
141             } + "K"
142         }
143         assertFalse(future.isDone)
144         completable.completeExceptionally(TestException("O"))
145         assertThat(future.get(), IsEqual("OK"))
146     }
147 
148     @Test
149     fun testExceptionInsideCoroutine() {
150         val future = GlobalScope.future {
151             if (CompletableFuture.supplyAsync { true }.await()) {
152                 throw IllegalStateException("OK")
153             }
154             "fail"
155         }
156         try {
157             future.get()
158             fail("'get' should've throw an exception")
159         } catch (e: ExecutionException) {
160             assertTrue(e.cause is IllegalStateException)
161             assertThat(e.cause!!.message, IsEqual("OK"))
162         }
163     }
164 
165     @Test
166     fun testCancellableAwaitFuture() = runBlocking {
167         expect(1)
168         val toAwait = CompletableFuture<String>()
169         val job = launch(start = CoroutineStart.UNDISPATCHED) {
170             expect(2)
171             try {
172                 toAwait.await() // suspends
173             } catch (e: CancellationException) {
174                 expect(5) // should throw cancellation exception
175                 throw e
176             }
177         }
178         expect(3)
179         job.cancel() // cancel the job
180         toAwait.complete("fail") // too late, the waiting job was already cancelled
181         expect(4) // job processing of cancellation was scheduled, not executed yet
182         yield() // yield main thread to job
183         finish(6)
184     }
185 
186     @Test
187     fun testContinuationWrapped() {
188         val depth = AtomicInteger()
189         val future = GlobalScope.future(wrapContinuation {
190             depth.andIncrement
191             it()
192             depth.andDecrement
193         }) {
194             assertEquals("Part before first suspension must be wrapped", 1, depth.get())
195             val result =
196                     CompletableFuture.supplyAsync {
197                         while (depth.get() > 0);
198                         assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
199                         "OK"
200                     }.await()
201             assertEquals("Part after first suspension should be wrapped", 1, depth.get())
202             CompletableFuture.supplyAsync {
203                 while (depth.get() > 0);
204                 assertEquals("Part inside suspension point should not be wrapped", 0, depth.get())
205                 "ignored"
206             }.await()
207             result
208         }
209         assertThat(future.get(), IsEqual("OK"))
210     }
211 
212     @Test
213     fun testCompletableFutureStageAsDeferred() = runBlocking {
214         val lock = ReentrantLock().apply { lock() }
215 
216         val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
217             lock.withLock { 42 }
218         }.asDeferred()
219 
220         assertFalse(deferred.isCompleted)
221         lock.unlock()
222 
223         assertEquals(42, deferred.await())
224         assertTrue(deferred.isCompleted)
225     }
226 
227     @Test
228     fun testCompletedFutureAsDeferred() = runBlocking {
229         val deferred: Deferred<Int> = CompletableFuture.completedFuture(42).asDeferred()
230         assertEquals(42, deferred.await())
231     }
232 
233     @Test
234     fun testFailedFutureAsDeferred() = runBlocking {
235         val future = CompletableFuture<Int>().apply {
236             completeExceptionally(TestException("something went wrong"))
237         }
238         val deferred = future.asDeferred()
239 
240         assertTrue(deferred.isCancelled)
241         val completionException = deferred.getCompletionExceptionOrNull()!!
242         assertTrue(completionException is TestException)
243         assertEquals("something went wrong", completionException.message)
244 
245         try {
246             deferred.await()
247             fail("deferred.await() should throw an exception")
248         } catch (e: Throwable) {
249             assertTrue(e is TestException)
250             assertEquals("something went wrong", e.message)
251         }
252     }
253 
254     @Test
255     fun testCompletableFutureWithExceptionAsDeferred() = runBlocking {
256         val lock = ReentrantLock().apply { lock() }
257 
258         val deferred: Deferred<Int> = CompletableFuture.supplyAsync {
259             lock.withLock { throw TestException("something went wrong") }
260         }.asDeferred()
261 
262         assertFalse(deferred.isCompleted)
263         lock.unlock()
264         try {
265             deferred.await()
266             fail("deferred.await() should throw an exception")
267         } catch (e: TestException) {
268             assertTrue(deferred.isCancelled)
269             assertEquals("something went wrong", e.message)
270         }
271     }
272 
273     private val threadLocal = ThreadLocal<String>()
274 
275     @Test
276     fun testApiBridge() = runTest {
277         val result = newSingleThreadContext("ctx").use {
278             val future = CompletableFuture.supplyAsync(Supplier { threadLocal.set("value") }, it.executor)
279             val job = async(it) {
280                 future.await()
281                 threadLocal.get()
282             }
283 
284             job.await()
285         }
286 
287         assertEquals("value", result)
288     }
289 
290     @Test
291     fun testFutureCancellation() = runTest {
292         val future = awaitFutureWithCancel(true)
293         assertTrue(future.isCompletedExceptionally)
294         assertFailsWith<CancellationException> { future.get() }
295         finish(4)
296     }
297 
298     @Test
299     fun testNoFutureCancellation() = runTest {
300         val future = awaitFutureWithCancel(false)
301         assertFalse(future.isCompletedExceptionally)
302         assertEquals(239, future.get())
303         finish(4)
304     }
305 
306     private suspend fun CoroutineScope.awaitFutureWithCancel(cancellable: Boolean): CompletableFuture<Int> {
307         val latch = CountDownLatch(1)
308         val future = CompletableFuture.supplyAsync {
309             latch.await()
310             239
311         }
312 
313         val deferred = async {
314             expect(2)
315             if (cancellable) future.await()
316             else future.asDeferred().await()
317         }
318         expect(1)
319         yield()
320         deferred.cancel()
321         expect(3)
322         latch.countDown()
323         return future
324     }
325 
326     @Test
327     fun testStructuredException() = runTest(
328         expected = { it is TestException } // exception propagates to parent with structured concurrency
329     ) {
330         val result = future<Int>(Dispatchers.Unconfined) {
331             throw TestException("FAIL")
332         }
333         result.checkFutureException<TestException>()
334     }
335 
336     @Test
337     fun testChildException() = runTest(
338         expected = { it is TestException } // exception propagates to parent with structured concurrency
339     ) {
340         val result = future(Dispatchers.Unconfined) {
341             // child crashes
342             launch { throw TestException("FAIL") }
343             42
344         }
345         result.checkFutureException<TestException>()
346     }
347 
348     @Test
349     fun testExceptionAggregation() = runTest(
350         expected = { it is TestException } // exception propagates to parent with structured concurrency
351     ) {
352         val result = future(Dispatchers.Unconfined) {
353             // child crashes
354             launch(start = CoroutineStart.ATOMIC) { throw TestException1("FAIL") }
355             launch(start = CoroutineStart.ATOMIC) { throw TestException2("FAIL") }
356             throw TestException()
357         }
358         result.checkFutureException<TestException>(TestException1::class, TestException2::class)
359         finish(1)
360     }
361 
362     @Test
363     fun testExternalCompletion() = runTest {
364         expect(1)
365         val result = future(Dispatchers.Unconfined) {
366             try {
367                 delay(Long.MAX_VALUE)
368             } finally {
369                 expect(2)
370             }
371         }
372 
373         result.complete(Unit)
374         finish(3)
375     }
376 
377     @Test
378     fun testExceptionOnExternalCompletion() = runTest(
379         expected = { it is TestException } // exception propagates to parent with structured concurrency
380     ) {
381         expect(1)
382         val result = future(Dispatchers.Unconfined) {
383             try {
384                 delay(Long.MAX_VALUE)
385             } finally {
386                 expect(2)
387                 throw TestException()
388             }
389         }
390         result.complete(Unit)
391         finish(3)
392     }
393 
394     @Test
395     fun testUnhandledExceptionOnExternalCompletion() = runTest(
396         unhandled = listOf(
397             { it -> it is TestException } // exception is unhandled because there is no parent
398         )
399     ) {
400         expect(1)
401         // No parent here (NonCancellable), so nowhere to propagate exception
402         val result = future(NonCancellable + Dispatchers.Unconfined) {
403             try {
404                 delay(Long.MAX_VALUE)
405             } finally {
406                 expect(2)
407                 throw TestException() // this exception cannot be handled
408             }
409         }
410         result.complete(Unit)
411         finish(3)
412     }
413 
414     /**
415      * See [https://github.com/Kotlin/kotlinx.coroutines/issues/892]
416      */
417     @Test
418     fun testTimeoutCancellationFailRace() {
419         repeat(10 * stressTestMultiplier) {
420             runBlocking {
421                 withTimeoutOrNull(10) {
422                     while (true) {
423                         var caught = false
424                         try {
425                             CompletableFuture.supplyAsync {
426                                 throw TestException()
427                             }.await()
428                         } catch (ignored: TestException) {
429                             caught = true
430                         }
431                         assertTrue(caught) // should have caught TestException or timed out
432                     }
433                 }
434             }
435         }
436     }
437 
438     /**
439      * Tests that both [CompletionStage.await] and [CompletionStage.asDeferred] consistently unwrap
440      * [CompletionException] both in their slow and fast paths.
441      * See [issue #1479](https://github.com/Kotlin/kotlinx.coroutines/issues/1479).
442      */
443     @Test
444     fun testConsistentExceptionUnwrapping() = runTest {
445         expect(1)
446         // Check the fast path
447         val fFast = CompletableFuture.supplyAsync {
448             expect(2)
449             throw TestException()
450         }
451         fFast.checkFutureException<TestException>() // wait until it completes
452         // Fast path in await and asDeferred.await() shall produce TestException
453         expect(3)
454         val dFast = fFast.asDeferred()
455         assertFailsWith<TestException> { fFast.await() }
456         assertFailsWith<TestException> { dFast.await() }
457         // Same test, but future has not completed yet, check the slow path
458         expect(4)
459         val barrier = CyclicBarrier(2)
460         val fSlow = CompletableFuture.supplyAsync {
461             barrier.await()
462             expect(6)
463             throw TestException()
464         }
465         val dSlow = fSlow.asDeferred()
466         launch(start = CoroutineStart.UNDISPATCHED) {
467             expect(5)
468             // Slow path on await shall produce TestException, too
469             assertFailsWith<TestException> { fSlow.await() } // will suspend here
470             assertFailsWith<TestException> { dSlow.await() }
471             finish(7)
472         }
473         barrier.await()
474         fSlow.checkFutureException<TestException>() // now wait until it completes
475     }
476 
477     private inline fun <reified T: Throwable> CompletableFuture<*>.checkFutureException(vararg suppressed: KClass<out Throwable>) {
478         val e = assertFailsWith<ExecutionException> { get() }
479         val cause = e.cause!!
480         assertTrue(cause is T)
481         for ((index, clazz) in suppressed.withIndex()) {
482             assertTrue(clazz.isInstance(cause.suppressed[index]))
483         }
484     }
485 
486     private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
487         override fun dispatch(context: CoroutineContext, block: Runnable) {
488             wrapper {
489                 block.run()
490             }
491         }
492     }
493 }
494