• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.exceptions.*
10 import kotlinx.coroutines.*
11 import org.junit.Test
12 import kotlin.test.*
13 
/**
 * Tests for the `rxCompletable` coroutine builder and for suspending on a Completable via `await()`.
 *
 * Most tests use the `expect(n)` / `finish(n)` helpers (presumably inherited from [TestBase])
 * to pin the exact order in which the test body, the coroutine body and the Rx callbacks run.
 */
class CompletableTest : TestBase() {
    /**
     * A subscribed completable runs its coroutine body after the test yields,
     * and the completion callback fires right after the body finishes.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
        }
        expect(2)
        completable.subscribe {
            expect(5)
        }
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * An exception thrown from the coroutine body is delivered to the error callback;
     * the completion callback must not be invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        completable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the coroutine is suspended cancels it:
     * the code after the suspension point and both callbacks are never reached.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 completable
        val sub = completable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** `await()` on a completable starts the coroutine and resumes once its body has run. */
    @Test
    fun testAwaitSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
        }
        expect(2)
        completable.await() // shall launch coroutine
        finish(4)
    }

    /** `await()` rethrows the exception thrown by the coroutine body. */
    @Test
    fun testAwaitFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            completable.await() // shall launch coroutine and throw exception
            expectUnreached()
        } catch (e: RuntimeException) {
            finish(4)
            assertEquals("OK", e.message)
        }
    }

    /**
     * When a child coroutine fails with [TestException] and the parent then throws
     * [TestException2] from its `finally` block, `await()` is expected to throw the
     * child's exception with the parent's one attached as the first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val completable = rxCompletable(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            completable.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * An exception thrown after the subscription was disposed cannot be delivered to the
     * observer; the test expects it to reach the installed exception handler wrapped in
     * an [UndeliverableException].
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            // `!!` is intentional: onSubscribe (step 2) assigns the disposable before this body (step 4) runs.
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since completable is disposed
            }
        }
        withExceptionHandler(handler) {
            completable.subscribe(object : CompletableObserver {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }

    /**
     * A [LinkageError] thrown from the subscriber's completion callback is expected to
     * reach the installed exception handler wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }

        withExceptionHandler(handler) {
            rxCompletable(Dispatchers.Unconfined) {
                expect(1)
            }.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown from the coroutine body itself is delivered to the
     * subscriber's error callback as-is; the completion callback must not be invoked.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxCompletable(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
        finish(2)
    }
}
182