package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.atomicfu.*
import org.junit.*
import java.util.concurrent.*
import kotlin.test.*
import kotlin.test.Test

/**
 * Stress test that races three actions against each other on every iteration:
 * cancelling a job, resuming its continuation with `"OK"` (passing a handler that
 * calls [close]), and returning from the `suspendCancellableCoroutine` block.
 *
 * After each race exactly one of two outcomes is accepted: the coroutine observed
 * `"OK"` and [close] was never called, or the coroutine did not observe `"OK"` and
 * [close] was called. [close] itself asserts that it runs at most once per iteration.
 */
class CancellableContinuationResumeCloseStressTest : TestBase() {
    // NOTE(review): presumably an executor-backed dispatcher with 2 threads -- confirm against ExecutorRule.
    @get:Rule
    public val dispatcher = ExecutorRule(2)

    // Three parties meet here: the test body, the resuming executor task and the suspending coroutine.
    private val raceBarrier = CyclicBarrier(3)

    // Two parties meet here: the test body and the resuming executor task.
    private val finishBarrier = CyclicBarrier(2)
    private val iterations = 1_000 * stressTestMultiplier

    // Per-iteration state, reset at the start of every race.
    private val resourceClosed = atomic(false)
    private var resumedWithOk = false

    @Test
    @Suppress("BlockingMethodInNonBlockingContext")
    fun testStress() = runTest {
        repeat(iterations) { raceOnce() }
    }

    /**
     * Runs a single race and verifies that the "returned OK" flag and the
     * "resource closed" flag are mutually exclusive and that one of them is set.
     */
    @Suppress("BlockingMethodInNonBlockingContext")
    private suspend fun CoroutineScope.raceOnce() {
        resourceClosed.value = false
        resumedWithOk = false
        val racingJob = testJob()
        raceBarrier.await()
        racingJob.cancel() // (1) cancel job
        racingJob.join()
        // check consistency (only after the resuming task has finished its resume call)
        finishBarrier.await()
        val wasClosed = resourceClosed.value
        when {
            resumedWithOk -> assertFalse(wasClosed, "should not have closed resource -- returned Ok")
            else -> assertTrue(wasClosed, "should have closed resource -- was cancelled")
        }
    }

    /**
     * Launches the coroutine under test on [dispatcher] with [CoroutineStart.ATOMIC].
     * The success flag is raised only if the suspension actually returns `"OK"`.
     */
    private fun CoroutineScope.testJob(): Job {
        return launch(dispatcher, start = CoroutineStart.ATOMIC) {
            assertEquals("OK", resumeClose()) // might be cancelled
            resumedWithOk = true
        }
    }

    /**
     * Suspends and schedules a task on the dispatcher's executor that resumes the
     * continuation with `"OK"`, handing [close] over as the handler passed to `resume`.
     */
    private suspend fun resumeClose(): String = suspendCancellableCoroutine { continuation ->
        dispatcher.executor.execute {
            raceBarrier.await() // (2) resume at the same time
            continuation.resume("OK") {
                close()
            }
            finishBarrier.await()
        }
        raceBarrier.await() // (3) return at the same time
    }

    /**
     * Marks the resource as closed and fails if it had already been marked closed.
     */
    fun close() {
        val alreadyClosed = resourceClosed.getAndSet(true)
        assertFalse(alreadyClosed)
    }
}