<lambda>null1 package kotlinx.coroutines.rx2
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.selects.*
7 import org.junit.Test
8 import java.io.*
9 import kotlin.test.*
10
11 /**
12 * Test emitting multiple values with [rxObservable].
13 */
14 class ObservableMultiTest : TestBase() {
15 @Test
16 fun testNumbers() {
17 val n = 100 * stressTestMultiplier
18 val observable = rxObservable {
19 repeat(n) { send(it) }
20 }
21 checkSingleValue(observable.toList()) { list ->
22 assertEquals((0 until n).toList(), list)
23 }
24 }
25
26
27 @Test
28 fun testConcurrentStress() {
29 val n = 10_000 * stressTestMultiplier
30 val observable = rxObservable {
31 newCoroutineContext(coroutineContext)
32 // concurrent emitters (many coroutines)
33 val jobs = List(n) {
34 // launch
35 launch {
36 val i = it
37 send(i)
38 }
39 }
40 jobs.forEach { it.join() }
41 }
42 checkSingleValue(observable.toList()) { list ->
43 assertEquals(n, list.size)
44 assertEquals((0 until n).toList(), list.sorted())
45 }
46 }
47
48 @Test
49 fun testConcurrentStressOnSend() {
50 val n = 10_000 * stressTestMultiplier
51 val observable = rxObservable<Int> {
52 newCoroutineContext(coroutineContext)
53 // concurrent emitters (many coroutines)
54 val jobs = List(n) {
55 // launch
56 launch(Dispatchers.Default) {
57 val i = it
58 select<Unit> {
59 onSend(i) {}
60 }
61 }
62 }
63 jobs.forEach { it.join() }
64 }
65 checkSingleValue(observable.toList()) { list ->
66 assertEquals(n, list.size)
67 assertEquals((0 until n).toList(), list.sorted())
68 }
69 }
70
71 @Test
72 fun testIteratorResendUnconfined() {
73 val n = 10_000 * stressTestMultiplier
74 val observable = rxObservable(Dispatchers.Unconfined) {
75 Observable.range(0, n).collect { send(it) }
76 }
77 checkSingleValue(observable.toList()) { list ->
78 assertEquals((0 until n).toList(), list)
79 }
80 }
81
82 @Test
83 fun testIteratorResendPool() {
84 val n = 10_000 * stressTestMultiplier
85 val observable = rxObservable {
86 Observable.range(0, n).collect { send(it) }
87 }
88 checkSingleValue(observable.toList()) { list ->
89 assertEquals((0 until n).toList(), list)
90 }
91 }
92
93 @Test
94 fun testSendAndCrash() {
95 val observable = rxObservable {
96 send("O")
97 throw IOException("K")
98 }
99 val single = rxSingle {
100 var result = ""
101 try {
102 observable.collect { result += it }
103 } catch(e: IOException) {
104 result += e.message
105 }
106 result
107 }
108 checkSingleValue(single) {
109 assertEquals("OK", it)
110 }
111 }
112 }
113