• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import io.reactivex.rxjava3.disposables.*
6 import io.reactivex.rxjava3.exceptions.*
7 import kotlinx.coroutines.*
8 import org.junit.Test
9 import kotlin.test.*
10 
11 class CompletableTest : TestBase() {
12     @Test
13     fun testBasicSuccess() = runBlocking {
14         expect(1)
15         val completable = rxCompletable(currentDispatcher()) {
16             expect(4)
17         }
18         expect(2)
19         completable.subscribe {
20             expect(5)
21         }
22         expect(3)
23         yield() // to completable coroutine
24         finish(6)
25     }
26 
27     @Test
28     fun testBasicFailure() = runBlocking {
29         expect(1)
30         val completable = rxCompletable(currentDispatcher()) {
31             expect(4)
32             throw RuntimeException("OK")
33         }
34         expect(2)
35         completable.subscribe({
36             expectUnreached()
37         }, { error ->
38             expect(5)
39             assertIs<RuntimeException>(error)
40             assertEquals("OK", error.message)
41         })
42         expect(3)
43         yield() // to completable coroutine
44         finish(6)
45     }
46 
47     @Test
48     fun testBasicUnsubscribe() = runBlocking {
49         expect(1)
50         val completable = rxCompletable(currentDispatcher()) {
51             expect(4)
52             yield() // back to main, will get cancelled
53             expectUnreached()
54         }
55         expect(2)
56         // nothing is called on a disposed rx3 completable
57         val sub = completable.subscribe({
58             expectUnreached()
59         }, {
60             expectUnreached()
61         })
62         expect(3)
63         yield() // to started coroutine
64         expect(5)
65         sub.dispose() // will cancel coroutine
66         yield()
67         finish(6)
68     }
69 
70     @Test
71     fun testAwaitSuccess() = runBlocking {
72         expect(1)
73         val completable = rxCompletable(currentDispatcher()) {
74             expect(3)
75         }
76         expect(2)
77         completable.await() // shall launch coroutine
78         finish(4)
79     }
80 
81     @Test
82     fun testAwaitFailure() = runBlocking {
83         expect(1)
84         val completable = rxCompletable(currentDispatcher()) {
85             expect(3)
86             throw RuntimeException("OK")
87         }
88         expect(2)
89         try {
90             completable.await() // shall launch coroutine and throw exception
91             expectUnreached()
92         } catch (e: RuntimeException) {
93             finish(4)
94             assertEquals("OK", e.message)
95         }
96     }
97 
98     /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
99      * cancelled. */
100     @Test
101     fun testAwaitCancellation() = runTest {
102         expect(1)
103         val completable = CompletableSource { s ->
104             s.onSubscribe(object: Disposable {
105                 override fun dispose() { expect(4) }
106                 override fun isDisposed(): Boolean { expectUnreached(); return false }
107             })
108         }
109         val job = launch(start = CoroutineStart.UNDISPATCHED) {
110             try {
111                 expect(2)
112                 completable.await()
113             } catch (e: CancellationException) {
114                 expect(5)
115                 throw e
116             }
117         }
118         expect(3)
119         job.cancelAndJoin()
120         finish(6)
121     }
122 
123     @Test
124     fun testSuppressedException() = runTest {
125         val completable = rxCompletable(currentDispatcher()) {
126             launch(start = CoroutineStart.ATOMIC) {
127                 throw TestException() // child coroutine fails
128             }
129             try {
130                 delay(Long.MAX_VALUE)
131             } finally {
132                 throw TestException2() // but parent throws another exception while cleaning up
133             }
134         }
135         try {
136             completable.await()
137             expectUnreached()
138         } catch (e: TestException) {
139             assertIs<TestException2>(e.suppressed[0])
140         }
141     }
142 
143     @Test
144     fun testUnhandledException() = runTest {
145         expect(1)
146         var disposable: Disposable? = null
147         val handler = { e: Throwable ->
148             assertTrue(e is UndeliverableException && e.cause is TestException)
149             expect(5)
150         }
151         val completable = rxCompletable(currentDispatcher()) {
152             expect(4)
153             disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
154             try {
155                 delay(Long.MAX_VALUE)
156             } finally {
157                 throw TestException() // would not be able to handle it since mono is disposed
158             }
159         }
160         withExceptionHandler(handler) {
161             completable.subscribe(object : CompletableObserver {
162                 override fun onSubscribe(d: Disposable) {
163                     expect(2)
164                     disposable = d
165                 }
166 
167                 override fun onComplete() {
168                     expectUnreached()
169                 }
170 
171                 override fun onError(t: Throwable) {
172                     expectUnreached()
173                 }
174             })
175             expect(3)
176             yield() // run coroutine
177             finish(6)
178         }
179     }
180 
181     @Test
182     fun testFatalExceptionInSubscribe() = runTest {
183         val handler: (Throwable) -> Unit = { e ->
184             assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2)
185         }
186 
187         withExceptionHandler(handler) {
188             rxCompletable(Dispatchers.Unconfined) {
189                 expect(1)
190             }.subscribe { throw LinkageError() }
191             finish(3)
192         }
193     }
194 
195     @Test
196     fun testFatalExceptionInSingle() = runTest {
197         rxCompletable(Dispatchers.Unconfined) {
198             throw LinkageError()
199         }.subscribe({ expectUnreached()  }, { expect(1); assertIs<LinkageError>(it) })
200         finish(2)
201     }
202 }
203