• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.selects.*
10 import org.junit.Test
11 import kotlin.onSuccess
12 import kotlin.test.*
13 
/**
 * Exercises `select` over two channel subscriptions opened on the same observable.
 *
 * The source sends the integers `0 until n`. Each subscription is expected to observe
 * them in order starting from zero, which is tracked by the independent counters `a` and `b`.
 */
class ObservableSubscriptionSelectTest : TestBase() {
    /**
     * Alternates between the two subscriptions: `select` picks whichever channel is ready,
     * then the *other* channel is polled explicitly so that both keep making progress.
     * The loop ends as soon as `select` observes a non-successful (closed) result.
     */
    @Test
    fun testSelect() = runTest {
        // source with n ints
        val n = 1000 * stressTestMultiplier
        val source = rxObservable { repeat(n) { send(it) } }
        var a = 0
        var b = 0
        // open two subs
        val channelA = source.toChannel()
        val channelB = source.toChannel()
        try {
            loop@ while (true) {
                // 1 -> A delivered a value, 2 -> B delivered a value, 0 -> the selected channel is closed
                val done: Int = select {
                    channelA.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(a++, it) }
                        if (result.isSuccess) 1 else 0
                    }
                    channelB.onReceiveCatching { result ->
                        result.onSuccess { assertEquals(b++, it) }
                        if (result.isSuccess) 2 else 0
                    }
                }
                when (done) {
                    0 -> break@loop
                    1 -> {
                        // A was just served, so take the next element from B (null means B has no value to give)
                        val r = channelB.receiveCatching().getOrNull()
                        if (r != null) assertEquals(b++, r)
                    }
                    2 -> {
                        // B was just served, so take the next element from A (null means A has no value to give)
                        val r = channelA.receiveCatching().getOrNull()
                        if (r != null) assertEquals(a++, r)
                    }
                }
            }
        } finally {
            // Cancel both subscriptions even when an assertion above fails, so nothing is left open
            channelA.cancel()
            channelB.cancel()
        }
        // should receive one of them fully
        assertTrue(a == n || b == n, "Expected one subscription to receive all $n items, but a=$a, b=$b")
    }
}
54