• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import io.reactivex.disposables.*
9 import io.reactivex.exceptions.*
10 import kotlinx.coroutines.*
11 import org.junit.Test
12 import kotlin.test.*
13 
/**
 * Exercises the `rxCompletable` builder and the suspending `await` extension through subscription,
 * failure, disposal and cancellation scenarios.
 *
 * The numbered `expect(n)` / `finish(n)` calls state the order in which each test expects its
 * statements to run (these helpers are not defined in this file; presumably inherited from [TestBase]).
 */
class CompletableTest : TestBase() {
    /**
     * Successful block: the body is expected to run only after the subscriber yields,
     * and the completion callback is expected right after the body.
     */
    @Test
    fun testBasicSuccess(): Unit = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
        }
        expect(2)
        source.subscribe {
            expect(5)
        }
        expect(3)
        yield() // hand control over to the completable coroutine
        finish(6)
    }

    /**
     * Failing block: the thrown [RuntimeException] must reach the error callback with its message intact,
     * and the completion callback must never fire.
     */
    @Test
    fun testBasicFailure(): Unit = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        source.subscribe({
            expectUnreached()
        }, { failure ->
            expect(5)
            assertTrue(failure is RuntimeException)
            assertEquals("OK", failure.message)
        })
        expect(3)
        yield() // hand control over to the completable coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the block is suspended: neither callback may fire and the code
     * after the block's suspension point must not run.
     */
    @Test
    fun testBasicUnsubscribe(): Unit = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            yield() // back to main, where the subscription gets disposed
            expectUnreached()
        }
        expect(2)
        // a disposed rx2 completable invokes neither of its callbacks
        val subscription = source.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the started coroutine run up to its own yield
        expect(5)
        subscription.dispose() // cancels the coroutine
        yield()
        finish(6)
    }

    /** Awaiting a successful completable is expected to start its block and then resume normally. */
    @Test
    fun testAwaitSuccess(): Unit = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(3)
        }
        expect(2)
        source.await() // shall launch the coroutine
        finish(4)
    }

    /** Awaiting a failing completable is expected to rethrow the block's [RuntimeException] to the caller. */
    @Test
    fun testAwaitFailure(): Unit = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            source.await() // shall launch the coroutine and throw its exception
            expectUnreached()
        } catch (failure: RuntimeException) {
            finish(4)
            assertEquals("OK", failure.message)
        }
    }

    /**
     * Cancelling the [Job] that is suspended in [await] must dispose the subscription first and then
     * surface a [CancellationException] inside the awaiting coroutine.
     */
    @Test
    fun testAwaitCancellation(): Unit = runTest {
        expect(1)
        // A source that never completes; it only hands out a disposable that records its disposal.
        val neverCompleting = CompletableSource { observer ->
            val tracker = object : Disposable {
                override fun dispose() {
                    expect(4)
                }

                override fun isDisposed(): Boolean {
                    expectUnreached()
                    return false
                }
            }
            observer.onSubscribe(tracker)
        }
        val awaiter = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                neverCompleting.await()
            } catch (cancellation: CancellationException) {
                expect(5)
                throw cancellation
            }
        }
        expect(3)
        awaiter.cancelAndJoin()
        finish(6)
    }

    /**
     * A child coroutine fails with [TestException] while the parent throws [TestException2] from its
     * `finally` block: the awaiting side must see the former with the latter as its first suppressed exception.
     */
    @Test
    fun testSuppressedException(): Unit = runTest {
        val source = rxCompletable(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // the child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // the parent throws a different exception during cleanup
            }
        }
        try {
            source.await()
            expectUnreached()
        } catch (primary: TestException) {
            val firstSuppressed = primary.suppressed[0]
            assertTrue(firstSuppressed is TestException2)
        }
    }

    /**
     * An exception thrown after the subscription was disposed cannot reach the observer; the test expects it
     * at the installed exception handler as an [UndeliverableException] caused by [TestException].
     */
    @Test
    fun testUnhandledException(): Unit = runTest {
        expect(1)
        var subscription: Disposable? = null
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            subscription!!.dispose() // dispose our own subscription so that the delay below gets cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // cannot be delivered: the completable is already disposed
            }
        }
        withExceptionHandler(handler) {
            source.subscribe(object : CompletableObserver {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    subscription = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run the coroutine
            finish(6)
        }
    }

    /**
     * A [LinkageError] thrown from the completion callback is expected at the installed exception handler,
     * wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe(): Unit = runTest {
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }

        withExceptionHandler(handler) {
            val source = rxCompletable(Dispatchers.Unconfined) {
                expect(1)
            }
            source.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown by the block itself is expected to be passed to the error callback as is.
     * (Despite the name, this test drives a completable.)
     */
    @Test
    fun testFatalExceptionInSingle(): Unit = runTest {
        val failing = rxCompletable(Dispatchers.Unconfined) {
            throw LinkageError()
        }
        failing.subscribe({
            expectUnreached()
        }, { error ->
            expect(1)
            assertTrue(error is LinkageError)
        })
        finish(2)
    }
}
206