• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.selects.*
7 import org.junit.Test
8 import kotlin.onSuccess
9 import kotlin.test.*
10 
class ObservableSubscriptionSelectTest : TestBase() {
    /**
     * Opens two subscriptions to the same `rxObservable` source and drains them in lock-step
     * using `select` over `onReceiveCatching`.
     *
     * The source sends the integers `0 until n`. The counters `a` and `b` track the next value
     * expected from `channelA` and `channelB` respectively, so every received element is checked
     * to arrive in order on its own subscription.
     *
     * On each iteration `select` takes one element from whichever channel is ready, then the
     * `when` block receives one element from the *other* channel so both subscriptions advance
     * together. The loop stops as soon as `select` observes an unsuccessful (non-value) result
     * on either channel.
     *
     * Both channels are cancelled in a `finally` block so the subscriptions are released even
     * when one of the in-loop assertions fails.
     */
    @Test
    fun testSelect() = runTest {
        // source with n ints
        val n = 1000 * stressTestMultiplier
        val source = rxObservable { repeat(n) { send(it) } }
        var a = 0
        var b = 0
        // open two subs
        val channelA = source.openSubscription()
        val channelB = source.openSubscription()
        try {
            loop@ while (true) {
                // `done` encodes which channel produced a value: 1 -> A, 2 -> B, 0 -> no value (stop).
                val done: Int = select {
                    channelA.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(a++, it) }
                        if (result.isSuccess) 1 else 0
                    }
                    channelB.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(b++, it) }
                        if (result.isSuccess) 2 else 0
                    }
                }
                when (done) {
                    0 -> break@loop
                    1 -> {
                        // A advanced in select; pull the matching element from B (if any).
                        val r = channelB.receiveCatching().getOrNull()
                        if (r != null) assertEquals(b++, r)
                    }
                    2 -> {
                        // B advanced in select; pull the matching element from A (if any).
                        val r = channelA.receiveCatching().getOrNull()
                        if (r != null) assertEquals(a++, r)
                    }
                }
            }
        } finally {
            // Always release both subscriptions, including when an assertion above throws.
            channelA.cancel()
            channelB.cancel()
        }
        // should receive one of them fully
        assertTrue(
            a == n || b == n,
            "Expected one subscription to receive all $n elements, but a=$a, b=$b"
        )
    }
}
51