• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.CancellationException
11 import org.junit.*
12 import org.junit.Test
13 import java.util.concurrent.*
14 import kotlin.test.*
15 
16 class ObservableSingleTest : TestBase() {
17     @Before
18     fun setup() {
19         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
20     }
21 
22     @Test
23     fun testSingleNoWait() {
24         val observable = rxObservable {
25             send("OK")
26         }
27 
28         checkSingleValue(observable) {
29             assertEquals("OK", it)
30         }
31     }
32 
33     @Test
34     fun testSingleAwait() = runBlocking {
35         assertEquals("OK", Observable.just("O").awaitSingle() + "K")
36     }
37 
38     @Test
39     fun testSingleEmitAndAwait() {
40         val observable = rxObservable {
41             send(Observable.just("O").awaitSingle() + "K")
42         }
43 
44         checkSingleValue(observable) {
45             assertEquals("OK", it)
46         }
47     }
48 
49     @Test
50     fun testSingleWithDelay() {
51         val observable = rxObservable {
52             send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
53         }
54 
55         checkSingleValue(observable) {
56             assertEquals("OK", it)
57         }
58     }
59 
60     @Test
61     fun testSingleException() {
62         val observable = rxObservable {
63             send(Observable.just("O", "K").awaitSingle() + "K")
64         }
65 
66         checkErroneous(observable) {
67             assertTrue(it is IllegalArgumentException)
68         }
69     }
70 
71     @Test
72     fun testAwaitFirst() {
73         val observable = rxObservable {
74             send(Observable.just("O", "#").awaitFirst() + "K")
75         }
76 
77         checkSingleValue(observable) {
78             assertEquals("OK", it)
79         }
80     }
81 
82     @Test
83     fun testAwaitFirstOrDefault() {
84         val observable = rxObservable {
85             send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
86         }
87 
88         checkSingleValue(observable) {
89             assertEquals("OK", it)
90         }
91     }
92 
93     @Test
94     fun testAwaitFirstOrDefaultWithValues() {
95         val observable = rxObservable {
96             send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
97         }
98 
99         checkSingleValue(observable) {
100             assertEquals("OK", it)
101         }
102     }
103 
104     @Test
105     fun testAwaitFirstOrNull() {
106         val observable = rxObservable {
107             send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
108         }
109 
110         checkSingleValue(observable) {
111             assertEquals("OK", it)
112         }
113     }
114 
115     @Test
116     fun testAwaitFirstOrNullWithValues() {
117         val observable = rxObservable {
118             send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
119         }
120 
121         checkSingleValue(observable) {
122             assertEquals("OK", it)
123         }
124     }
125 
126     @Test
127     fun testAwaitFirstOrElse() {
128         val observable = rxObservable {
129             send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
130         }
131 
132         checkSingleValue(observable) {
133             assertEquals("OK", it)
134         }
135     }
136 
137     @Test
138     fun testAwaitFirstOrElseWithValues() {
139         val observable = rxObservable {
140             send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
141         }
142 
143         checkSingleValue(observable) {
144             assertEquals("OK", it)
145         }
146     }
147 
148     @Test
149     fun testAwaitLast() {
150         val observable = rxObservable {
151             send(Observable.just("#", "O").awaitLast() + "K")
152         }
153 
154         checkSingleValue(observable) {
155             assertEquals("OK", it)
156         }
157     }
158 
159     /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
160      * the subscription when their [Job] is cancelled. */
161     @Test
162     fun testAwaitCancellation() = runTest {
163         expect(1)
164         val observable = ObservableSource<Int> { s ->
165             s.onSubscribe(object: Disposable {
166                 override fun dispose() { expect(4) }
167                 override fun isDisposed(): Boolean { expectUnreached(); return false }
168             })
169         }
170         val job = launch(start = CoroutineStart.UNDISPATCHED) {
171             try {
172                 expect(2)
173                 observable.awaitFirst()
174             } catch (e: CancellationException) {
175                 expect(5)
176                 throw e
177             }
178         }
179         expect(3)
180         job.cancelAndJoin()
181         finish(6)
182     }
183 
184 
185     @Test
186     fun testExceptionFromObservable() {
187         val observable = rxObservable {
188             try {
189                 send(Observable.error<String>(RuntimeException("O")).awaitFirst())
190             } catch (e: RuntimeException) {
191                 send(Observable.just(e.message!!).awaitLast() + "K")
192             }
193         }
194 
195         checkSingleValue(observable) {
196             assertEquals("OK", it)
197         }
198     }
199 
200     @Test
201     fun testExceptionFromCoroutine() {
202         val observable = rxObservable<String> {
203             throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
204         }
205 
206         checkErroneous(observable) {
207             assertTrue(it is IllegalStateException)
208             assertEquals("OK", it.message)
209         }
210     }
211 
212     @Test
213     fun testObservableIteration() {
214         val observable = rxObservable {
215             var result = ""
216             Observable.just("O", "K").collect { result += it }
217             send(result)
218         }
219 
220         checkSingleValue(observable) {
221             assertEquals("OK", it)
222         }
223     }
224 
225     @Test
226     fun testObservableIterationFailure() {
227         val observable = rxObservable {
228             try {
229                 Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
230                 send("Fail")
231             } catch (e: RuntimeException) {
232                 send(e.message!!)
233             }
234         }
235 
236         checkSingleValue(observable) {
237             assertEquals("OK", it)
238         }
239     }
240 }
241