/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.junit.*
import org.reactivestreams.*
import java.util.concurrent.locks.*

/**
 * This test checks the implementation of rule 2.7 for await methods: serial execution of subscription methods.
 *
 * Each iteration launches a coroutine that awaits the first element of [testPublisher] and then
 * immediately cancels and joins it, so that the `request` and `cancel` calls made on the
 * [Subscription] can race with each other.
 */
class AwaitCancellationStressTest : TestBase() {
    /** Number of launch/cancel rounds, scaled by the stress-test multiplier. */
    private val iterations = 10_000 * stressTestMultiplier

    @Test
    fun testAwaitCancellationOrder() = runTest {
        repeat(iterations) {
            val job = launch(Dispatchers.Default) {
                testPublisher().awaitFirst()
            }
            job.cancelAndJoin()
        }
    }

    /**
     * Creates a publisher whose [Subscription] asserts that `request` and `cancel` are never
     * executed by two threads at the same time.
     *
     * Every subscriber gets its own lock. Each subscription method tries to take it without
     * blocking and fails with [IllegalStateException] if another thread currently holds it.
     * A nested call made by the thread that already owns the lock still passes, because
     * [ReentrantLock] is reentrant.
     */
    private fun testPublisher() = Publisher<Int> { s ->
        val lock = ReentrantLock()
        s.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                check(lock.tryLock()) { "request() was invoked concurrently with another Subscription method" }
                try {
                    s.onNext(42)
                } finally {
                    // Release even if onNext throws; otherwise the lock would stay held and a later
                    // call from another thread would fail its check, hiding the original failure.
                    lock.unlock()
                }
            }

            override fun cancel() {
                check(lock.tryLock()) { "cancel() was invoked concurrently with another Subscription method" }
                lock.unlock()
            }
        })
    }
}