• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.testing.*
4 import org.junit.*
5 import org.junit.Test
6 import java.util.concurrent.*
7 
/**
 * Stress tests for [awaitAll] racing against deferreds that complete, fail or get cancelled
 * concurrently on the threads of [pool].
 *
 * Every test repeats its scenario [iterations] times. A [CyclicBarrier] is used to release the
 * worker coroutines and the awaiting test coroutine at the same moment, so that `awaitAll`
 * is invoked while the deferreds are changing state.
 */
class AwaitStressTest : TestBase() {

    // NOTE(review): stressTestMultiplier is declared outside this file; presumably scales the run length.
    private val iterations = 50_000 * stressTestMultiplier

    // JUnit rule providing the executor-backed context the workers run in.
    // NOTE(review): the argument is presumably the number of pool threads — confirm against ExecutorRule.
    @get:Rule
    public val pool = ExecutorRule(4)

    /**
     * Two of three deferreds fail with [TestException] at the same time;
     * `awaitAll` must rethrow a [TestException] instead of returning a result.
     */
    @Test
    fun testMultipleExceptions() = runTest {
        // NonCancellable replaces the Job in the workers' context, so they are not children of
        // this test coroutine and their failure is observed only through awaitAll.
        val ctx = pool + NonCancellable
        repeat(iterations) {
            // Fresh barrier per iteration: three workers plus this coroutine.
            val barrier = CyclicBarrier(4)
            val d1 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d2 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d3 = async(ctx) {
                barrier.await()
                1L
            }
            try {
                barrier.await()
                awaitAll(d1, d2, d3)
                expectUnreached()
            } catch (e: TestException) {
                // Expected behaviour
            }
        }
    }

    /**
     * Two deferreds complete normally while `awaitAll` is being invoked;
     * once it returns, both must be completed.
     */
    @Test
    fun testAwaitAll() = runTest {
        // Shared across iterations: two workers plus this coroutine.
        val barrier = CyclicBarrier(3)
        repeat(iterations) {
            val d1 = async(pool) {
                barrier.await()
                1L
            }
            val d2 = async(pool) {
                barrier.await()
                2L
            }
            barrier.await()
            awaitAll(d1, d2)
            check(d1.isCompleted && d2.isCompleted) { "awaitAll returned before both deferreds completed" }
            barrier.reset()
        }
    }

    /**
     * One deferred cancels the other while `awaitAll` is waiting on both;
     * the resulting [CancellationException] must be observed at least once over all iterations.
     */
    @Test
    fun testConcurrentCancellation() = runTest {
        var cancelledOnce = false
        repeat(iterations) {
            val barrier = CyclicBarrier(3)

            // Suspends in a long delay so that it is still active when d2 cancels it.
            val d1 = async(pool) {
                barrier.await()
                delay(10_000)
                yield()
            }

            val d2 = async(pool) {
                barrier.await()
                d1.cancel()
            }

            barrier.await()
            try {
                awaitAll(d1, d2)
            } catch (e: CancellationException) {
                // Deliberately not rethrown: this exception is the outcome under test.
                cancelledOnce = true
            }
        }

        check(cancelledOnce) { "Cancellation exception wasn't properly caught" }
    }

    /**
     * The collection passed to `awaitAll` is mutated by one of its own deferreds;
     * `awaitAll` must neither hang nor return anything but the first two or all three results.
     */
    @Test
    fun testMutatingCollection() = runTest {
        // Shared across iterations: three workers plus this coroutine.
        val barrier = CyclicBarrier(4)

        repeat(iterations) {
            // thread-safe collection that we are going to modify
            val deferreds = CopyOnWriteArrayList<Deferred<Long>>()

            deferreds += async(pool) {
                barrier.await()
                1L
            }

            deferreds += async(pool) {
                barrier.await()
                2L
            }

            deferreds += async(pool) {
                barrier.await()
                // Removes this worker's own entry (index 2) while awaitAll may be reading the list.
                deferreds.removeAt(2)
                3L
            }

            // Snapshot taken before any worker is released, so it always holds all three deferreds.
            val allJobs = ArrayList(deferreds)
            barrier.await()
            val results = deferreds.awaitAll() // shouldn't hang
            check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L)) {
                "Unexpected awaitAll result: $results"
            }
            // Wait for every worker (including a removed one) before the shared barrier is reset.
            allJobs.awaitAll()
            barrier.reset()
        }
    }
}
123