• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.exceptions.*
10 import io.reactivex.internal.functions.Functions.*
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.CancellationException
13 import org.junit.*
14 import org.junit.Test
15 import java.util.concurrent.*
16 import kotlin.test.*
17 
18 class MaybeTest : TestBase() {
19     @Before
20     fun setup() {
21         ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
22     }
23 
24     @Test
25     fun testBasicSuccess() = runBlocking {
26         expect(1)
27         val maybe = rxMaybe(currentDispatcher()) {
28             expect(4)
29             "OK"
30         }
31         expect(2)
32         maybe.subscribe { value ->
33             expect(5)
34             assertEquals("OK", value)
35         }
36         expect(3)
37         yield() // to started coroutine
38         finish(6)
39     }
40 
41     @Test
42     fun testBasicEmpty() = runBlocking {
43         expect(1)
44         val maybe = rxMaybe(currentDispatcher()) {
45             expect(4)
46             null
47         }
48         expect(2)
49         maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, {
50             expect(5)
51         })
52         expect(3)
53         yield() // to started coroutine
54         finish(6)
55     }
56 
57     @Test
58     fun testBasicFailure() = runBlocking {
59         expect(1)
60         val maybe = rxMaybe(currentDispatcher()) {
61             expect(4)
62             throw RuntimeException("OK")
63         }
64         expect(2)
65         maybe.subscribe({
66             expectUnreached()
67         }, { error ->
68             expect(5)
69             assertTrue(error is RuntimeException)
70             assertEquals("OK", error.message)
71         })
72         expect(3)
73         yield() // to started coroutine
74         finish(6)
75     }
76 
77 
78     @Test
79     fun testBasicUnsubscribe() = runBlocking {
80         expect(1)
81         val maybe = rxMaybe(currentDispatcher()) {
82             expect(4)
83             yield() // back to main, will get cancelled
84             expectUnreached()
85         }
86         expect(2)
87         // nothing is called on a disposed rx2 maybe
88         val sub = maybe.subscribe({
89             expectUnreached()
90         }, {
91             expectUnreached()
92         })
93         expect(3)
94         yield() // to started coroutine
95         expect(5)
96         sub.dispose() // will cancel coroutine
97         yield()
98         finish(6)
99     }
100 
101     @Test
102     fun testMaybeNoWait() {
103         val maybe = rxMaybe {
104             "OK"
105         }
106 
107         checkMaybeValue(maybe) {
108             assertEquals("OK", it)
109         }
110     }
111 
112     @Test
113     fun testMaybeAwait() = runBlocking {
114         assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
115         assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
116     }
117 
118     @Test
119     fun testMaybeAwaitForNull(): Unit = runBlocking {
120         assertNull(Maybe.empty<String>().awaitSingleOrNull())
121         assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
122     }
123 
124     /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
125      * [Job] is cancelled. */
126     @Test
127     fun testMaybeAwaitCancellation() = runTest {
128         expect(1)
129         val maybe = MaybeSource<Int> { s ->
130             s.onSubscribe(object: Disposable {
131                 override fun dispose() { expect(4) }
132                 override fun isDisposed(): Boolean { expectUnreached(); return false }
133             })
134         }
135         val job = launch(start = CoroutineStart.UNDISPATCHED) {
136             try {
137                 expect(2)
138                 maybe.awaitSingleOrNull()
139             } catch (e: CancellationException) {
140                 expect(5)
141                 throw e
142             }
143         }
144         expect(3)
145         job.cancelAndJoin()
146         finish(6)
147     }
148 
149     @Test
150     fun testMaybeEmitAndAwait() {
151         val maybe = rxMaybe {
152             Maybe.just("O").awaitSingleOrNull() + "K"
153         }
154 
155         checkMaybeValue(maybe) {
156             assertEquals("OK", it)
157         }
158     }
159 
160     @Test
161     fun testMaybeWithDelay() {
162         val maybe = rxMaybe {
163             Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
164         }
165 
166         checkMaybeValue(maybe) {
167             assertEquals("OK", it)
168         }
169     }
170 
171     @Test
172     fun testMaybeException() {
173         val maybe = rxMaybe {
174             Observable.just("O", "K").awaitSingle() + "K"
175         }
176 
177         checkErroneous(maybe) {
178             assert(it is IllegalArgumentException)
179         }
180     }
181 
182     @Test
183     fun testAwaitFirst() {
184         val maybe = rxMaybe {
185             Observable.just("O", "#").awaitFirst() + "K"
186         }
187 
188         checkMaybeValue(maybe) {
189             assertEquals("OK", it)
190         }
191     }
192 
193     @Test
194     fun testAwaitLast() {
195         val maybe = rxMaybe {
196             Observable.just("#", "O").awaitLast() + "K"
197         }
198 
199         checkMaybeValue(maybe) {
200             assertEquals("OK", it)
201         }
202     }
203 
204     @Test
205     fun testExceptionFromObservable() {
206         val maybe = rxMaybe {
207             try {
208                 Observable.error<String>(RuntimeException("O")).awaitFirst()
209             } catch (e: RuntimeException) {
210                 Observable.just(e.message!!).awaitLast() + "K"
211             }
212         }
213 
214         checkMaybeValue(maybe) {
215             assertEquals("OK", it)
216         }
217     }
218 
219     @Test
220     fun testExceptionFromCoroutine() {
221         val maybe = rxMaybe<String> {
222             throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
223         }
224 
225         checkErroneous(maybe) {
226             assert(it is IllegalStateException)
227             assertEquals("OK", it.message)
228         }
229     }
230 
231     @Test
232     fun testCancelledConsumer() = runTest {
233         expect(1)
234         val maybe = rxMaybe(currentDispatcher()) {
235             expect(4)
236             try {
237                 delay(Long.MAX_VALUE)
238             } catch (e: CancellationException) {
239                 expect(6)
240             }
241             42
242         }
243         expect(2)
244         val timeout = withTimeoutOrNull(100) {
245             expect(3)
246             maybe.collect {
247                 expectUnreached()
248             }
249             expectUnreached()
250         }
251         assertNull(timeout)
252         expect(5)
253         yield() // must cancel code inside maybe!!!
254         finish(7)
255     }
256 
257     /** Tests the simple scenario where the Maybe doesn't output a value. */
258     @Test
259     fun testMaybeCollectEmpty() = runTest {
260         expect(1)
261         Maybe.empty<Int>().collect {
262             expectUnreached()
263         }
264         finish(2)
265     }
266 
267     /** Tests the simple scenario where the Maybe doesn't output a value. */
268     @Test
269     fun testMaybeCollectSingle() = runTest {
270         expect(1)
271         Maybe.just("OK").collect {
272             assertEquals("OK", it)
273             expect(2)
274         }
275         finish(3)
276     }
277 
278     /** Tests the behavior of [collect] when the Maybe raises an error. */
279     @Test
280     fun testMaybeCollectThrowingMaybe() = runTest {
281         expect(1)
282         try {
283             Maybe.error<Int>(TestException()).collect {
284                 expectUnreached()
285             }
286         } catch (e: TestException) {
287             expect(2)
288         }
289         finish(3)
290     }
291 
292     /** Tests the behavior of [collect] when the action throws. */
293     @Test
294     fun testMaybeCollectThrowingAction() = runTest {
295         expect(1)
296         try {
297             Maybe.just("OK").collect {
298                 expect(2)
299                 throw TestException()
300             }
301         } catch (e: TestException) {
302             expect(3)
303         }
304         finish(4)
305     }
306 
307     @Test
308     fun testSuppressedException() = runTest {
309         val maybe = rxMaybe(currentDispatcher()) {
310             launch(start = CoroutineStart.ATOMIC) {
311                 throw TestException() // child coroutine fails
312             }
313             try {
314                 delay(Long.MAX_VALUE)
315             } finally {
316                 throw TestException2() // but parent throws another exception while cleaning up
317             }
318         }
319         try {
320             maybe.awaitSingleOrNull()
321             expectUnreached()
322         } catch (e: TestException) {
323             assertTrue(e.suppressed[0] is TestException2)
324         }
325     }
326 
327     @Test
328     fun testUnhandledException() = runTest {
329         expect(1)
330         var disposable: Disposable? = null
331         val handler = { e: Throwable ->
332             assertTrue(e is UndeliverableException && e.cause is TestException)
333             expect(5)
334         }
335         val maybe = rxMaybe(currentDispatcher()) {
336             expect(4)
337             disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
338             try {
339                 delay(Long.MAX_VALUE)
340             } finally {
341                 throw TestException() // would not be able to handle it since mono is disposed
342             }
343         }
344         withExceptionHandler(handler) {
345             maybe.subscribe(object : MaybeObserver<Unit> {
346                 override fun onSubscribe(d: Disposable) {
347                     expect(2)
348                     disposable = d
349                 }
350 
351                 override fun onComplete() {
352                     expectUnreached()
353                 }
354 
355                 override fun onSuccess(t: Unit) {
356                     expectUnreached()
357                 }
358 
359                 override fun onError(t: Throwable) {
360                     expectUnreached()
361                 }
362             })
363             expect(3)
364             yield() // run coroutine
365             finish(6)
366         }
367     }
368 
369     @Test
370     fun testFatalExceptionInSubscribe() = runTest {
371         val handler = { e: Throwable ->
372             assertTrue(e is UndeliverableException && e.cause is LinkageError)
373             expect(2)
374         }
375 
376         withExceptionHandler(handler) {
377             rxMaybe(Dispatchers.Unconfined) {
378                 expect(1)
379                 42
380             }.subscribe { throw LinkageError() }
381             finish(3)
382         }
383     }
384 
385     @Test
386     fun testFatalExceptionInSingle() = runTest {
387         rxMaybe(Dispatchers.Unconfined) {
388             throw LinkageError()
389         }.subscribe({ expectUnreached()  }, { expect(1); assertTrue(it is LinkageError) })
390         finish(2)
391     }
392 }
393