1 package kotlinx.coroutines.reactor 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import org.junit.Test 6 import org.reactivestreams.* 7 import reactor.core.* 8 import reactor.core.publisher.* 9 import kotlin.concurrent.* 10 import kotlin.test.* 11 12 class MonoAwaitStressTest: TestBase() { 13 private val N_REPEATS = 10_000 * stressTestMultiplier 14 15 private var completed: Boolean = false 16 17 private var thread: Thread? = null 18 19 /** 20 * Tests that [Mono.awaitSingleOrNull] does await [CoreSubscriber.onComplete] and does not return 21 * the value as soon as it has it. 22 */ 23 @Test <lambda>null24 fun testAwaitingRacingWithCompletion() = runTest { 25 val mono = object: Mono<Int>() { 26 override fun subscribe(s: CoreSubscriber<in Int>) { 27 s.onSubscribe(object : Subscription { 28 override fun request(n: Long) { 29 thread = thread { 30 s.onNext(1) 31 Thread.yield() 32 completed = true 33 s.onComplete() 34 } 35 } 36 37 override fun cancel() { 38 } 39 }) 40 } 41 } 42 repeat(N_REPEATS) { 43 thread = null 44 completed = false 45 val value = mono.awaitSingleOrNull() 46 assertTrue(completed, "iteration $it") 47 assertEquals(1, value) 48 thread!!.join() 49 } 50 } 51 } 52