• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.CancellationException
8 import org.junit.*
9 import org.junit.Test
10 import java.util.concurrent.*
11 import kotlin.test.*
12 
13 class ObservableSingleTest : TestBase() {
14     @Before
15     fun setup() {
16         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
17     }
18 
19     @Test
20     fun testSingleNoWait() {
21         val observable = rxObservable {
22             send("OK")
23         }
24 
25         checkSingleValue(observable) {
26             assertEquals("OK", it)
27         }
28     }
29 
30     @Test
31     fun testSingleAwait() = runBlocking {
32         assertEquals("OK", Observable.just("O").awaitSingle() + "K")
33     }
34 
35     @Test
36     fun testSingleEmitAndAwait() {
37         val observable = rxObservable {
38             send(Observable.just("O").awaitSingle() + "K")
39         }
40 
41         checkSingleValue(observable) {
42             assertEquals("OK", it)
43         }
44     }
45 
46     @Test
47     fun testSingleWithDelay() {
48         val observable = rxObservable {
49             send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
50         }
51 
52         checkSingleValue(observable) {
53             assertEquals("OK", it)
54         }
55     }
56 
57     @Test
58     fun testSingleException() {
59         val observable = rxObservable {
60             send(Observable.just("O", "K").awaitSingle() + "K")
61         }
62 
63         checkErroneous(observable) {
64             assertIs<IllegalArgumentException>(it)
65         }
66     }
67 
68     @Test
69     fun testAwaitFirst() {
70         val observable = rxObservable {
71             send(Observable.just("O", "#").awaitFirst() + "K")
72         }
73 
74         checkSingleValue(observable) {
75             assertEquals("OK", it)
76         }
77     }
78 
79     @Test
80     fun testAwaitFirstOrDefault() {
81         val observable = rxObservable {
82             send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
83         }
84 
85         checkSingleValue(observable) {
86             assertEquals("OK", it)
87         }
88     }
89 
90     @Test
91     fun testAwaitFirstOrDefaultWithValues() {
92         val observable = rxObservable {
93             send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
94         }
95 
96         checkSingleValue(observable) {
97             assertEquals("OK", it)
98         }
99     }
100 
101     @Test
102     fun testAwaitFirstOrNull() {
103         val observable = rxObservable {
104             send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
105         }
106 
107         checkSingleValue(observable) {
108             assertEquals("OK", it)
109         }
110     }
111 
112     @Test
113     fun testAwaitFirstOrNullWithValues() {
114         val observable = rxObservable {
115             send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
116         }
117 
118         checkSingleValue(observable) {
119             assertEquals("OK", it)
120         }
121     }
122 
123     @Test
124     fun testAwaitFirstOrElse() {
125         val observable = rxObservable {
126             send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
127         }
128 
129         checkSingleValue(observable) {
130             assertEquals("OK", it)
131         }
132     }
133 
134     @Test
135     fun testAwaitFirstOrElseWithValues() {
136         val observable = rxObservable {
137             send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
138         }
139 
140         checkSingleValue(observable) {
141             assertEquals("OK", it)
142         }
143     }
144 
145     @Test
146     fun testAwaitLast() {
147         val observable = rxObservable {
148             send(Observable.just("#", "O").awaitLast() + "K")
149         }
150 
151         checkSingleValue(observable) {
152             assertEquals("OK", it)
153         }
154     }
155 
156     /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
157      * the subscription when their [Job] is cancelled. */
158     @Test
159     fun testAwaitCancellation() = runTest {
160         expect(1)
161         val observable = ObservableSource<Int> { s ->
162             s.onSubscribe(object: Disposable {
163                 override fun dispose() { expect(4) }
164                 override fun isDisposed(): Boolean { expectUnreached(); return false }
165             })
166         }
167         val job = launch(start = CoroutineStart.UNDISPATCHED) {
168             try {
169                 expect(2)
170                 observable.awaitFirst()
171             } catch (e: CancellationException) {
172                 expect(5)
173                 throw e
174             }
175         }
176         expect(3)
177         job.cancelAndJoin()
178         finish(6)
179     }
180 
181 
182     @Test
183     fun testExceptionFromObservable() {
184         val observable = rxObservable {
185             try {
186                 send(Observable.error<String>(RuntimeException("O")).awaitFirst())
187             } catch (e: RuntimeException) {
188                 send(Observable.just(e.message!!).awaitLast() + "K")
189             }
190         }
191 
192         checkSingleValue(observable) {
193             assertEquals("OK", it)
194         }
195     }
196 
197     @Test
198     fun testExceptionFromCoroutine() {
199         val observable = rxObservable<String> {
200             throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
201         }
202 
203         checkErroneous(observable) {
204             assertIs<IllegalStateException>(it)
205             assertEquals("OK", it.message)
206         }
207     }
208 
209     @Test
210     fun testObservableIteration() {
211         val observable = rxObservable {
212             var result = ""
213             Observable.just("O", "K").collect { result += it }
214             send(result)
215         }
216 
217         checkSingleValue(observable) {
218             assertEquals("OK", it)
219         }
220     }
221 
222     @Test
223     fun testObservableIterationFailure() {
224         val observable = rxObservable {
225             try {
226                 Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
227                 send("Fail")
228             } catch (e: RuntimeException) {
229                 send(e.message!!)
230             }
231         }
232 
233         checkSingleValue(observable) {
234             assertEquals("OK", it)
235         }
236     }
237 }
238