• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.CancellationException
9 import kotlinx.coroutines.flow.*
10 import kotlinx.coroutines.reactive.*
11 import org.junit.*
12 import org.junit.Test
13 import org.reactivestreams.*
14 import reactor.core.publisher.*
15 import reactor.util.context.*
16 import java.time.Duration.*
17 import java.util.function.*
18 import kotlin.test.*
19 
20 class MonoTest : TestBase() {
21     @Before
22     fun setup() {
23         ignoreLostThreads("timer-", "parallel-")
24         Hooks.onErrorDropped { expectUnreached() }
25     }
26 
27     @Test
28     fun testBasicSuccess() = runBlocking {
29         expect(1)
30         val mono = mono(currentDispatcher()) {
31             expect(4)
32             "OK"
33         }
34         expect(2)
35         mono.subscribe { value ->
36             expect(5)
37             assertEquals("OK", value)
38         }
39         expect(3)
40         yield() // to started coroutine
41         finish(6)
42     }
43 
44     @Test
45     fun testBasicFailure() = runBlocking {
46         expect(1)
47         val mono = mono(currentDispatcher()) {
48             expect(4)
49             throw RuntimeException("OK")
50         }
51         expect(2)
52         mono.subscribe({
53             expectUnreached()
54         }, { error ->
55             expect(5)
56             assertTrue(error is RuntimeException)
57             assertEquals("OK", error.message)
58         })
59         expect(3)
60         yield() // to started coroutine
61         finish(6)
62     }
63 
64     @Test
65     fun testBasicEmpty() = runBlocking {
66         expect(1)
67         val mono = mono(currentDispatcher()) {
68             expect(4)
69             null
70         }
71         expect(2)
72         mono.subscribe({}, { throw it }, {
73             expect(5)
74         })
75         expect(3)
76         yield() // to started coroutine
77         finish(6)
78     }
79 
80     @Test
81     fun testBasicUnsubscribe() = runBlocking {
82         expect(1)
83         val mono = mono(currentDispatcher()) {
84             expect(4)
85             yield() // back to main, will get cancelled
86             expectUnreached()
87         }
88         expect(2)
89         // nothing is called on a disposed mono
90         val sub = mono.subscribe({
91             expectUnreached()
92         }, {
93             expectUnreached()
94         })
95         expect(3)
96         yield() // to started coroutine
97         expect(5)
98         sub.dispose() // will cancel coroutine
99         yield()
100         finish(6)
101     }
102 
103     @Test
104     fun testMonoNoWait() {
105         val mono = mono {
106             "OK"
107         }
108 
109         checkMonoValue(mono) {
110             assertEquals("OK", it)
111         }
112     }
113 
114     @Test
115     fun testMonoAwait() = runBlocking {
116         assertEquals("OK", Mono.just("O").awaitSingle() + "K")
117         assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K")
118         assertFailsWith<NoSuchElementException>{ Mono.empty<String>().awaitSingle() }
119         assertNull(Mono.empty<Int>().awaitSingleOrNull())
120     }
121 
122     /** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException]
123      * and unsubscribe from the publisher when their [Job] is cancelled. */
124     @Test
125     fun testAwaitCancellation() = runTest {
126         expect(1)
127         val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) }
128         val job = launch(start = CoroutineStart.UNDISPATCHED) {
129             try {
130                 expect(2)
131                 mono.awaitSingleOrNull()
132             } catch (e: CancellationException) {
133                 expect(6)
134                 throw e
135             }
136         }
137         expect(4)
138         job.cancelAndJoin()
139         finish(7)
140     }
141 
142     @Test
143     fun testMonoEmitAndAwait() {
144         val mono = mono {
145             Mono.just("O").awaitSingle() + "K"
146         }
147 
148         checkMonoValue(mono) {
149             assertEquals("OK", it)
150         }
151     }
152 
153     @Test
154     fun testMonoWithDelay() {
155         val mono = mono {
156             Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K"
157         }
158 
159         checkMonoValue(mono) {
160             assertEquals("OK", it)
161         }
162     }
163 
164     @Test
165     fun testMonoException() {
166         val mono = mono {
167             Flux.just("O", "K").awaitSingle() + "K"
168         }
169 
170         checkErroneous(mono) {
171             assert(it is IllegalArgumentException)
172         }
173     }
174 
175     @Test
176     fun testAwaitFirst() {
177         val mono = mono {
178             Flux.just("O", "#").awaitFirst() + "K"
179         }
180 
181         checkMonoValue(mono) {
182             assertEquals("OK", it)
183         }
184     }
185 
186     @Test
187     fun testAwaitLast() {
188         val mono = mono {
189             Flux.just("#", "O").awaitLast() + "K"
190         }
191 
192         checkMonoValue(mono) {
193             assertEquals("OK", it)
194         }
195     }
196 
197     @Test
198     fun testExceptionFromFlux() {
199         val mono = mono {
200             try {
201                 Flux.error<String>(RuntimeException("O")).awaitFirst()
202             } catch (e: RuntimeException) {
203                 Flux.just(e.message!!).awaitLast() + "K"
204             }
205         }
206 
207         checkMonoValue(mono) {
208             assertEquals("OK", it)
209         }
210     }
211 
212     @Test
213     fun testExceptionFromCoroutine() {
214         val mono = mono<String> {
215             throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
216         }
217 
218         checkErroneous(mono) {
219             assert(it is IllegalStateException)
220             assertEquals("OK", it.message)
221         }
222     }
223 
224     @Test
225     fun testSuppressedException() = runTest {
226         val mono = mono(currentDispatcher()) {
227             launch(start = CoroutineStart.ATOMIC) {
228                 throw TestException() // child coroutine fails
229             }
230             try {
231                 delay(Long.MAX_VALUE)
232             } finally {
233                 throw TestException2() // but parent throws another exception while cleaning up
234             }
235         }
236         try {
237             mono.awaitSingle()
238             expectUnreached()
239         } catch (e: TestException) {
240             assertTrue(e.suppressed[0] is TestException2)
241         }
242     }
243 
244     @Test
245     fun testUnhandledException() = runTest {
246         expect(1)
247         var subscription: Subscription? = null
248         val handler = BiFunction<Throwable, Any?, Throwable> { t, _ ->
249             assertTrue(t is TestException)
250             expect(5)
251             t
252         }
253 
254         val mono = mono(currentDispatcher()) {
255             expect(4)
256             subscription!!.cancel() // cancel our own subscription, so that delay will get cancelled
257             try {
258                 delay(Long.MAX_VALUE)
259             } finally {
260                 throw TestException() // would not be able to handle it since mono is disposed
261             }
262         }.contextWrite { Context.of("reactor.onOperatorError.local", handler) }
263         mono.subscribe(object : Subscriber<Unit> {
264             override fun onSubscribe(s: Subscription) {
265                 expect(2)
266                 subscription = s
267             }
268             override fun onNext(t: Unit?) { expectUnreached() }
269             override fun onComplete() { expectUnreached() }
270             override fun onError(t: Throwable) { expectUnreached() }
271         })
272         expect(3)
273         yield() // run coroutine
274         finish(6)
275     }
276 
277     @Test
278     fun testIllegalArgumentException() {
279         assertFailsWith<IllegalArgumentException> { mono(Job()) { } }
280     }
281 
282     @Test
283     fun testExceptionAfterCancellation() = runTest {
284         // Test exception is not reported to global handler
285         Flux
286             .interval(ofMillis(1))
287             .switchMap {
288                 mono(coroutineContext) {
289                     timeBomb().awaitSingle()
290                 }
291             }
292             .onErrorReturn({
293                 expect(1)
294                 true
295             }, 42)
296             .blockLast()
297         finish(2)
298     }
299 
300     private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") }
301 
302     @Test
303     fun testLeakedException() = runBlocking {
304         // Test exception is not reported to global handler
305         val flow = mono<Unit> { throw TestException() }.toFlux().asFlow()
306         repeat(10000) {
307             combine(flow, flow) { _, _ -> }
308                 .catch {}
309                 .collect { }
310         }
311     }
312 
313     /** Test that cancelling a [mono] due to a timeout does throw an exception. */
314     @Test
315     fun testTimeout() {
316         val mono = mono {
317             withTimeout(1) { delay(100) }
318         }
319         try {
320             mono.doOnSubscribe { expect(1) }
321                 .doOnNext { expectUnreached() }
322                 .doOnSuccess { expectUnreached() }
323                 .doOnError { expect(2) }
324                 .doOnCancel { expectUnreached() }
325                 .block()
326         } catch (e: CancellationException) {
327             expect(3)
328         }
329         finish(4)
330     }
331 
332     /** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore,
333      * this is considered normal behavior and exceptions are not propagated. */
334     @Test
335     fun testDownstreamCancellationDoesNotThrow() = runTest {
336         var i = 0
337         /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
338          * to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
339          * cancellation from the downstream. */
340         Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
341             expectUnreached()
342             t
343         }
344         /** A Mono that doesn't emit a value and instead waits indefinitely. */
345         val mono = mono(Dispatchers.Unconfined) { expect(5 * i + 3); delay(Long.MAX_VALUE) }
346             .doOnSubscribe { expect(5 * i + 2) }
347             .doOnNext { expectUnreached() }
348             .doOnSuccess { expectUnreached() }
349             .doOnError { expectUnreached() }
350             .doOnCancel { expect(5 * i + 4) }
351         val n = 1000
352         repeat(n) {
353             i = it
354             expect(5 * i + 1)
355             mono.awaitCancelAndJoin()
356             expect(5 * i + 5)
357         }
358         finish(5 * n + 1)
359         Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
360     }
361 
362     /** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
363      * error is propagated to [Hooks.onOperatorError]. */
364     @Test
365     fun testRethrowingDownstreamCancellation() = runTest {
366         var i = 0
367         /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
368          * to be fired in this case. */
369         Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
370             expect(i * 6 + 5)
371             t
372         }
373         /** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
374         val mono = mono(Dispatchers.Unconfined) {
375             expect(i * 6 + 3)
376             try {
377                 delay(Long.MAX_VALUE)
378             } catch (e: CancellationException) {
379                 throw TestException()
380             }
381         }
382             .doOnSubscribe { expect(i * 6 + 2) }
383             .doOnNext { expectUnreached() }
384             .doOnSuccess { expectUnreached() }
385             .doOnError { expectUnreached() }
386             .doOnCancel { expect(i * 6 + 4) }
387         val n = 1000
388         repeat(n) {
389             i = it
390             expect(i * 6 + 1)
391             mono.awaitCancelAndJoin()
392             expect(i * 6 + 6)
393         }
394         finish(n * 6 + 1)
395         Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
396     }
397 
398     /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then.
399      *
400      * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to
401      * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */
402     private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
403         async(start = CoroutineStart.UNDISPATCHED) {
404             awaitSingleOrNull()
405         }.cancelAndJoin()
406     }
407 }
408