package kotlinx.coroutines.reactive

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import org.junit.*
import java.util.*
import kotlin.coroutines.*

/**
 * Stress test that repeatedly builds a short publisher with [publish], collects it, and
 * verifies that the elements arrive in order and that exactly the requested number of
 * elements has been received once collection returns.
 */
class PublisherCompletionStressTest : TestBase() {
    // Number of publish/collect rounds; scaled by the stress-test multiplier.
    private val N_REPEATS = 10_000 * stressTestMultiplier

    /**
     * Builds a publisher in the given [context] that sends the consecutive integers
     * `start`, `start + 1`, ..., `start + count - 1`.
     */
    private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish(context) {
        (start until start + count).forEach { value -> send(value) }
    }

    /**
     * For each round, picks a random element count in `0..4`, collects a publisher of that
     * many elements starting at 1, and fails if an element is out of sequence or if the
     * total received differs from the requested count. Each round runs under a
     * `withTimeout(5000)` guard.
     */
    @Test
    fun testCompletion() {
        // Unseeded generator: the sequence of counts differs from run to run.
        val random = Random()
        repeat(N_REPEATS) {
            val count = random.nextInt(5)
            runBlocking {
                withTimeout(5000) {
                    var received = 0
                    range(Dispatchers.Default, 1, count).collect { x ->
                        received++
                        // Elements start at 1, so the n-th element must equal n.
                        check(x == received) { "$x != $received" }
                    }
                    // Collection has finished: every element must have been delivered.
                    check(received == count) { "$received != $count" }
                }
            }
        }
    }
}