<lambda>null1package kotlinx.coroutines.reactor 2 3 import kotlinx.coroutines.testing.* 4 import kotlinx.coroutines.* 5 import kotlinx.coroutines.reactive.* 6 import org.junit.* 7 import java.util.* 8 import kotlin.coroutines.* 9 10 class FluxCompletionStressTest : TestBase() { 11 private val N_REPEATS = 10_000 * stressTestMultiplier 12 13 private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flux(context) { 14 for (x in start until start + count) send(x) 15 } 16 17 @Test 18 fun testCompletion() { 19 val rnd = Random() 20 repeat(N_REPEATS) { 21 val count = rnd.nextInt(5) 22 runBlocking { 23 withTimeout(5000) { 24 var received = 0 25 range(Dispatchers.Default, 1, count).collect { x -> 26 received++ 27 if (x != received) error("$x != $received") 28 } 29 if (received != count) error("$received != $count") 30 } 31 } 32 } 33 } 34 }