• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import org.junit.*
6 import java.util.*
7 import kotlin.coroutines.*
8 
9 class PublisherCompletionStressTest : TestBase() {
10     private val N_REPEATS = 10_000 * stressTestMultiplier
11 
12     private fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish(context) {
13         for (x in start until start + count) send(x)
14     }
15 
16     @Test
17     fun testCompletion() {
18         val rnd = Random()
19         repeat(N_REPEATS) {
20             val count = rnd.nextInt(5)
21             runBlocking {
22                 withTimeout(5000) {
23                     var received = 0
24                     range(Dispatchers.Default, 1, count).collect { x ->
25                         received++
26                         if (x != received) error("$x != $received")
27                     }
28                     if (received != count) error("$received != $count")
29                 }
30             }
31         }
32     }
33 }