/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import io.reactivex.functions.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for the [rxSingle] coroutine builder and for the `await*` extensions used on
 * RxJava 2 [Single] and [Observable] values.
 *
 * The `expect(n)` / `finish(n)` / `expectUnreached()` calls pin the exact order in which the
 * statements of a test are allowed to run (helpers presumably inherited from [TestBase]).
 */
class SingleTest : TestBase() {
    @Before
    fun setup() {
        // Rx scheduler threads are allowed to outlive a test.
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * The coroutine body (4) runs only after the test yields following `subscribe`,
     * and the subscriber then receives the value (5).
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        single.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** An exception thrown by the coroutine body is delivered to the error callback only. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        single.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine; neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 single
        val sub = single.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A body that never suspends produces its value. */
    @Test
    fun testSingleNoWait() {
        val single = rxSingle {
            "OK"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `await()` on an already-completed [Single] returns its value. */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Single.just("O").await() + "K")
    }

    /** `await()` can be used inside the `rxSingle` body. */
    @Test
    fun testSingleEmitAndAwait() {
        val single = rxSingle {
            Single.just("O").await() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle()` on an [Observable] that emits after a 50 ms timer. */
    @Test
    fun testSingleWithDelay() {
        val single = rxSingle {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle()` on a two-element [Observable] is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testSingleException() {
        val single = rxSingle {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(single) {
            // assertTrue rather than stdlib assert(): the latter is skipped when JVM assertions (-ea) are off.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst()` returns the first of several emitted elements. */
    @Test
    fun testAwaitFirst() {
        val single = rxSingle {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast()` returns the last of several emitted elements. */
    @Test
    fun testAwaitLast() {
        val single = rxSingle {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** An error emitted by an awaited [Observable] can be caught inside the coroutine body. */
    @Test
    fun testExceptionFromObservable() {
        val single = rxSingle {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the body after a suspension fails the single with that exception. */
    @Test
    fun testExceptionFromCoroutine() {
        val single = rxSingle<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(single) {
            // assertTrue rather than stdlib assert(): the latter is skipped when JVM assertions (-ea) are off.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * The child's [TestException] is the one observed by `await()`, with the parent's
     * [TestException2] (thrown from `finally`) attached as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val single = rxSingle(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            single.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * A [LinkageError] thrown by the subscriber's consumer is expected to reach the installed
     * handler wrapped in an [UndeliverableException].
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }
        withExceptionHandler(handler) {
            rxSingle(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe(Consumer {
                throw LinkageError()
            })
            finish(3)
        }
    }

    /** A [LinkageError] thrown by the coroutine body is delivered to the subscriber as the error. */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxSingle(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) })

        finish(2)
    }

    /**
     * The coroutine disposes its own subscription and then throws while being cancelled; the
     * exception is expected to reach the installed handler wrapped in an [UndeliverableException]
     * instead of either observer callback.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since the single is disposed
            }
        }
        withExceptionHandler(handler) {
            single.subscribe(object : SingleObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }
}