• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.Test
6 import org.reactivestreams.*
7 import reactor.core.*
8 import reactor.core.publisher.*
9 import kotlin.concurrent.*
10 import kotlin.test.*
11 
12 class MonoAwaitStressTest: TestBase() {
13     private val N_REPEATS = 10_000 * stressTestMultiplier
14 
15     private var completed: Boolean = false
16 
17     private var thread: Thread? = null
18 
19     /**
20      * Tests that [Mono.awaitSingleOrNull] does await [CoreSubscriber.onComplete] and does not return
21      * the value as soon as it has it.
22      */
23     @Test
<lambda>null24     fun testAwaitingRacingWithCompletion() = runTest {
25         val mono = object: Mono<Int>() {
26             override fun subscribe(s: CoreSubscriber<in Int>) {
27                 s.onSubscribe(object : Subscription {
28                     override fun request(n: Long) {
29                         thread = thread {
30                             s.onNext(1)
31                             Thread.yield()
32                             completed = true
33                             s.onComplete()
34                         }
35                     }
36 
37                     override fun cancel() {
38                     }
39                 })
40             }
41         }
42         repeat(N_REPEATS) {
43             thread = null
44             completed = false
45             val value = mono.awaitSingleOrNull()
46             assertTrue(completed, "iteration $it")
47             assertEquals(1, value)
48             thread!!.join()
49         }
50     }
51 }
52