/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import java.time.Duration.*
import java.util.function.*
import kotlin.test.*

/**
 * Tests for the [mono] coroutine builder and for awaiting Reactor publishers from coroutines.
 *
 * Most tests verify the exact interleaving of the coroutine and its subscriber through the
 * `expect(n)` / `finish(n)` / `expectUnreached()` helpers inherited from [TestBase].
 */
class MonoTest : TestBase() {
    /**
     * Installs a global hook that fails the test whenever Reactor drops an error.
     *
     * NOTE(review): the hook is never reset by this class, so it stays installed for whatever runs
     * afterwards in the same JVM. Confirm whether [TestBase] or the build isolates this.
     */
    @Before
    fun setup() {
        ignoreLostThreads("timer-", "parallel-")
        Hooks.onErrorDropped { expectUnreached() }
    }

    /** The block of a [mono] runs once the subscriber yields, and its result reaches the value consumer. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            "OK"
        }
        expect(2)
        mono.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** An exception thrown from the [mono] block is delivered to the error consumer, not the value consumer. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        mono.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** A [mono] block returning `null` triggers only the completion callback. */
    @Test
    fun testBasicEmpty() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            null
        }
        expect(2)
        mono.subscribe({}, { throw it }, {
            expect(5)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine; neither the value nor the error consumer is invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val mono = mono(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed mono
        val sub = mono.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /** A [mono] whose block does not suspend produces its value. */
    @Test
    fun testMonoNoWait() {
        val mono = mono {
            "OK"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** Checks [awaitSingle] and [awaitSingleOrNull] on both a one-element and an empty [Mono]. */
    @Test
    fun testMonoAwait() = runBlocking {
        assertEquals("OK", Mono.just("O").awaitSingle() + "K")
        assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K")
        assertFailsWith<NoSuchElementException> { Mono.empty<String>().awaitSingle() }
        assertNull(Mono.empty<Int>().awaitSingleOrNull())
    }

    /** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException]
     * and unsubscribe from the publisher when their [Job] is cancelled. */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                mono.awaitSingleOrNull()
            } catch (e: CancellationException) {
                expect(6)
                throw e
            }
        }
        expect(4)
        job.cancelAndJoin()
        finish(7)
    }

    /** A value awaited from another [Mono] inside the block can be used to build the result. */
    @Test
    fun testMonoEmitAndAwait() {
        val mono = mono {
            Mono.just("O").awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** Same as [testMonoEmitAndAwait], but the awaited element is emitted after a 50 ms delay. */
    @Test
    fun testMonoWithDelay() {
        val mono = mono {
            Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** [awaitSingle] on a two-element [Flux] makes the [mono] fail with [IllegalArgumentException]. */
    @Test
    fun testMonoException() {
        val mono = mono {
            Flux.just("O", "K").awaitSingle() + "K"
        }

        checkErroneous(mono) {
            // assertTrue rather than assert: the latter checks nothing when JVM assertions are disabled.
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** [awaitFirst] returns the first element of a multi-element [Flux]. */
    @Test
    fun testAwaitFirst() {
        val mono = mono {
            Flux.just("O", "#").awaitFirst() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** [awaitLast] returns the last element of a multi-element [Flux]. */
    @Test
    fun testAwaitLast() {
        val mono = mono {
            Flux.just("#", "O").awaitLast() + "K"
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An error signalled by an awaited [Flux] can be caught inside the block and recovered from. */
    @Test
    fun testExceptionFromFlux() {
        val mono = mono {
            try {
                Flux.error<String>(RuntimeException("O")).awaitFirst()
            } catch (e: RuntimeException) {
                Flux.just(e.message!!).awaitLast() + "K"
            }
        }

        checkMonoValue(mono) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown by the block after a successful await is reported with its type and message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val mono = mono<String> {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }

        checkErroneous(mono) {
            // assertTrue rather than assert: the latter checks nothing when JVM assertions are disabled.
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /**
     * When a child coroutine fails with [TestException] and the parent then throws [TestException2] while
     * cleaning up, the awaiting side sees [TestException] with [TestException2] as its first suppressed exception.
     */
    @Test
    fun testSuppressedException() = runTest {
        val mono = mono(currentDispatcher()) {
            launch(start = CoroutineStart.ATOMIC) {
                throw TestException() // child coroutine fails
            }
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException2() // but parent throws another exception while cleaning up
            }
        }
        try {
            mono.awaitSingle()
            expectUnreached()
        } catch (e: TestException) {
            assertTrue(e.suppressed[0] is TestException2)
        }
    }

    /**
     * An exception thrown by the block after its own subscription was cancelled is passed to the
     * `reactor.onOperatorError.local` handler from the subscriber context; no subscriber callback fires.
     */
    @Test
    fun testUnhandledException() = runTest {
        expect(1)
        var subscription: Subscription? = null
        val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
            assertTrue(t is TestException)
            expect(5)
            t
        }

        val mono = mono(currentDispatcher()) {
            expect(4)
            subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
            try {
                delay(Long.MAX_VALUE)
            } finally {
                throw TestException() // would not be able to handle it since mono is disposed
            }
        }.contextWrite { Context.of("reactor.onOperatorError.local", handler) }
        mono.subscribe(object : Subscriber<Unit> {
            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
            }
            override fun onNext(t: Unit?) { expectUnreached() }
            override fun onComplete() { expectUnreached() }
            override fun onError(t: Throwable) { expectUnreached() }
        })
        expect(3)
        yield() // run coroutine
        finish(6)
    }

    /** Passing a [Job] as the context of [mono] is rejected with [IllegalArgumentException]. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
    }

    @Test
    fun testExceptionAfterCancellation() = runTest {
        // Test exception is not reported to global handler
        Flux
            .interval(ofMillis(1))
            .switchMap {
                mono(coroutineContext) {
                    timeBomb().awaitSingle()
                }
            }
            .onErrorReturn({
                expect(1)
                true
            }, 42)
            .blockLast()
        finish(2)
    }

    /** A [Mono] that throws from its `doOnSuccess` callback after a 1 ms delay. */
    private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }

    @Test
    fun testLeakedException() = runBlocking {
        // Test exception is not reported to global handler
        val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
        repeat(10000) {
            combine(flow, flow) { _, _ -> }
                .catch {}
                .collect { }
        }
    }

    /** Test that cancelling a [mono] due to a timeout does throw an exception. */
    @Test
    fun testTimeout() {
        val mono = mono {
            withTimeout(1) { delay(100) }
        }
        try {
            mono.doOnSubscribe { expect(1) }
                .doOnNext { expectUnreached() }
                .doOnSuccess { expectUnreached() }
                .doOnError { expect(2) }
                .doOnCancel { expectUnreached() }
                .block()
        } catch (e: CancellationException) {
            expect(3)
        }
        finish(4)
    }

    /** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore,
     * this is considered normal behavior and exceptions are not propagated. */
    @Test
    fun testDownstreamCancellationDoesNotThrow() = runTest {
        val hookKey = "testDownstreamCancellationDoesNotThrow"
        var i = 0
        /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
         * to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
         * cancellation from the downstream. */
        Hooks.onOperatorError(hookKey) { t, _ ->
            expectUnreached()
            t
        }
        try {
            /** A Mono that doesn't emit a value and instead waits indefinitely. */
            val mono = mono(Dispatchers.Unconfined) { expect(5 * i + 3); delay(Long.MAX_VALUE) }
                .doOnSubscribe { expect(5 * i + 2) }
                .doOnNext { expectUnreached() }
                .doOnSuccess { expectUnreached() }
                .doOnError { expectUnreached() }
                .doOnCancel { expect(5 * i + 4) }
            val n = 1000
            repeat(n) {
                i = it
                expect(5 * i + 1)
                mono.awaitCancelAndJoin()
                expect(5 * i + 5)
            }
            finish(5 * n + 1)
        } finally {
            // The hook is global state: remove it even when the test fails, so it cannot leak into other tests.
            Hooks.resetOnOperatorError(hookKey)
        }
    }

    /** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
     * error is propagated to [Hooks.onOperatorError]. */
    @Test
    fun testRethrowingDownstreamCancellation() = runTest {
        val hookKey = "testRethrowingDownstreamCancellation"
        var i = 0
        /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
         * to be fired in this case. */
        Hooks.onOperatorError(hookKey) { t, _ ->
            expect(i * 6 + 5)
            t
        }
        try {
            /** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
            val mono = mono(Dispatchers.Unconfined) {
                expect(i * 6 + 3)
                try {
                    delay(Long.MAX_VALUE)
                } catch (e: CancellationException) {
                    throw TestException()
                }
            }
                .doOnSubscribe { expect(i * 6 + 2) }
                .doOnNext { expectUnreached() }
                .doOnSuccess { expectUnreached() }
                .doOnError { expectUnreached() }
                .doOnCancel { expect(i * 6 + 4) }
            val n = 1000
            repeat(n) {
                i = it
                expect(i * 6 + 1)
                mono.awaitCancelAndJoin()
                expect(i * 6 + 6)
            }
            finish(n * 6 + 1)
        } finally {
            // The hook is global state: remove it even when the test fails, so it cannot leak into other tests.
            Hooks.resetOnOperatorError(hookKey)
        }
    }

    /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then.
     *
     * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
     * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
    private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
        async(start = CoroutineStart.UNDISPATCHED) {
            awaitSingleOrNull()
        }.cancelAndJoin()
    }
}