• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.exceptions.*
7 import io.reactivex.rxjava3.functions.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.CancellationException
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14 
/**
 * Tests for the RxJava 3 `Single` integration: the `rxSingle` coroutine builder and the
 * `await`/`awaitSingle`/`awaitFirst`/`awaitLast` suspending extensions used below.
 *
 * The numbered `expect(n)` / `finish(n)` calls inherited from [TestBase] are used throughout to pin
 * the order in which each test expects its steps to run; `expectUnreached()` marks code that must
 * never execute.
 */
class SingleTest : TestBase() {
    /**
     * Runs before every test and passes the RxJava thread-name prefixes to `ignoreLostThreads`.
     * NOTE(review): presumably this keeps RxJava's pooled threads from being reported as leaked by
     * [TestBase] — confirm against the `TestBase` implementation.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * A single built on the current dispatcher does not run its body at build time or at subscribe
     * time; the body runs once the test yields, and the subscriber then receives "OK".
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        single.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * Same ordering as [testBasicSuccess], but the body throws: the success callback must not be
     * called and the error callback receives the thrown [RuntimeException] with message "OK".
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        single.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertIs<RuntimeException>(error)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the body is suspended in `yield()` cancels the coroutine:
     * the code after the body's `yield()` is never reached and neither subscriber callback fires.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx3 single
        val sub = single.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A body that returns immediately without suspending produces the value "OK". */
    @Test
    fun testSingleNoWait() {
        val single = rxSingle {
            "OK"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** [await] on an already-completed `Single.just("O")` returns the emitted value. */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Single.just("O").await() + "K")
    }

    /**
     * Tests that calls to [await] throw [CancellationException] and dispose of the subscription
     * when their [Job] is cancelled.
     *
     * The hand-written source never emits, so `await()` stays suspended until the job is cancelled.
     * The expected order is: dispose of the subscription (4), then the exception in the awaiting
     * coroutine (5); `isDisposed` must never be queried.
     */
    @Test
    fun testSingleAwaitCancellation() = runTest {
        expect(1)
        val single = SingleSource<Int> { s ->
            s.onSubscribe(object: Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        // UNDISPATCHED so that the coroutine reaches `await()` before `expect(3)` below.
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                single.await()
            } catch (e: CancellationException) {
                expect(5)
                throw e // cancellation must be rethrown, not swallowed
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /** [await] can be used inside an `rxSingle` body and its result combined into the final value. */
    @Test
    fun testSingleEmitAndAwait() {
        val single = rxSingle {
            Single.just("O").await() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` inside `rxSingle` waits for a value that is emitted after a 50 ms timer. */
    @Test
    fun testSingleWithDelay() {
        val single = rxSingle {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle` on an observable that emits two items fails, and the resulting single reports
     * an [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val single = rxSingle {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(single) {
            // kotlin.test assertion: unlike stdlib `assert`, it does not depend on the JVM `-ea` flag.
            assertIs<IllegalArgumentException>(it)
        }
    }

    /** `awaitFirst` returns the first of several emitted items. */
    @Test
    fun testAwaitFirst() {
        val single = rxSingle {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last of several emitted items. */
    @Test
    fun testAwaitLast() {
        val single = rxSingle {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error emitted by an observable surfaces from `awaitFirst` as a catchable exception inside
     * the coroutine; the body recovers from it and still produces "OK".
     */
    @Test
    fun testExceptionFromObservable() {
        val single = rxSingle {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown by the `rxSingle` body after a suspension is reported as the single's
     * error, keeping its type ([IllegalStateException]) and message ("OK").
     */
    @Test
    fun testExceptionFromCoroutine() {
        val single = rxSingle<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(single) {
            // kotlin.test assertion: unlike stdlib `assert`, it does not depend on the JVM `-ea` flag.
            assertIs<IllegalStateException>(it)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When a child coroutine fails with [TestException] and the parent body then throws
     * [TestException2] from its `finally` block, `await()` is expected to throw the child's
     * [TestException] carrying the [TestException2] as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val single = rxSingle(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            single.await()
            expectUnreached()
        } catch (e: TestException) {
            assertIs<TestException2>(e.suppressed[0])
        }
    }

    /**
     * A [LinkageError] thrown by the subscriber's success consumer is expected to reach the
     * installed exception handler wrapped in an [UndeliverableException], after which the test
     * body continues normally to `finish(3)`.
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            // Two separate checks so that a failure names the type that did not match.
            assertIs<UndeliverableException>(e)
            assertIs<LinkageError>(e.cause)
            expect(2)
        }
        withExceptionHandler(handler) {
            rxSingle(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe(Consumer {
                throw LinkageError()
            })
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown by the `rxSingle` body itself is delivered to the subscriber's
     * error argument rather than escaping the test.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxSingle(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe { _, e -> assertIs<LinkageError>(e); expect(1) }

        finish(2)
    }

    /**
     * The body disposes its own subscription and then throws [TestException] from `finally`.
     * Neither observer callback may be called; the exception is expected to reach the installed
     * handler wrapped in an [UndeliverableException].
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            // Two separate checks so that a failure names the type that did not match.
            assertIs<UndeliverableException>(e)
            assertIs<TestException>(e.cause)
            expect(5)
        }
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            // `onSubscribe` (step 2) has already stored the disposable before the body runs (step 4).
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since single is disposed
            }
        }
        withExceptionHandler(handler) {
            single.subscribe(object : SingleObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }
}
290