• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.selects.*
7 import org.junit.Test
8 import java.io.*
9 import kotlin.test.*
10 
/**
 * Tests for [rxObservable] builders that emit more than one value.
 *
 * Every test builds an observable (or a single that consumes one) and verifies the
 * outcome through `checkSingleValue`.
 */
class ObservableMultiTest : TestBase() {
    /**
     * Sends the integers `0 until n` one after another and expects the collected list
     * to contain them in the same order.
     */
    @Test
    fun testNumbers() {
        val n = 100 * stressTestMultiplier
        val observable = rxObservable {
            repeat(n) { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * Sends `n` distinct integers from `n` coroutines launched on [Dispatchers.Default].
     *
     * Arrival order is not asserted: the collected list is checked for its size and,
     * after sorting, for its content.
     */
    @Test
    fun testConcurrentStress() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable {
            // One emitter coroutine per value, all sending into the same observable.
            val jobs = List(n) { value ->
                launch(Dispatchers.Default) { send(value) }
            }
            // Explicitly wait for every emitter before the builder block returns.
            jobs.joinAll()
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals(n, list.size)
            assertEquals((0 until n).toList(), list.sorted())
        }
    }

    /**
     * Same scenario as [testConcurrentStress], but every value is sent through the
     * `onSend` clause of a [select] expression instead of a direct `send` call.
     */
    @Test
    fun testConcurrentStressOnSend() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable<Int> {
            // One emitter coroutine per value, all sending into the same observable.
            val jobs = List(n) { value ->
                launch(Dispatchers.Default) {
                    select<Unit> {
                        onSend(value) {}
                    }
                }
            }
            // Explicitly wait for every emitter before the builder block returns.
            jobs.joinAll()
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals(n, list.size)
            assertEquals((0 until n).toList(), list.sorted())
        }
    }

    /**
     * Collects `Observable.range(0, n)` inside a builder started with
     * [Dispatchers.Unconfined], re-sends every item, and expects the original order.
     */
    @Test
    fun testIteratorResendUnconfined() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable(Dispatchers.Unconfined) {
            Observable.range(0, n).collect { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * Same as [testIteratorResendUnconfined], but the builder is started without an
     * explicit context argument.
     */
    @Test
    fun testIteratorResendPool() {
        val n = 10_000 * stressTestMultiplier
        val observable = rxObservable {
            Observable.range(0, n).collect { send(it) }
        }
        checkSingleValue(observable.toList()) { list ->
            assertEquals((0 until n).toList(), list)
        }
    }

    /**
     * Builds an observable that sends `"O"` and then throws an [IOException] with the
     * message `"K"`. A consumer concatenates the received value with the caught
     * exception's message, and the test expects the combined string `"OK"`.
     */
    @Test
    fun testSendAndCrash() {
        val observable = rxObservable {
            send("O")
            throw IOException("K")
        }
        val single = rxSingle {
            var result = ""
            try {
                observable.collect { result += it }
            } catch (e: IOException) {
                // The failure raised inside the producer is caught here, in the collector.
                result += e.message
            }
            result
        }
        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }
}
113