package kotlinx.coroutines.rx3

import kotlinx.coroutines.testing.*
import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*

/**
 * Exercises the [rxCompletable] builder and [await] on completable sources.
 *
 * Every test pins the exact interleaving of the test body and the completable's coroutine
 * through the numbered `expect(n)` / `finish(n)` checkpoints.
 */
class CompletableTest : TestBase() {
    /** A completable whose block returns normally invokes the completion callback. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
        }
        expect(2)
        source.subscribe {
            expect(5)
        }
        expect(3)
        yield() // hand control over to the completable's coroutine
        finish(6)
    }

    /** A completable whose block throws invokes the error callback with that exception. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        source.subscribe(
            {
                expectUnreached()
            },
            { failure ->
                expect(5)
                assertIs<RuntimeException>(failure)
                assertEquals("OK", failure.message)
            }
        )
        expect(3)
        yield() // hand control over to the completable's coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            yield() // return to the test body, which disposes the subscription
            expectUnreached()
        }
        expect(2)
        // a disposed rx3 completable must not call either callback
        val subscription = source.subscribe(
            {
                expectUnreached()
            },
            {
                expectUnreached()
            }
        )
        expect(3)
        yield() // let the started coroutine run up to its own yield
        expect(5)
        subscription.dispose() // cancels the coroutine
        yield()
        finish(6)
    }

    /** [await] starts the completable's coroutine and returns once it completes. */
    @Test
    fun testAwaitSuccess() = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(3)
        }
        expect(2)
        source.await() // starts the coroutine
        finish(4)
    }

    /** [await] rethrows the exception thrown by the completable's block. */
    @Test
    fun testAwaitFailure() = runBlocking {
        expect(1)
        val source = rxCompletable(currentDispatcher()) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            source.await() // starts the coroutine and rethrows its exception
            expectUnreached()
        } catch (failure: RuntimeException) {
            finish(4)
            assertEquals("OK", failure.message)
        }
    }

    /**
     * Cancelling the [Job] that is suspended in [await] disposes the subscription and makes
     * [await] throw [CancellationException].
     */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        val source = CompletableSource { observer ->
            // Never completes; only reports when the subscription is disposed.
            val probe = object : Disposable {
                override fun dispose() {
                    expect(4)
                }

                override fun isDisposed(): Boolean {
                    expectUnreached()
                    return false
                }
            }
            observer.onSubscribe(probe)
        }
        val awaiter = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                source.await()
            } catch (cancellation: CancellationException) {
                expect(5)
                throw cancellation
            }
        }
        expect(3)
        awaiter.cancelAndJoin()
        finish(6)
    }

    /**
     * The child's [TestException] is what [await] throws; the [TestException2] thrown by the
     * parent during cleanup is attached to it as the first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val source = rxCompletable(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // the child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // the parent throws a different exception while cleaning up
            }
        }
        try {
            source.await()
            expectUnreached()
        } catch (failure: TestException) {
            assertIs<TestException2>(failure.suppressed[0])
        }
    }

    /**
     * An exception thrown after the subscription was disposed reaches the installed exception
     * handler as an [UndeliverableException] instead of the observer.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler: (Throwable) -> Unit = { undelivered ->
            assertTrue(undelivered is UndeliverableException && undelivered.cause is TestException)
            expect(5)
        }
        val source = rxCompletable(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // dispose our own subscription so that the delay below gets cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // cannot be delivered because the completable is already disposed
            }
        }
        withExceptionHandler(handler) {
            source.subscribe(object : CompletableObserver {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run the coroutine
            finish(6)
        }
    }

    /**
     * A [LinkageError] thrown from the completion callback reaches the installed exception
     * handler wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler: (Throwable) -> Unit = { undelivered ->
            assertTrue(undelivered is UndeliverableException && undelivered.cause is LinkageError)
            expect(2)
        }

        withExceptionHandler(handler) {
            rxCompletable(Dispatchers.Unconfined) {
                expect(1)
            }.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /** A [LinkageError] thrown inside the completable's block is passed to the error callback. */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxCompletable(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe(
            { expectUnreached() },
            { fatal ->
                expect(1)
                assertIs<LinkageError>(fatal)
            }
        )
        finish(2)
    }
}