• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.reactive.*
6 import org.junit.*
7 import java.util.*
8 import kotlin.coroutines.*
9 
10 class FluxCompletionStressTest : TestBase() {
11     private val N_REPEATS = 10_000 * stressTestMultiplier
12 
13     private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = flux(context) {
14         for (x in start until start + count) send(x)
15     }
16 
17     @Test
18     fun testCompletion() {
19         val rnd = Random()
20         repeat(N_REPEATS) {
21             val count = rnd.nextInt(5)
22             runBlocking {
23                 withTimeout(5000) {
24                     var received = 0
25                     range(Dispatchers.Default, 1, count).collect { x ->
26                         received++
27                         if (x != received) error("$x != $received")
28                     }
29                     if (received != count) error("$received != $count")
30                 }
31             }
32         }
33     }
34 }