package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*

/**
 * Stress tests for [awaitAll] in which the awaited deferreds run on [pool] while the
 * test body itself takes part in a [CyclicBarrier] rendezvous with them.
 *
 * In every test the barrier party count equals the number of `async` blocks plus one,
 * the extra party being the test body, so all participants are released together right
 * before the `awaitAll` call under test.
 */
class AwaitStressTest : TestBase() {

    /** Number of rounds each test repeats; scaled by `stressTestMultiplier`. */
    private val iterations = 50_000 * stressTestMultiplier

    /**
     * Executor-backed context on which the `async` blocks of every test are started.
     * NOTE(review): the argument `4` is presumably the thread count - confirm against `ExecutorRule`.
     */
    @get:Rule
    public val pool = ExecutorRule(4)

    /**
     * Two deferreds throw [TestException] after the barrier and a third returns a value;
     * `awaitAll` is expected to throw [TestException] on every iteration.
     */
    @Test
    fun testMultipleExceptions() = runTest {
        // NOTE(review): NonCancellable presumably replaces the Job in the context so that the
        // failing asyncs do not cancel the test scope - confirm against the NonCancellable docs.
        val ctx = pool + NonCancellable
        repeat(iterations) {
            // Three async parties plus the test body.
            val barrier = CyclicBarrier(4)
            val d1 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d2 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d3 = async(ctx) {
                barrier.await()
                1L
            }
            try {
                barrier.await()
                awaitAll(d1, d2, d3)
                expectUnreached()
            } catch (e: TestException) {
                // Expected behaviour
            }

            barrier.reset()
        }
    }

    /**
     * Two deferreds complete normally after the barrier; once `awaitAll` returns,
     * both must report `isCompleted`.
     */
    @Test
    fun testAwaitAll() = runTest {
        // One barrier is shared by all iterations and reset at the end of each round.
        val barrier = CyclicBarrier(3)
        repeat(iterations) {
            val d1 = async(pool) {
                barrier.await()
                1L
            }
            val d2 = async(pool) {
                barrier.await()
                2L
            }
            barrier.await()
            awaitAll(d1, d2)
            require(d1.isCompleted && d2.isCompleted)
            barrier.reset()
        }
    }

    /**
     * One deferred suspends in `delay` while a second one cancels it, racing with the
     * `awaitAll` call in the test body. The test requires that `awaitAll` threw
     * [CancellationException] in at least one iteration.
     */
    @Test
    fun testConcurrentCancellation() = runTest {
        var cancelledOnce = false
        repeat(iterations) {
            val barrier = CyclicBarrier(3)

            val d1 = async(pool) {
                barrier.await()
                delay(10_000)
                yield()
            }

            val d2 = async(pool) {
                barrier.await()
                d1.cancel()
            }

            barrier.await()
            try {
                awaitAll(d1, d2)
            } catch (e: CancellationException) {
                // Deliberately not rethrown: observing this exception is what the test checks for.
                cancelledOnce = true
            }
        }

        require(cancelledOnce) { "Cancellation exception wasn't properly caught" }
    }

    /**
     * Calls `awaitAll` on a collection while one of the awaited deferreds removes the
     * element at index 2 from that same collection. The call must return, and its result
     * must be either all three values or only the first two.
     */
    @Test
    fun testMutatingCollection() = runTest {
        val barrier = CyclicBarrier(4)

        repeat(iterations) {
            // thread-safe collection that we are going to modify
            val deferreds = CopyOnWriteArrayList<Deferred<Long>>()

            deferreds += async(pool) {
                barrier.await()
                1L
            }

            deferreds += async(pool) {
                barrier.await()
                2L
            }

            deferreds += async(pool) {
                barrier.await()
                deferreds.removeAt(2)
                3L
            }

            // Snapshot taken before the barrier opens, so it still holds all three deferreds.
            val allJobs = ArrayList(deferreds)
            barrier.await()
            val results = deferreds.awaitAll() // shouldn't hang
            check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L))
            // Await the snapshot as well, so every deferred has finished before the barrier is reset.
            allJobs.awaitAll()
            barrier.reset()
        }
    }
}