• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.channels.*
4 import kotlinx.coroutines.testing.*
5 import java.util.concurrent.CyclicBarrier
6 import java.util.concurrent.atomic.*
7 import kotlin.test.*
8 import kotlin.time.Duration.Companion.seconds
9 
/**
 * Stress test that repeatedly races the completion of a [CompletableDeferred] against the
 * installation of a completion handler on it, and then checks what the handler observed.
 *
 * Every test method picks one combination of `onCancelling`, `invokeImmediately` and the way
 * the parent is finished (normally or with a [TestException]).
 */
class JobOnCompletionStressTest : TestBase() {
    private val iterationCount = 10_000 * stressTestMultiplier

    // Two threads: one finishes the parent, the other installs the handler.
    private val pool = newFixedThreadPoolContext(2, "JobOnCompletionStressTest")

    // Observations recorded by the handler under test; reset at the start of every iteration.
    private val observedCause = AtomicReference<Throwable?>(null)
    private val sawCompletedParent = AtomicBoolean(false)
    private val sawCancelledParent = AtomicBoolean(false)

    @AfterTest
    fun tearDown() {
        pool.close()
    }

    @Test
    fun testOnCompletionRacingWithCompletion() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = false,
            invokeImmediately = true,
            parentCompletion = { complete(Unit) },
            validate = { assertHandlerSawNormalCompletion() },
        )
    }

    @Test
    fun testOnCompletionRacingWithCancellation() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = false,
            invokeImmediately = true,
            parentCompletion = { completeExceptionally(TestException()) },
            validate = { assertHandlerSawCancellation(expectCompleted = true) },
        )
    }

    @Test
    fun testOnCancellingRacingWithCompletion() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = true,
            invokeImmediately = true,
            parentCompletion = { complete(Unit) },
            validate = { assertHandlerSawNormalCompletion() },
        )
    }

    @Test
    fun testOnCancellingRacingWithCancellation() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = true,
            invokeImmediately = true,
            parentCompletion = { completeExceptionally(TestException()) },
            validate = { assertHandlerSawCancellation(expectCompleted = false) },
        )
    }

    @Test
    fun testNonImmediateOnCompletionRacingWithCompletion() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = false,
            invokeImmediately = false,
            parentCompletion = { complete(Unit) },
            validate = { assertHandlerSawNormalCompletion() },
        )
    }

    @Test
    fun testNonImmediateOnCompletionRacingWithCancellation() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = false,
            invokeImmediately = false,
            parentCompletion = { completeExceptionally(TestException()) },
            validate = { assertHandlerSawCancellation(expectCompleted = true) },
        )
    }

    @Test
    fun testNonImmediateOnCancellingRacingWithCompletion() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = true,
            invokeImmediately = false,
            parentCompletion = { complete(Unit) },
            validate = { assertHandlerSawNormalCompletion() },
        )
    }

    @Test
    fun testNonImmediateOnCancellingRacingWithCancellation() = runTest {
        testHandlerRacingWithCancellation(
            onCancelling = true,
            invokeImmediately = false,
            parentCompletion = { completeExceptionally(TestException()) },
            validate = { assertHandlerSawCancellation(expectCompleted = false) },
        )
    }

    /**
     * Expectation for a parent finished via `complete(Unit)`: the handler received no exception
     * and saw the parent as completed but not cancelled.
     */
    private fun assertHandlerSawNormalCompletion() {
        assertNull(observedCause.get())
        assertTrue(sawCompletedParent.get())
        assertFalse(sawCancelledParent.get())
    }

    /**
     * Expectation for a parent finished with a [TestException]: the handler received that
     * exception and saw the parent as cancelled.
     *
     * @param expectCompleted when `true`, additionally require that the handler saw the parent
     * as completed; the `onCancelling = true` tests leave this unchecked.
     */
    private fun assertHandlerSawCancellation(expectCompleted: Boolean) {
        assertIs<TestException>(observedCause.get())
        if (expectCompleted) {
            assertTrue(sawCompletedParent.get())
        }
        assertTrue(sawCancelledParent.get())
    }

    /**
     * Runs the race once per iteration.
     *
     * Each iteration creates a fresh parent and starts two coroutines on [pool] that meet at a
     * two-party barrier: one applies [parentCompletion] to the parent, the other installs a
     * handler via `invokeOnCompletion`. The handler records the exception it was given and the
     * parent's `isCompleted` / `isCancelled` flags, then signals that it ran.
     *
     * @param onCancelling forwarded to `invokeOnCompletion`.
     * @param invokeImmediately forwarded to `invokeOnCompletion`.
     * @param parentCompletion the action that finishes the parent.
     * @param validate assertions over the recorded observations, executed once the handler ran.
     */
    private suspend fun testHandlerRacingWithCancellation(
        onCancelling: Boolean,
        invokeImmediately: Boolean,
        parentCompletion: CompletableDeferred<Unit>.() -> Unit,
        validate: () -> Unit,
    ) {
        repeat(iterationCount) {
            // Start every iteration from a clean slate.
            observedCause.set(null)
            sawCompletedParent.set(false)
            sawCancelledParent.set(false)
            val handlerEntered = Channel<Unit>(1)
            val startGate = CyclicBarrier(2)
            val parent = createCompletableDeferredForTesting(it)

            // coroutineScope returns only after both racers are done, so the Deferred below is
            // already completed when it is inspected.
            val handlerInstallJob = coroutineScope {
                launch(pool) {
                    startGate.await()
                    parent.parentCompletion()
                }
                async(pool) {
                    startGate.await()
                    parent.invokeOnCompletion(
                        onCancelling = onCancelling,
                        invokeImmediately = invokeImmediately,
                    ) { cause ->
                        observedCause.set(cause)
                        sawCompletedParent.set(parent.isCompleted)
                        sawCancelledParent.set(parent.isCancelled)
                        handlerEntered.trySend(Unit)
                    }
                }
            }

            // The test treats a NonDisposableHandle result of a non-immediate installation as
            // "the handler was not installed", in which case it must never have been entered.
            val handlerWasSkipped =
                !invokeImmediately && handlerInstallJob.getCompleted() === NonDisposableHandle
            if (handlerWasSkipped) {
                assertTrue(handlerEntered.isEmpty)
            } else {
                // The handler is expected to run; give it up to a second to report in.
                withTimeout(1.seconds) {
                    handlerEntered.receive()
                }
                try {
                    validate()
                } catch (e: Throwable) {
                    // Print diagnostics for the failing iteration before propagating the failure.
                    println("Iteration $it failed")
                    println("invokeOnCompletion returned ${handlerInstallJob.getCompleted()}")
                    throw e
                }
            }
        }
    }
}
173 
/**
 * Produces a new [CompletableDeferred] whose set of already-attached completion handlers and
 * children varies with [iteration]:
 *
 * - when [iteration] is divisible by 2, an empty completion handler is registered;
 * - when [iteration] is divisible by 3, an empty child coroutine is launched with the deferred
 *   as its context.
 *
 * This way, a handler attached later by a test lands not only in an empty or single-element list
 * (see the [JobSupport] implementation for details on what this means), but also in a list that
 * already holds multiple elements.
 */
fun createCompletableDeferredForTesting(iteration: Int): CompletableDeferred<Unit> =
    CompletableDeferred<Unit>().also { deferred ->
        // Pre-populate the job before handing it to the test.
        if (iteration % 2 == 0) {
            deferred.invokeOnCompletion { }
        }
        if (iteration % 3 == 0) {
            GlobalScope.launch(deferred) { }
        }
    }
193