• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.disposables.*
9 import io.reactivex.rxjava3.exceptions.*
10 import io.reactivex.rxjava3.functions.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.CancellationException
13 import org.junit.*
14 import org.junit.Test
15 import java.util.concurrent.*
16 import kotlin.test.*
17 
/**
 * Tests for the [rxSingle] coroutine builder and for awaiting RxJava 3 sources
 * ([Single], [Observable], [SingleSource]) from coroutines.
 *
 * Many tests sequence their checkpoints with `expect(n)`, `finish(n)` and `expectUnreached()`. These helpers
 * are not defined in this file (presumably inherited from [TestBase]); the numbers are read here as the order
 * in which the checkpoints are expected to be reached.
 */
class SingleTest : TestBase() {
    /**
     * Passes the name prefixes of RxJava's background threads to `ignoreLostThreads` before each test.
     *
     * NOTE(review): presumably this keeps the base class's lost-thread check from flagging them; confirm
     * against `TestBase`.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * A [rxSingle] block that returns normally delivers its value to the subscriber.
     *
     * Checkpoint order: building and subscribing (1, 2, 3) come before the block body (4), which runs once the
     * test yields; the subscriber then receives "OK" (5).
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        single.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * A [rxSingle] block that throws delivers the exception to the subscriber's error callback and never
     * calls the success callback.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        single.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /**
     * Disposing the subscription while the [rxSingle] block is suspended cancels the block: the code after its
     * `yield()` is never reached and neither subscriber callback is invoked.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx3 single
        val sub = single.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the started coroutine run
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A [rxSingle] block that returns "OK" without suspending yields that value via `checkSingleValue`. */
    @Test
    fun testSingleNoWait() {
        val single = rxSingle {
            "OK"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** Awaiting `Single.just("O")` returns "O", so appending "K" gives "OK". */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Single.just("O").await() + "K")
    }

    /**
     * Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
     * [Job] is cancelled.
     *
     * The hand-written [SingleSource] never signals success or error, so the awaiting coroutine stays
     * suspended until the job is cancelled. The checkpoints require `dispose()` (4) to be called before the
     * coroutine observes the [CancellationException] (5), and `isDisposed()` must never be called.
     */
    @Test
    fun testSingleAwaitCancellation() = runTest {
        expect(1)
        val single = SingleSource<Int> { s ->
            s.onSubscribe(object: Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                single.await()
            } catch (e: CancellationException) {
                expect(5)
                throw e // rethrow so the job still completes as cancelled
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /** [await] can be used inside a [rxSingle] block; the awaited "O" plus "K" becomes the single's value. */
    @Test
    fun testSingleEmitAndAwait() {
        val single = rxSingle {
            Single.just("O").await() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** The [rxSingle] block awaits an [Observable] that emits after a 50 ms timer before producing "OK". */
    @Test
    fun testSingleWithDelay() {
        val single = rxSingle {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * Calling `awaitSingle()` on an [Observable] that emits two items makes the resulting single fail with
     * [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val single = rxSingle {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(single) {
            // assertTrue rather than stdlib assert(): the latter only runs when JVM assertions (-ea) are on.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst()` on an [Observable] emitting "O", "#" returns the first item, "O". */
    @Test
    fun testAwaitFirst() {
        val single = rxSingle {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast()` on an [Observable] emitting "#", "O" returns the last item, "O". */
    @Test
    fun testAwaitLast() {
        val single = rxSingle {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error emitted by an awaited [Observable] is thrown inside the [rxSingle] block, where it can be
     * caught; here its message "O" is recovered and used to build the successful result "OK".
     */
    @Test
    fun testExceptionFromObservable() {
        val single = rxSingle {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown by the [rxSingle] block itself (after a successful await) fails the single with
     * that exception, keeping its type and message.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val single = rxSingle<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(single) {
            // assertTrue rather than stdlib assert(): the latter only runs when JVM assertions (-ea) are on.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When a child coroutine fails with `TestException` and the parent block then throws `TestException2`
     * from its `finally`, awaiting the single throws the child's `TestException` with the parent's
     * `TestException2` as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val single = rxSingle(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            single.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * A [LinkageError] thrown by the subscriber's success [Consumer] is expected to reach the installed
     * exception handler as an [UndeliverableException] whose cause is that [LinkageError].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }
        withExceptionHandler(handler) {
            rxSingle(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe(Consumer {
                throw LinkageError()
            })
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown inside the [rxSingle] block is delivered to the subscriber as the error
     * argument of its two-argument callback.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxSingle(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }

        finish(2)
    }

    /**
     * The [rxSingle] block disposes its own subscription and then throws `TestException` from `finally`.
     * Neither observer callback may be called; instead the installed exception handler is expected to receive
     * an [UndeliverableException] whose cause is the `TestException`.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since single is disposed
            }
        }
        withExceptionHandler(handler) {
            single.subscribe(object : SingleObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d // captured so the coroutine body can dispose itself
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }
}
293