• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.exceptions.*
10 import io.reactivex.functions.*
11 import kotlinx.coroutines.*
12 import org.junit.*
13 import org.junit.Test
14 import java.util.concurrent.*
15 import kotlin.test.*
16 
17 class SingleTest : TestBase() {
18     @Before
19     fun setup() {
20         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
21     }
22 
23     @Test
24     fun testBasicSuccess() = runBlocking {
25         expect(1)
26         val single = rxSingle(currentDispatcher()) {
27             expect(4)
28             "OK"
29         }
30         expect(2)
31         single.subscribe { value ->
32             expect(5)
33             assertEquals("OK", value)
34         }
35         expect(3)
36         yield() // to started coroutine
37         finish(6)
38     }
39 
40     @Test
41     fun testBasicFailure() = runBlocking {
42         expect(1)
43         val single = rxSingle(currentDispatcher()) {
44             expect(4)
45             throw RuntimeException("OK")
46         }
47         expect(2)
48         single.subscribe({
49             expectUnreached()
50         }, { error ->
51             expect(5)
52             assertTrue(error is RuntimeException)
53             assertEquals("OK", error.message)
54         })
55         expect(3)
56         yield() // to started coroutine
57         finish(6)
58     }
59 
60 
61     @Test
62     fun testBasicUnsubscribe() = runBlocking {
63         expect(1)
64         val single = rxSingle(currentDispatcher()) {
65             expect(4)
66             yield() // back to main, will get cancelled
67             expectUnreached()
68 
69         }
70         expect(2)
71         // nothing is called on a disposed rx2 single
72         val sub = single.subscribe({
73             expectUnreached()
74         }, {
75             expectUnreached()
76         })
77         expect(3)
78         yield() // to started coroutine
79         expect(5)
80         sub.dispose() // will cancel coroutine
81         yield()
82         finish(6)
83     }
84 
85     @Test
86     fun testSingleNoWait() {
87         val single = rxSingle {
88             "OK"
89         }
90 
91         checkSingleValue(single) {
92             assertEquals("OK", it)
93         }
94     }
95 
96     @Test
97     fun testSingleAwait() = runBlocking {
98         assertEquals("OK", Single.just("O").await() + "K")
99     }
100 
101     @Test
102     fun testSingleEmitAndAwait() {
103         val single = rxSingle {
104             Single.just("O").await() + "K"
105         }
106 
107         checkSingleValue(single) {
108             assertEquals("OK", it)
109         }
110     }
111 
112     @Test
113     fun testSingleWithDelay() {
114         val single = rxSingle {
115             Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
116         }
117 
118         checkSingleValue(single) {
119             assertEquals("OK", it)
120         }
121     }
122 
123     @Test
124     fun testSingleException() {
125         val single = rxSingle {
126             Observable.just("O", "K").awaitSingle() + "K"
127         }
128 
129         checkErroneous(single) {
130             assert(it is IllegalArgumentException)
131         }
132     }
133 
134     @Test
135     fun testAwaitFirst() {
136         val single = rxSingle {
137             Observable.just("O", "#").awaitFirst() + "K"
138         }
139 
140         checkSingleValue(single) {
141             assertEquals("OK", it)
142         }
143     }
144 
145     @Test
146     fun testAwaitLast() {
147         val single = rxSingle {
148             Observable.just("#", "O").awaitLast() + "K"
149         }
150 
151         checkSingleValue(single) {
152             assertEquals("OK", it)
153         }
154     }
155 
156     @Test
157     fun testExceptionFromObservable() {
158         val single = rxSingle {
159             try {
160                 Observable.error<String>(RuntimeException("O")).awaitFirst()
161             } catch (e: RuntimeException) {
162                 Observable.just(e.message!!).awaitLast() + "K"
163             }
164         }
165 
166         checkSingleValue(single) {
167             assertEquals("OK", it)
168         }
169     }
170 
171     @Test
172     fun testExceptionFromCoroutine() {
173         val single = rxSingle<String> {
174             throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
175         }
176 
177         checkErroneous(single) {
178             assert(it is IllegalStateException)
179             assertEquals("OK", it.message)
180         }
181     }
182 
183     @Test
184     fun testSuppressedException() = runTest {
185         val single = rxSingle(currentDispatcher()) {
186             launch(start = CoroutineStart.ATOMIC) {
187                 throw TestException() // child coroutine fails
188             }
189             try {
190                 delay(Long.MAX_VALUE)
191             } finally {
192                 throw TestException2() // but parent throws another exception while cleaning up
193             }
194         }
195         try {
196             single.await()
197             expectUnreached()
198         } catch (e: TestException) {
199             assertTrue(e.suppressed[0] is TestException2)
200         }
201     }
202 
203     @Test
204     fun testFatalExceptionInSubscribe() = runTest {
205         val handler = { e: Throwable ->
206             assertTrue(e is UndeliverableException && e.cause is LinkageError)
207             expect(2)
208         }
209         withExceptionHandler(handler) {
210             rxSingle(Dispatchers.Unconfined) {
211                 expect(1)
212                 42
213             }.subscribe(Consumer {
214                 throw LinkageError()
215             })
216             finish(3)
217         }
218     }
219 
220     @Test
221     fun testFatalExceptionInSingle() = runTest {
222         rxSingle(Dispatchers.Unconfined) {
223             throw LinkageError()
224         }.subscribe({ _, e ->  assertTrue(e is LinkageError); expect(1) })
225 
226         finish(2)
227     }
228 
229     @Test
230     fun testUnhandledException() = runTest {
231         expect(1)
232         var disposable: Disposable? = null
233         val handler = { e: Throwable ->
234             assertTrue(e is UndeliverableException && e.cause is TestException)
235             expect(5)
236         }
237         val single = rxSingle(currentDispatcher()) {
238             expect(4)
239             disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
240             try {
241                 delay(Long.MAX_VALUE)
242             } finally {
243                 throw TestException() // would not be able to handle it since mono is disposed
244             }
245         }
246         withExceptionHandler(handler) {
247             single.subscribe(object : SingleObserver<Unit> {
248                 override fun onSubscribe(d: Disposable) {
249                     expect(2)
250                     disposable = d
251                 }
252 
253                 override fun onSuccess(t: Unit) {
254                     expectUnreached()
255                 }
256 
257                 override fun onError(t: Throwable) {
258                     expectUnreached()
259                 }
260             })
261             expect(3)
262             yield() // run coroutine
263             finish(6)
264         }
265     }
266 }
267