• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.selects.*
10 import org.junit.Test
11 import java.io.*
12 import kotlin.test.*
13 
14 /**
15  * Test emitting multiple values with [rxObservable].
16  */
17 class ObservableMultiTest : TestBase() {
18     @Test
19     fun testNumbers() {
20         val n = 100 * stressTestMultiplier
21         val observable = rxObservable {
22             repeat(n) { send(it) }
23         }
24         checkSingleValue(observable.toList()) { list ->
25             assertEquals((0 until n).toList(), list)
26         }
27     }
28 
29 
30     @Test
31     fun testConcurrentStress() {
32         val n = 10_000 * stressTestMultiplier
33         val observable = rxObservable {
34             newCoroutineContext(coroutineContext)
35             // concurrent emitters (many coroutines)
36             val jobs = List(n) {
37                 // launch
38                 launch {
39                     val i = it
40                     send(i)
41                 }
42             }
43             jobs.forEach { it.join() }
44         }
45         checkSingleValue(observable.toList()) { list ->
46             assertEquals(n, list.size)
47             assertEquals((0 until n).toList(), list.sorted())
48         }
49     }
50 
51     @Test
52     fun testConcurrentStressOnSend() {
53         val n = 10_000 * stressTestMultiplier
54         val observable = rxObservable<Int> {
55             newCoroutineContext(coroutineContext)
56             // concurrent emitters (many coroutines)
57             val jobs = List(n) {
58                 // launch
59                 launch(Dispatchers.Default) {
60                     val i = it
61                     select<Unit> {
62                         onSend(i) {}
63                     }
64                 }
65             }
66             jobs.forEach { it.join() }
67         }
68         checkSingleValue(observable.toList()) { list ->
69             assertEquals(n, list.size)
70             assertEquals((0 until n).toList(), list.sorted())
71         }
72     }
73 
74     @Test
75     fun testIteratorResendUnconfined() {
76         val n = 10_000 * stressTestMultiplier
77         val observable = rxObservable(Dispatchers.Unconfined) {
78             Observable.range(0, n).collect { send(it) }
79         }
80         checkSingleValue(observable.toList()) { list ->
81             assertEquals((0 until n).toList(), list)
82         }
83     }
84 
85     @Test
86     fun testIteratorResendPool() {
87         val n = 10_000 * stressTestMultiplier
88         val observable = rxObservable {
89             Observable.range(0, n).collect { send(it) }
90         }
91         checkSingleValue(observable.toList()) { list ->
92             assertEquals((0 until n).toList(), list)
93         }
94     }
95 
96     @Test
97     fun testSendAndCrash() {
98         val observable = rxObservable {
99             send("O")
100             throw IOException("K")
101         }
102         val single = rxSingle {
103             var result = ""
104             try {
105                 observable.collect { result += it }
106             } catch(e: IOException) {
107                 result += e.message
108             }
109             result
110         }
111         checkSingleValue(single) {
112             assertEquals("OK", it)
113         }
114     }
115 }
116