<lambda>null1 package kotlinx.coroutines.rx3
2
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.CancellationException
8 import org.junit.*
9 import org.junit.Test
10 import java.util.concurrent.*
11 import kotlin.test.*
12
/**
 * Exercises the suspending `await*` and `collect` extensions on RxJava 3 observables,
 * mostly from inside an [rxObservable] builder whose outcome is then inspected with
 * `checkSingleValue` or `checkErroneous`.
 */
class ObservableSingleTest : TestBase() {
    /**
     * Registers the RxJava thread-name prefixes with `ignoreLostThreads` before each test.
     * NOTE(review): presumably this keeps the base class from reporting Rx pool threads as leaked — confirm.
     */
    @Before
    fun setup() {
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /** A builder that sends `"OK"` without suspending is expected to yield `"OK"`. */
    @Test
    fun testSingleNoWait() {
        val source = rxObservable {
            send("OK")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitSingle] on a one-element observable is expected to return that element. */
    @Test
    fun testSingleAwait() {
        runBlocking {
            val head = Observable.just("O").awaitSingle()
            assertEquals("OK", head + "K")
        }
    }

    /** The result of [awaitSingle] can be combined and re-emitted from within the builder. */
    @Test
    fun testSingleEmitAndAwait() {
        val source = rxObservable {
            val head = Observable.just("O").awaitSingle()
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitSingle] is expected to work for an element produced by a 50 ms timer. */
    @Test
    fun testSingleWithDelay() {
        val source = rxObservable {
            val delayed = Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }
            send(delayed.awaitSingle() + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * [awaitSingle] on a two-element observable: the builder is expected to end with an
     * [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val source = rxObservable {
            val twoElements = Observable.just("O", "K")
            send(twoElements.awaitSingle() + "K")
        }

        checkErroneous(source) { error ->
            assertIs<IllegalArgumentException>(error)
        }
    }

    /** [awaitFirst] is expected to return the first of several elements. */
    @Test
    fun testAwaitFirst() {
        val source = rxObservable {
            val first = Observable.just("O", "#").awaitFirst()
            send(first + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrDefault] on an empty observable is expected to return the supplied default. */
    @Test
    fun testAwaitFirstOrDefault() {
        val source = rxObservable {
            val first = Observable.empty<String>().awaitFirstOrDefault("O")
            send(first + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrDefault] on a non-empty observable is expected to return the first element, not the default. */
    @Test
    fun testAwaitFirstOrDefaultWithValues() {
        val source = rxObservable {
            val first = Observable.just("O", "#").awaitFirstOrDefault("!")
            send(first + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrNull] on an empty observable is expected to return `null`, so the fallback is sent. */
    @Test
    fun testAwaitFirstOrNull() {
        val source = rxObservable {
            val first = Observable.empty<String>().awaitFirstOrNull()
            send(first ?: "OK")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrNull] on a non-empty observable is expected to return the first element. */
    @Test
    fun testAwaitFirstOrNullWithValues() {
        val source = rxObservable {
            val first = Observable.just("O", "#").awaitFirstOrNull()
            send((first ?: "!") + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrElse] on an empty observable is expected to return the value computed by the fallback lambda. */
    @Test
    fun testAwaitFirstOrElse() {
        val source = rxObservable {
            val first = Observable.empty<String>().awaitFirstOrElse { "O" }
            send(first + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrElse] on a non-empty observable is expected to return the first element, not the fallback. */
    @Test
    fun testAwaitFirstOrElseWithValues() {
        val source = rxObservable {
            val first = Observable.just("O", "#").awaitFirstOrElse { "!" }
            send(first + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitLast] is expected to return the last of several elements. */
    @Test
    fun testAwaitLast() {
        val source = rxObservable {
            val last = Observable.just("#", "O").awaitLast()
            send(last + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
     * the subscription when their [Job] is cancelled.
     *
     * The numbered `expect` calls pin the order: the coroutine starts and suspends in [awaitFirst] (2), control
     * returns to the test (3), cancellation disposes the subscription (4), and only then does the awaiting
     * coroutine observe the [CancellationException] (5). `isDisposed` must never be queried.
     */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        // A source that only hands out a subscription and never emits or terminates.
        val silentSource = ObservableSource<Int> { observer ->
            val subscription = object : Disposable {
                override fun dispose() {
                    expect(4)
                }

                override fun isDisposed(): Boolean {
                    expectUnreached()
                    return false
                }
            }
            observer.onSubscribe(subscription)
        }
        val awaiter = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                silentSource.awaitFirst()
            } catch (cancellation: CancellationException) {
                expect(5)
                throw cancellation
            }
        }
        expect(3)
        awaiter.cancelAndJoin()
        finish(6)
    }

    /**
     * An error signalled by the awaited observable is expected to surface from [awaitFirst] as the same kind of
     * exception, which the builder can catch and recover from.
     */
    @Test
    fun testExceptionFromObservable() {
        val source = rxObservable {
            try {
                val failing = Observable.error<String>(RuntimeException("O"))
                send(failing.awaitFirst())
            } catch (failure: RuntimeException) {
                val recovered = Observable.just(failure.message!!).awaitLast()
                send(recovered + "K")
            }
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** An exception thrown by the builder body is expected to reach `checkErroneous` with its type and message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val source = rxObservable<String> {
            val message = Observable.just("O").awaitSingle() + "K"
            throw IllegalStateException(message)
        }

        checkErroneous(source) { error ->
            assertIs<IllegalStateException>(error)
            assertEquals("OK", error.message)
        }
    }

    /** [collect] is expected to visit every element of the observable in order. */
    @Test
    fun testObservableIteration() {
        val source = rxObservable {
            var joined = ""
            Observable.just("O", "K").collect { item -> joined += item }
            send(joined)
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * [collect] on a failing observable is expected to throw the observable's error without invoking the
     * element callback or continuing past the call.
     */
    @Test
    fun testObservableIterationFailure() {
        val source = rxObservable {
            try {
                val failing = Observable.error<String>(RuntimeException("OK"))
                failing.collect { fail("Should not be here") }
                send("Fail")
            } catch (failure: RuntimeException) {
                send(failure.message!!)
            }
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }
}
238