/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import java.util.*
import kotlin.coroutines.*

/**
 * Stress test that repeatedly builds a short flux, collects it, and verifies that
 * every element was observed in order before the collection finished.
 */
class FluxCompletionStressTest : TestBase() {
    // Number of build-and-collect rounds; scaled by stressTestMultiplier (declared outside this file).
    private val N_REPEATS = 10_000 * stressTestMultiplier

    /**
     * Builds a flux in [context] that sends the integers from [start] (inclusive)
     * up to `start + count` (exclusive), in ascending order.
     */
    private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flux(context) {
        val endExclusive = start + count
        var next = start
        while (next < endExclusive) {
            send(next)
            next++
        }
    }

    /**
     * For each round, picks a random element count from `nextInt(5)` (so 0..4), collects
     * `range(Dispatchers.Default, 1, count)` inside `withTimeout(5000)`, and fails if an
     * element arrives out of sequence or if the total number received differs from the count.
     */
    @Test
    fun testCompletion() {
        val random = Random()
        for (round in 1..N_REPEATS) {
            val count = random.nextInt(5)
            runBlocking {
                withTimeout(5000) {
                    var received = 0
                    range(Dispatchers.Default, 1, count).collect { x ->
                        received += 1
                        // Elements start at 1, so the n-th element must equal n.
                        if (x != received) error("$x != $received")
                    }
                    // Reaching this point means collect returned; all elements must have been seen.
                    if (received != count) error("$received != $count")
                }
            }
        }
    }
}