• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import org.junit.*
8 import org.junit.Test
9 import java.util.concurrent.*
10 
/**
 * Stress tests for `awaitAll` when the awaited deferreds complete, fail or get cancelled
 * concurrently with the awaiting coroutine.
 *
 * Every test repeats its scenario [iterations] times. Within an iteration a [CyclicBarrier] is used
 * to release the worker coroutines (dispatched on [pool]) and the test coroutine at the same moment,
 * so that `awaitAll` races with the completion of the deferreds.
 */
class AwaitStressTest : TestBase() {

    private val iterations = 50_000 * stressTestMultiplier

    // The rule is attached to the property getter, which is public by Kotlin's default visibility.
    @get:Rule
    val pool = ExecutorRule(4)

    /**
     * Two of the three awaited deferreds throw [TestException] right after the barrier trips;
     * `awaitAll` is expected to fail with [TestException] rather than return.
     */
    @Test
    fun testMultipleExceptions() = runTest {
        // NOTE(review): NonCancellable presumably keeps the failing asyncs from cancelling the test
        // coroutine itself - confirm against the coroutine context documentation.
        val ctx = pool + NonCancellable
        repeat(iterations) {
            // Four parties: three workers plus the test coroutine.
            val barrier = CyclicBarrier(4)
            val d1 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d2 = async(ctx) {
                barrier.await()
                throw TestException()
            }
            val d3 = async(ctx) {
                barrier.await()
                1L
            }
            try {
                barrier.await()
                awaitAll(d1, d2, d3)
                expectUnreached()
            } catch (e: TestException) {
                // Expected behaviour
            }

            barrier.reset()
        }
    }

    /**
     * Two deferreds complete normally right after the barrier trips; once `awaitAll` returns,
     * both of them must report `isCompleted`.
     */
    @Test
    fun testAwaitAll() = runTest {
        // Three parties: two workers plus the test coroutine. Reused (and reset) across iterations.
        val barrier = CyclicBarrier(3)
        repeat(iterations) {
            val d1 = async(pool) {
                barrier.await()
                1L
            }
            val d2 = async(pool) {
                barrier.await()
                2L
            }
            barrier.await()
            awaitAll(d1, d2)
            require(d1.isCompleted && d2.isCompleted) {
                "awaitAll returned before both deferreds completed: " +
                    "d1.isCompleted=${d1.isCompleted}, d2.isCompleted=${d2.isCompleted}"
            }
            barrier.reset()
        }
    }

    /**
     * One deferred cancels the other while the test coroutine is awaiting both of them.
     * The test passes if `awaitAll` surfaced a [CancellationException] in at least one iteration.
     */
    @Test
    fun testConcurrentCancellation() = runTest {
        var cancelledOnce = false
        repeat(iterations) {
            val barrier = CyclicBarrier(3)

            // Suspends long enough that it is still running when d2 cancels it.
            val d1 = async(pool) {
                barrier.await()
                delay(10_000)
                yield()
            }

            val d2 = async(pool) {
                barrier.await()
                d1.cancel()
            }

            barrier.await()
            try {
                awaitAll(d1, d2)
            } catch (e: CancellationException) {
                // Deliberately not rethrown: observing the cancellation is what this test checks.
                cancelledOnce = true
            }
        }

        require(cancelledOnce) { "Cancellation exception wasn't properly caught" }
    }

    /**
     * Calls `awaitAll` on a collection that one of the awaited deferreds shrinks concurrently.
     * The call must not hang and must return either all three results or only the first two.
     */
    @Test
    fun testMutatingCollection() = runTest {
        // Four parties: three workers plus the test coroutine. Reused (and reset) across iterations.
        val barrier = CyclicBarrier(4)

        repeat(iterations) {
            // thread-safe collection that we are going to modify
            val deferreds = CopyOnWriteArrayList<Deferred<Long>>()

            deferreds += async(pool) {
                barrier.await()
                1L
            }

            deferreds += async(pool) {
                barrier.await()
                2L
            }

            // The third deferred removes the element at index 2 (its own slot) while awaitAll may be running.
            deferreds += async(pool) {
                barrier.await()
                deferreds.removeAt(2)
                3L
            }

            // Snapshot taken before the removal so that every started deferred is awaited below.
            val allJobs = ArrayList(deferreds)
            barrier.await()
            val results = deferreds.awaitAll() // shouldn't hang
            check(results == listOf(1L, 2L, 3L) || results == listOf(1L, 2L)) {
                "Unexpected awaitAll results: $results"
            }
            allJobs.awaitAll()
            barrier.reset()
        }
    }
}
126