package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.atomicfu.*
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.*
import kotlin.test.*

/**
 * Stress test that races completion-handler registration and disposal on a [Job]
 * against cancellation of that same job.
 *
 * One "creator" thread publishes a fresh job per round and cancels it, while [nThreads]
 * "handler" threads each install a handler on the job and immediately dispose it.
 * All participants are kept in lock-step by [cyclicBarrier], which is awaited twice per round.
 */
class JobHandlersUpgradeStressTest : TestBase() {
    // Wall-clock duration of the test in seconds (scaled by stressTestMultiplier, which is declared outside this file).
    private val nSeconds = 3 * stressTestMultiplier

    // Number of handler threads racing against the single creator thread.
    private val nThreads = 4

    // Parties: the creator thread plus every handler thread.
    private val cyclicBarrier = CyclicBarrier(1 + nThreads)
    private val threads = mutableListOf<Thread>()

    // Rounds completed by the creator thread.
    private val inters = atomic(0)

    // Rounds (per handler thread) in which the handler had not fired by the end of the round.
    private val removed = atomic(0)

    // Rounds (per handler thread) in which the handler fired exactly once.
    private val fired = atomic(0)

    // Target of the busy-work increments used to burn a random amount of time.
    private val sink = atomic(0)

    // Set by the test thread once the time budget is over; read by the creator thread.
    @Volatile
    private var done = false

    // Job of the current round; null tells all threads to stop.
    @Volatile
    private var job: Job? = null

    /**
     * Per-round, per-handler state machine:
     * 0 = handler has not fired, 1 = handler fired, 2 = round closed (a later firing is an error).
     */
    internal class State {
        val state = atomic(0)
    }

    /**
     * Checks that a handler is never invoked more than once and never after its round was closed.
     */
    @Test
    fun testStress() {
        println("--- JobHandlersUpgradeStressTest")
        // All threads are created unstarted and launched together below.
        threads += thread(name = "creator", start = false) { runCreator() }
        repeat(nThreads) { threadId ->
            threads += thread(name = "handler-$threadId", start = false) { runHandler() }
        }
        for (worker in threads) worker.start()
        // Let the workers run, reporting progress once per second.
        for (second in 1..nSeconds) {
            Thread.sleep(1000)
            println("$second: ${inters.value} iterations")
        }
        done = true
        for (worker in threads) worker.join()
        println(" Completed ${inters.value} iterations")
        println(" Removed handler ${removed.value} times")
        println(" Fired handler ${fired.value} times")
    }

    /**
     * Body of the "creator" thread: publishes a new job each round (or null once [done] is set),
     * cancels it after a random delay, and counts the finished round in [inters].
     */
    private fun runCreator() {
        val random = Random()
        while (true) {
            // Publish the job for this round before releasing the handler threads.
            val next: Job? = if (done) null else Job()
            job = next
            cyclicBarrier.await()
            // A null job is the stop signal; handler threads observe the same null and stop as well.
            val current = job ?: return
            burnTime(random, 3000)
            current.cancel()
            // End-of-round rendezvous with the handler threads.
            cyclicBarrier.await()
            inters.incrementAndGet()
        }
    }

    /**
     * Body of a "handler" thread: each round installs a handler with random
     * `onCancelling` / `invokeImmediately` flags, disposes it after a random delay,
     * and then verifies how many times it fired.
     */
    private fun runHandler() {
        val random = Random()
        while (true) {
            val onCancelling = random.nextBoolean()
            val invokeImmediately = random.nextBoolean()
            // Wait for the creator thread to publish this round's job.
            cyclicBarrier.await()
            val current = job ?: return
            val tracker = State()
            burnTime(random, 1000)
            val registration = current.invokeOnCompletion(
                onCancelling = onCancelling,
                invokeImmediately = invokeImmediately
            ) {
                // Only the 0 -> 1 transition is legal; anything else is a double or late invocation.
                val firstInvocation = tracker.state.compareAndSet(0, 1)
                if (!firstInvocation) {
                    error("Fired more than once or too late: state=${tracker.state.value}")
                }
            }
            burnTime(random, 1000)
            registration.dispose()
            // End-of-round rendezvous: the creator thread has cancelled the job by now.
            cyclicBarrier.await()
            val resultingState = tracker.state.value
            if (resultingState == 0) {
                removed.incrementAndGet()
            } else if (resultingState == 1) {
                fired.incrementAndGet()
            } else {
                error("Cannot happen")
            }
            // Close the round; if the CAS fails, the handler fired after the state was read above.
            val closed = tracker.state.compareAndSet(resultingState, 2)
            if (!closed) {
                error("Cannot fire late: resultingState=$resultingState")
            }
        }
    }

    /**
     * Burns a random amount of time by incrementing [sink] between 0 (inclusive)
     * and [bound] (exclusive) times.
     */
    private fun burnTime(random: Random, bound: Int) {
        repeat(random.nextInt(bound)) { sink.incrementAndGet() }
    }
}