• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

1 package kotlinx.coroutines
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.atomicfu.*
5 import java.util.*
6 import java.util.concurrent.*
7 import kotlin.concurrent.*
8 import kotlin.test.*
9 
/**
 * Stress test that races completion-handler registration and disposal against job cancellation.
 *
 * One "creator" thread and [nThreads] "handler" threads proceed in lock-step rounds separated by
 * [cyclicBarrier]. In every round the creator publishes a fresh job and later cancels it, while each
 * handler thread installs a completion handler on that job and then disposes it. After the round the
 * handler thread verifies that its handler either never ran or ran exactly once.
 */
class JobHandlersUpgradeStressTest : TestBase() {
    private val nSeconds = 3 * stressTestMultiplier
    private val nThreads = 4

    // Parties: the single creator thread plus every handler thread.
    private val cyclicBarrier = CyclicBarrier(1 + nThreads)
    private val threads = mutableListOf<Thread>()

    // Statistics printed at the end of the test.
    private val inters = atomic(0)
    private val removed = atomic(0)
    private val fired = atomic(0)

    // Target of the busy-wait increments used to burn time.
    private val sink = atomic(0)

    // Set by the test thread to ask the creator to publish `null` and thus stop all workers.
    @Volatile
    private var done = false

    // Job of the current round; `null` tells every worker to leave its loop.
    @Volatile
    private var job: Job? = null

    /**
     * Per-registration tracker: 0 = handler has not run, 1 = handler ran, 2 = round is closed.
     */
    internal class State {
        val state = atomic(0)
    }

    /**
     * Tests handlers not being invoked more than once.
     */
    @Test
    fun testStress() {
        println("--- JobHandlersUpgradeStressTest")
        threads += newCreatorThread()
        threads += List(nThreads) { threadId -> newHandlerThread(threadId) }
        for (worker in threads) worker.start()
        // Let the workers race for the configured number of seconds, reporting progress each second.
        repeat(nSeconds) { second ->
            Thread.sleep(1000)
            println("${second + 1}: ${inters.value} iterations")
        }
        done = true
        for (worker in threads) worker.join()
        println("        Completed ${inters.value} iterations")
        println("  Removed handler ${removed.value} times")
        println("    Fired handler ${fired.value} times")
    }

    /**
     * Builds (without starting) the thread that publishes a new job each round and cancels it
     * after a random delay.
     */
    private fun newCreatorThread(): Thread = thread(name = "creator", start = false) {
        val random = Random()
        while (true) {
            // Publish the job for this round, or null once the test asked us to stop.
            job = if (done) null else Job()
            cyclicBarrier.await()
            val current = job ?: break
            spin(random, 3000)
            current.cancel()
            // Wait until every handler thread has finished its register/dispose cycle.
            cyclicBarrier.await()
            inters.incrementAndGet()
        }
    }

    /**
     * Builds (without starting) a thread that, each round, registers a completion handler with
     * random flags on the current job, disposes it, and then checks how often it fired.
     */
    private fun newHandlerThread(threadId: Int): Thread = thread(name = "handler-$threadId", start = false) {
        val random = Random()
        while (true) {
            val onCancelling = random.nextBoolean()
            val invokeImmediately: Boolean = random.nextBoolean()
            cyclicBarrier.await()
            val current = job ?: break
            val tracker = State()
            spin(random, 1000)
            val registration =
                current.invokeOnCompletion(onCancelling = onCancelling, invokeImmediately = invokeImmediately) {
                    // The only legal transition for the handler is 0 -> 1, exactly once.
                    check(tracker.state.compareAndSet(0, 1)) {
                        "Fired more than once or too late: state=${tracker.state.value}"
                    }
                }
            spin(random, 1000)
            registration.dispose()
            cyclicBarrier.await()
            val resultingState = tracker.state.value
            when (resultingState) {
                0 -> removed.incrementAndGet()
                1 -> fired.incrementAndGet()
                else -> error("Cannot happen")
            }
            // Close the round: a handler firing from now on fails its own 0 -> 1 transition.
            check(tracker.state.compareAndSet(resultingState, 2)) {
                "Cannot fire late: resultingState=$resultingState"
            }
        }
    }

    /**
     * Burns a random amount of time: performs between 0 (inclusive) and [bound] (exclusive)
     * increments of [sink].
     */
    private fun spin(random: Random, bound: Int) {
        repeat(random.nextInt(bound)) { sink.incrementAndGet() }
    }
}
98