• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactive
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.selects.*
7 import org.junit.Test
8 import org.junit.runner.*
9 import org.junit.runners.*
10 import kotlin.test.*
11 
/**
 * Consumes two subscriptions to the same `publish` source through `select` and checks that
 * each of them delivers the integers `0, 1, 2, ...` in order.
 *
 * The test is parameterized by [request], which is forwarded to `toChannel` when each
 * subscription is opened.
 */
@RunWith(Parameterized::class)
class PublisherSubscriptionSelectTest(private val request: Int) : TestBase() {
    /**
     * Alternates between the two subscriptions: `select` picks whichever one has a result,
     * then one element is taken from the other one, until either side reports the end of
     * the stream. At least one subscription must have produced every element by then.
     */
    @Test
    fun testSelect() = runTest {
        // Publisher emitting the integers 0 until total.
        val total = 1000 * stressTestMultiplier
        val publisher = publish { repeat(total) { send(it) } }
        // Two independent subscriptions to the same publisher.
        val subA = publisher.toChannel(request)
        val subB = publisher.toChannel(request)
        // Next value each subscription is expected to deliver.
        var expectedA = 0
        var expectedB = 0
        do {
            // 1 = element came from A, 2 = element came from B, 0 = the selected channel had no element.
            val winner = select<Int> {
                subA.onReceiveCatching { outcome ->
                    if (outcome.isSuccess) {
                        assertEquals(expectedA++, outcome.getOrThrow())
                        1
                    } else {
                        0
                    }
                }
                subB.onReceiveCatching { outcome ->
                    if (outcome.isSuccess) {
                        assertEquals(expectedB++, outcome.getOrThrow())
                        2
                    } else {
                        0
                    }
                }
            }
            // After one side wins the select, take a single element from the other side as well.
            when (winner) {
                1 -> subB.receiveCatching().getOrNull()?.let { assertEquals(expectedB++, it) }
                2 -> subA.receiveCatching().getOrNull()?.let { assertEquals(expectedA++, it) }
            }
        } while (winner != 0)

        subA.cancel()
        subB.cancel()
        // At least one of the subscriptions must have been received in full.
        assertTrue(expectedA == total || expectedB == total)
    }

    companion object {
        /** Values supplied as the `request` constructor argument, one test run per value. */
        @Parameterized.Parameters(name = "request = {0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = listOf(0, 1, 10).map { value -> arrayOf<Any>(value) }
    }
}
60