/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.exceptions.*
import io.reactivex.internal.functions.Functions.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for the [rxMaybe] builder and for the suspending `await*` / [collect] extensions on [Maybe].
 *
 * Most tests use the `expect(n)` / `finish(n)` helpers of [TestBase] to pin the exact order in which
 * the numbered checkpoints are reached.
 */
class MaybeTest : TestBase() {
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /** Checks the checkpoint order when the [rxMaybe] block returns a value that is delivered to the subscriber. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        maybe.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Checks that a `null` result of the [rxMaybe] block reaches the completion callback of the subscriber. */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            null
        }
        expect(2)
        maybe.subscribe(emptyConsumer(), ON_ERROR_MISSING, {
            expect(5)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Checks that an exception thrown from the [rxMaybe] block reaches the error callback of the subscriber. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        maybe.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Checks that disposing the subscription cancels the coroutine and that neither callback is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 maybe
        val sub = maybe.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** Checks an [rxMaybe] whose block returns a value without suspending. */
    @Test
    fun testMaybeNoWait() {
        val maybe = rxMaybe {
            "OK"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that [awaitSingleOrNull] and [awaitSingle] return the value of a non-empty [Maybe]. */
    @Test
    fun testMaybeAwait() = runBlocking {
        assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K")
        assertEquals("OK", Maybe.just("O").awaitSingle() + "K")
    }

    /**
     * Checks that on an empty [Maybe], [awaitSingleOrNull] returns `null` and [awaitSingle] fails with
     * [NoSuchElementException].
     */
    @Test
    fun testMaybeAwaitForNull(): Unit = runBlocking {
        assertNull(Maybe.empty<String>().awaitSingleOrNull())
        assertFailsWith<NoSuchElementException> { Maybe.empty<String>().awaitSingle() }
    }

    /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their
     * [Job] is cancelled. */
    @Test
    fun testMaybeAwaitCancellation() = runTest {
        expect(1)
        // A source that never emits: it only hands out a disposable that records its own disposal.
        val maybe = MaybeSource<Int> { s ->
            s.onSubscribe(object : Disposable {
                override fun dispose() { expect(4) }
                override fun isDisposed(): Boolean { expectUnreached(); return false }
            })
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                maybe.awaitSingleOrNull()
            } catch (e: CancellationException) {
                expect(5)
                throw e
            }
        }
        expect(3)
        job.cancelAndJoin()
        finish(6)
    }

    /** Checks that the [rxMaybe] block can await another [Maybe] and emit a value derived from it. */
    @Test
    fun testMaybeEmitAndAwait() {
        val maybe = rxMaybe {
            Maybe.just("O").awaitSingleOrNull() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that the [rxMaybe] block can await an [Observable] that emits after a 50 ms timer. */
    @Test
    fun testMaybeWithDelay() {
        val maybe = rxMaybe {
            Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that [awaitSingle] on a two-element [Observable] makes the maybe fail with [IllegalArgumentException]. */
    @Test
    fun testMaybeException() {
        val maybe = rxMaybe {
            Observable.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(maybe) {
            // assertTrue instead of the stdlib assert(): the latter is skipped unless the JVM runs with -ea.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** Checks that [awaitFirst] yields the first element of the [Observable]. */
    @Test
    fun testAwaitFirst() {
        val maybe = rxMaybe {
            Observable.just("O", "#").awaitFirst() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that [awaitLast] yields the last element of the [Observable]. */
    @Test
    fun testAwaitLast() {
        val maybe = rxMaybe {
            Observable.just("#", "O").awaitLast() + "K"
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that an error from an awaited [Observable] can be caught inside the [rxMaybe] block. */
    @Test
    fun testExceptionFromObservable() {
        val maybe = rxMaybe {
            try {
                Observable.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Observable.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMaybeValue(maybe) {
            assertEquals("OK", it)
        }
    }

    /** Checks that an exception thrown by the [rxMaybe] block itself is reported as the maybe's error. */
    @Test
    fun testExceptionFromCoroutine() {
        val maybe = rxMaybe<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(maybe) {
            // assertTrue instead of the stdlib assert(): the latter is skipped unless the JVM runs with -ea.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /** Checks that a timeout around [collect] cancels the code running inside the [rxMaybe] block. */
    @Test
    fun testCancelledConsumer() = runTest {
        expect(1)
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            try {
                delay(Long.MAX_VALUE)
            } catch (e: CancellationException) {
                expect(6)
            }
            42
        }
        expect(2)
        val timeout = withTimeoutOrNull(100) {
            expect(3)
            maybe.collect {
                expectUnreached()
            }
            expectUnreached()
        }
        assertNull(timeout)
        expect(5)
        yield() // must cancel code inside maybe!!!
        finish(7)
    }

    /** Tests the simple scenario where the Maybe doesn't output a value. */
    @Test
    fun testMaybeCollectEmpty() = runTest {
        expect(1)
        Maybe.empty<Int>().collect {
            expectUnreached()
        }
        finish(2)
    }

    /** Tests the simple scenario where the Maybe outputs a single value. */
    @Test
    fun testMaybeCollectSingle() = runTest {
        expect(1)
        Maybe.just("OK").collect {
            assertEquals("OK", it)
            expect(2)
        }
        finish(3)
    }

    /** Tests the behavior of [collect] when the Maybe raises an error. */
    @Test
    fun testMaybeCollectThrowingMaybe() = runTest {
        expect(1)
        try {
            Maybe.error<Int>(TestException()).collect {
                expectUnreached()
            }
        } catch (e: TestException) {
            expect(2)
        }
        finish(3)
    }

    /** Tests the behavior of [collect] when the action throws. */
    @Test
    fun testMaybeCollectThrowingAction() = runTest {
        expect(1)
        try {
            Maybe.just("OK").collect {
                expect(2)
                throw TestException()
            }
        } catch (e: TestException) {
            expect(3)
        }
        finish(4)
    }

    /**
     * Checks that when a child coroutine fails with [TestException] and the parent then throws
     * [TestException2] while cleaning up, the awaiting side gets [TestException] with [TestException2]
     * as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val maybe = rxMaybe(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            maybe.awaitSingleOrNull()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * Checks that an exception thrown after the subscription was disposed is passed to the exception
     * handler as an [UndeliverableException] caused by [TestException], and that no observer callback fires.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var disposable: Disposable? = null
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is TestException)
            expect(5)
        }
        val maybe = rxMaybe(currentDispatcher()) {
            expect(4)
            disposable!!.dispose() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since maybe is disposed
            }
        }
        withExceptionHandler(handler) {
            maybe.subscribe(object : MaybeObserver<Unit> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                    disposable = d
                }

                override fun onComplete() {
                    expectUnreached()
                }

                override fun onSuccess(t: Unit) {
                    expectUnreached()
                }

                override fun onError(t: Throwable) {
                    expectUnreached()
                }
            })
            expect(3)
            yield() // run coroutine
            finish(6)
        }
    }

    /**
     * Checks that a [LinkageError] thrown by the subscriber's success callback is passed to the exception
     * handler as an [UndeliverableException] caused by that error.
     */
    @Test
    fun testFatalExceptionInSubscribe() = runTest {
        val handler = { e: Throwable ->
            assertTrue(e is UndeliverableException && e.cause is LinkageError)
            expect(2)
        }

        withExceptionHandler(handler) {
            rxMaybe(Dispatchers.Unconfined) {
                expect(1)
                42
            }.subscribe { throw LinkageError() }
            finish(3)
        }
    }

    /** Checks that a [LinkageError] thrown inside the [rxMaybe] block is delivered to the subscriber's error callback. */
    @Test
    fun testFatalExceptionInSingle() = runTest {
        rxMaybe(Dispatchers.Unconfined) {
            throw LinkageError()
        }.subscribe({ expectUnreached() }, { expect(1); assertTrue(it is LinkageError) })
        finish(2)
    }
}