/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for completables created with `rxCompletable` and for `await` on a completable.
 *
 * The numbered `expect(n)` / `finish(n)` calls spell out the order in which the steps of each test are meant to
 * run, and `expectUnreached()` marks code that must never execute (helpers presumably provided by [TestBase]).
 */
class CompletableTest : TestBase() {
    /**
     * A subscribed completable runs its coroutine body once the test yields, and the completion callback runs
     * after the body.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
        }
        expect(2)
        completable.subscribe {
            expect(5)
        }
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * An exception thrown from the coroutine body is delivered to the error callback; the completion callback is
     * never invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        completable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the coroutine is suspended cancels it: the code after its `yield()` is
     * not reached and neither callback is invoked.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx3 completable
        val sub = completable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** `await` on a completable runs the coroutine body and returns normally once it finishes. */
    @Test
    fun testAwaitSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
        }
        expect(2)
        completable.await() // shall launch coroutine
        finish(4)
    }

    /** `await` rethrows the exception thrown by the coroutine body. */
    @Test
    fun testAwaitFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            completable.await() // shall launch coroutine and throw exception
            expectUnreached()
        } catch (e: RuntimeException) {
            finish(4)
            assertEquals("OK", e.message)
        }
    }

    /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is
     * cancelled. */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        // A source that never completes: it only hands the observer a disposable that records its disposal.
        val completable = CompletableSource { s ->
            s.onSubscribe(object: Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        // UNDISPATCHED start runs the child up to its first suspension (the await) before expect(3).
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                completable.await()
            } catch (e: CancellationException) {
                expect(5)
                throw e
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /**
     * When a child coroutine fails with `TestException` and the parent throws `TestException2` from its `finally`
     * block, `await` throws the `TestException` with the `TestException2` as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val completable = rxCompletable(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            completable.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * An exception thrown after the coroutine disposed its own subscription is not passed to the observer; the
     * handler installed via `withExceptionHandler` expects an [UndeliverableException] caused by `TestException`.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        // Assigned in onSubscribe below, before the coroutine body runs and dereferences it.
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since completable is disposed
            }
        }
        withExceptionHandler(handler) {
            completable.subscribe(object : CompletableObserver {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }

    /**
     * A [LinkageError] thrown from the completion callback is expected to reach the installed handler wrapped in
     * an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError); expect(2)
        }

        withExceptionHandler(handler) {
            rxCompletable(Dispatchers.Unconfined) {
                expect(1)
            }.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /**
     * A [LinkageError] thrown from the coroutine body itself is delivered to the subscriber's error callback.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxCompletable(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
        finish(2)
    }
}