• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.exceptions.*
7 import io.reactivex.rxjava3.internal.functions.Functions.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.CancellationException
10 import org.junit.*
11 import org.junit.Test
12 import java.util.concurrent.*
13 import kotlin.test.*
14 
/**
 * Tests for the `rxMaybe` coroutine builder and for the suspending helpers this file applies to RxJava 3
 * sources (`awaitSingle`, `awaitSingleOrNull`, `awaitFirst`, `awaitLast`, `collect`).
 *
 * Most tests number their steps with `expect(n)` and close with `finish(n)`; the indices spell out the
 * order in which the steps are intended to execute.
 */
class MaybeTest : TestBase() {
    /** Passes the RxJava worker-thread name prefixes to `ignoreLostThreads` before every test. */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /** A value returned from the `rxMaybe` block is delivered to the subscriber's success callback. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        maybe.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Returning `null` from the `rxMaybe` block completes the Maybe without a value (`onComplete` runs). */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            null
        }
        expect(2)
        maybe.subscribe(emptyConsumer(), ON_ERROR_MISSING, {
            expect(5)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** An exception thrown from the `rxMaybe` block reaches the subscriber's error callback unchanged. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        maybe.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertIs<RuntimeException>(error)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the running coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx3 maybe
        val sub = maybe.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** An `rxMaybe` block that never suspends produces its value. */
    @Test
    fun testMaybeNoWait() {
        val maybe = rxMaybe {
            "OK"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Both `awaitSingleOrNull` and `awaitSingle` return the value of a non-empty Maybe. */
    @Test
    fun testMaybeAwait() = runBlocking {
        assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
        assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
    }

    /** On an empty Maybe, `awaitSingleOrNull` returns `null` and `awaitSingle` throws [NoSuchElementException]. */
    @Test
    fun testMaybeAwaitForNull(): Unit = runBlocking {
        assertNull(Maybe.empty<String>().awaitSingleOrNull())
        assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
    }

    /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
     * [Job] is cancelled. */
    @Test
    fun testMaybeAwaitCancellation() = runTest {
        expect(1)
        // A source that hands out a disposable and then never signals, so the awaiting coroutine stays suspended.
        val maybe = MaybeSource<Int> { s ->
            s.onSubscribe(object: Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                maybe.awaitSingleOrNull()
            } catch (e: CancellationException) {
                expect(5)
                throw e
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /** A Maybe can be awaited from inside an `rxMaybe` block and its value used in the result. */
    @Test
    fun testMaybeEmitAndAwait() {
        val maybe = rxMaybe {
            Maybe.just("O").awaitSingleOrNull() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** The `rxMaybe` block may suspend on an Observable that emits after a 50 ms timer. */
    @Test
    fun testMaybeWithDelay() {
        val maybe = rxMaybe {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a two-element Observable fails, and the Maybe reports an [IllegalArgumentException]. */
    @Test
    fun testMaybeException() {
        val maybe = rxMaybe {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(maybe) {
            // assertIs instead of Kotlin's assert(), which is only evaluated when JVM assertions are enabled.
            assertIs<IllegalArgumentException>(it)
        }
    }

    /** `awaitFirst` yields the first element of a multi-element Observable. */
    @Test
    fun testAwaitFirst() {
        val maybe = rxMaybe {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` yields the last element of a multi-element Observable. */
    @Test
    fun testAwaitLast() {
        val maybe = rxMaybe {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** An error signalled by an awaited Observable can be caught inside the block and turned into a value. */
    @Test
    fun testExceptionFromObservable() {
        val maybe = rxMaybe {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the block after a suspension is reported with its type and message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val maybe = rxMaybe<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(maybe) {
            // assertIs instead of Kotlin's assert(), which is only evaluated when JVM assertions are enabled.
            assertIs<IllegalStateException>(it)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When the collecting side times out, the code inside the `rxMaybe` block is cancelled: the block
     * observes a [CancellationException] (step 6) once the test yields.
     */
    @Test
    fun testCancelledConsumer() = runTest {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            try {
                delay(Long.MAX_VALUE)
            } catch (e: CancellationException) {
                // Deliberately not rethrown: the test only records that cancellation reached the block.
                expect(6)
            }
            42
        }
        expect(2)
        val timeout = withTimeoutOrNull(100) {
            expect(3)
            maybe.collect {
                expectUnreached()
            }
            expectUnreached()
        }
        assertNull(timeout)
        expect(5)
        yield() // must cancel code inside maybe!!!
        finish(7)
    }

    /** Tests the simple scenario where the Maybe doesn't output a value. */
    @Test
    fun testMaybeCollectEmpty() = runTest {
        expect(1)
        Maybe.empty<Int>().collect {
            expectUnreached()
        }
        finish(2)
    }

    /** Tests the simple scenario where the Maybe outputs a single value. */
    @Test
    fun testMaybeCollectSingle() = runTest {
        expect(1)
        Maybe.just("OK").collect {
            assertEquals("OK", it)
            expect(2)
        }
        finish(3)
    }

    /** Tests the behavior of [collect] when the Maybe raises an error. */
    @Test
    fun testMaybeCollectThrowingMaybe() = runTest {
        expect(1)
        try {
            Maybe.error<Int>(TestException()).collect {
                expectUnreached()
            }
        } catch (e: TestException) {
            expect(2)
        }
        finish(3)
    }

    /** Tests the behavior of [collect] when the action throws. */
    @Test
    fun testMaybeCollectThrowingAction() = runTest {
        expect(1)
        try {
            Maybe.just("OK").collect {
                expect(2)
                throw TestException()
            }
        } catch (e: TestException) {
            expect(3)
        }
        finish(4)
    }

    /**
     * A child coroutine fails with [TestException] while the parent block throws [TestException2] from its
     * `finally`; the awaiting side must see the [TestException] with the [TestException2] as its first
     * suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val maybe = rxMaybe(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            maybe.awaitSingleOrNull()
            expectUnreached()
        } catch (e: TestException) {
            assertIs<TestException2>(e.suppressed[0])
        }
    }

    /**
     * An exception thrown by the block after its own subscription was disposed cannot be delivered to the
     * observer; the installed handler must receive it wrapped in an [UndeliverableException].
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            // Two separate checks so that a failure names the check that did not hold.
            assertIs<UndeliverableException>(e)
            assertIs<TestException>(e.cause)
            expect(5)
        }
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since maybe is disposed
            }
        }
        withExceptionHandler(handler) {
            maybe.subscribe(object : MaybeObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }

    /**
     * A [LinkageError] thrown from the subscriber's success callback is routed to the installed handler
     * wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            // Two separate checks so that a failure names the check that did not hold.
            assertIs<UndeliverableException>(e)
            assertIs<LinkageError>(e.cause)
            expect(2)
        }

        withExceptionHandler(handler) {
            rxMaybe(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown inside the `rxMaybe` block itself is delivered to the subscriber's error
     * callback. Despite the "Single" in its name, this test exercises `rxMaybe`.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxMaybe(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ expectUnreached()  }, { expect(1); assertIs<LinkageError>(it) })
        finish(2)
    }
}
390