• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactive
6 
7 import kotlinx.coroutines.*
8 import org.junit.*
9 import org.reactivestreams.*
10 import java.util.concurrent.locks.*
11 
/**
 * Stress test for rule 2.7 of the Reactive Streams specification as applied to the `await*` methods:
 * calls to [Subscription.request] and [Subscription.cancel] must be executed serially.
 *
 * Every iteration launches `awaitFirst()` on [Dispatchers.Default] and cancels the launched job right away,
 * so the subscription's `request` and `cancel` get a chance to race with each other.
 */
class AwaitCancellationStressTest : TestBase() {
    private val iterations = 10_000 * stressTestMultiplier

    @Test
    fun testAwaitCancellationOrder() = runTest {
        repeat(iterations) {
            val job = launch(Dispatchers.Default) {
                testPublisher().awaitFirst()
            }
            job.cancelAndJoin()
        }
    }

    /**
     * Creates a publisher whose [Subscription] detects overlapping `request`/`cancel` calls.
     *
     * Both callbacks try to take the same [ReentrantLock] without blocking and fail with
     * [IllegalStateException] if it is already held by another thread. The lock is reentrant, so a
     * nested call made on the thread that already holds it still passes the check.
     */
    private fun testPublisher() = Publisher<Int> { s ->
        val lock = ReentrantLock()
        s.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                // tryLock() instead of lock(): a concurrent caller must be reported, not waited for.
                check(lock.tryLock()) { "Subscription.request was invoked concurrently with another subscription method" }
                try {
                    s.onNext(42)
                } finally {
                    // Always release, so a failure in onNext cannot leave the lock held and
                    // turn into a misleading "concurrent call" failure later on.
                    lock.unlock()
                }
            }

            override fun cancel() {
                check(lock.tryLock()) { "Subscription.cancel was invoked concurrently with another subscription method" }
                lock.unlock()
            }
        })
    }
}
44