/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import io.reactivex.rxjava3.exceptions.*
import io.reactivex.rxjava3.functions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for the [rxSingle] coroutine builder and for the suspending `await*` extensions
 * used on RxJava 3 [Single] and [Observable] sources.
 *
 * Tests that call `expect(n)` / `finish(n)` pin the exact order in which the numbered
 * steps are reached.
 */
class SingleTest : TestBase() {
    /**
     * Registers RxJava scheduler thread-name prefixes with [ignoreLostThreads] before each test.
     * NOTE(review): presumably this keeps RxJava's pooled threads from being reported as leaked — confirm in TestBase.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /** The coroutine body runs only after `subscribe` + `yield`, and the subscriber receives its result. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        single.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** An exception thrown from the coroutine body is delivered to the error callback, not the success one. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        single.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription while the coroutine is suspended: neither callback is invoked afterwards. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx3 single
        val sub = single.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A body that returns immediately yields its value. */
    @Test
    fun testSingleNoWait() {
        val single = rxSingle {
            "OK"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** [await] on an already-completed [Single] returns its value. */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Single.just("O").await() + "K")
    }

    /**
     * Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their
     * [Job] is cancelled.
     */
    @Test
    fun testSingleAwaitCancellation() = runTest {
        expect(1)
        // A source that never emits; it only hands out a Disposable so disposal can be observed.
        val single = SingleSource<Int> { s ->
            s.onSubscribe(object : Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                single.await()
            } catch (e: CancellationException) {
                expect(5)
                throw e
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /** [await] can be used inside an [rxSingle] body. */
    @Test
    fun testSingleEmitAndAwait() {
        val single = rxSingle {
            Single.just("O").await() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a source that emits after a 50 ms timer still produces the value. */
    @Test
    fun testSingleWithDelay() {
        val single = rxSingle {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a two-element source is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testSingleException() {
        val single = rxSingle {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(single) {
            // assertTrue rather than assert(): Kotlin's assert() is only evaluated when JVM assertions are enabled.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` returns the first of several emitted elements. */
    @Test
    fun testAwaitFirst() {
        val single = rxSingle {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last of several emitted elements. */
    @Test
    fun testAwaitLast() {
        val single = rxSingle {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** An error emitted by an awaited [Observable] can be caught inside the coroutine body. */
    @Test
    fun testExceptionFromObservable() {
        val single = rxSingle {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkSingleValue(single) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the coroutine body surfaces as the single's error, message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val single = rxSingle<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(single) {
            // assertTrue rather than assert(): Kotlin's assert() is only evaluated when JVM assertions are enabled.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * A child coroutine fails with [TestException] while the parent throws [TestException2] from its
     * `finally` block; [await] is expected to throw the former with the latter as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val single = rxSingle(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            single.await()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * A [LinkageError] thrown by the subscriber's success consumer is expected to reach the handler
     * passed to `withExceptionHandler` as an [UndeliverableException] whose cause is that error.
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }
        withExceptionHandler(handler) {
            rxSingle(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe(Consumer {
                throw LinkageError()
            })
            finish(3)
        }
    }

    /** A [LinkageError] thrown by the coroutine body is passed to the subscriber's error argument. */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxSingle(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) }

        finish(2)
    }

    /**
     * The coroutine disposes its own subscription and then throws [TestException] from `finally`.
     * Neither observer callback may be invoked; the handler passed to `withExceptionHandler` is expected
     * to receive an [UndeliverableException] whose cause is the [TestException].
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val single = rxSingle(currentDispatcher()) {
            expect(4)
            // Non-null here: onSubscribe (step 2) stores the disposable before this body runs (step 4).
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since single is disposed
            }
        }
        withExceptionHandler(handler) {
            single.subscribe(object : SingleObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }
}