• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.*
6 import org.reactivestreams.*
7 import java.util.concurrent.locks.*
8 
9 /**
10  * This test checks implementation of rule 2.7 for await methods - serial execution of subscription methods
11  */
12 class AwaitCancellationStressTest : TestBase() {
13     private val iterations = 10_000 * stressTestMultiplier
14 
15     @Test
16     fun testAwaitCancellationOrder() = runTest {
17         repeat(iterations) {
18             val job = launch(Dispatchers.Default) {
19                 testPublisher().awaitFirst()
20             }
21             job.cancelAndJoin()
22         }
23     }
24 
25     private fun testPublisher() = Publisher<Int> { s ->
26         val lock = ReentrantLock()
27         s.onSubscribe(object : Subscription {
28             override fun request(n: Long) {
29                 check(lock.tryLock())
30                 s.onNext(42)
31                 lock.unlock()
32             }
33 
34             override fun cancel() {
35                 check(lock.tryLock())
36                 lock.unlock()
37             }
38         })
39     }
40 }
41