/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for completables produced by the `rxCompletable` coroutine builder.
 *
 * Most tests pin down the exact interleaving between the test body and the
 * completable's coroutine with the `expect(n)` / `finish(n)` sequence checks,
 * so the order of statements in each test is significant.
 */
class CompletableTest : TestBase() {
    /**
     * A completable whose body finishes normally invokes the completion callback.
     * The numbering shows that the body does not run at creation or at `subscribe`,
     * only once the test yields.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
        }
        expect(2)
        completable.subscribe {
            expect(5)
        }
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * A completable whose body throws delivers that exception to the error callback
     * and never invokes the completion callback.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        completable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the coroutine is suspended cancels it:
     * the code after the coroutine's `yield()` is never reached and neither
     * callback fires.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 completable
        val sub = completable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * `await()` on a successful completable runs the coroutine body and then resumes the caller.
     */
    @Test
    fun testAwaitSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
        }
        expect(2)
        completable.await() // shall launch coroutine
        finish(4)
    }

    /**
     * `await()` on a failing completable rethrows the exception thrown by the coroutine body.
     */
    @Test
    fun testAwaitFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(currentDispatcher()) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            completable.await() // shall launch coroutine and throw exception
            expectUnreached()
        } catch (e: RuntimeException) {
            finish(4)
            assertEquals("OK", e.message)
        }
    }

    /**
     * When a child coroutine fails and the parent then throws a different exception
     * while cleaning up, `await()` is expected to throw the child's exception with
     * the parent's one attached as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val completable = rxCompletable(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            completable.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * An exception thrown by the coroutine after its own subscription has been disposed
     * cannot reach the observer. The test expects it to arrive at [handler] wrapped in an
     * [UndeliverableException].
     *
     * NOTE(review): `withExceptionHandler` is defined outside this file; it presumably
     * routes otherwise-unhandled exceptions to the given handler for the duration of the
     * block - confirm against its definition.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        // Assigned in onSubscribe (step 2) before the coroutine body dereferences it (step 4).
        var disposable: Disposable? = null
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val completable = rxCompletable(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since completable is disposed
            }
        }
        withExceptionHandler(handler) {
            completable.subscribe(object : CompletableObserver {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }

    /**
     * A fatal error ([LinkageError]) thrown from the subscriber's completion callback is
     * expected to reach [handler] wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler: (Throwable) -> Unit = { e ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }

        withExceptionHandler(handler) {
            rxCompletable(Dispatchers.Unconfined) {
                expect(1)
            }.subscribe({ throw LinkageError() })
            finish(3)
        }
    }

    /**
     * A fatal error ([LinkageError]) thrown by the coroutine body itself is delivered to
     * the subscriber's error callback.
     *
     * The method name mentions "Single" although the test exercises `rxCompletable`;
     * the name is kept unchanged so existing test reports and filters remain valid.
     */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxCompletable(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ expectUnreached() }, { error ->
            expect(1)
            assertTrue(error is LinkageError)
        })
        finish(2)
    }
}