/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import kotlin.test.*

/**
 * Tests for the `flux` coroutine builder: emitting values, failing, being disposed or cancelled,
 * and being consumed through `subscribe`, `collect`, `asFlow` and `awaitFirstOrNull`.
 *
 * NOTE(review): `expect`, `finish`, `expectUnreached`, `currentDispatcher`, `runTest` and
 * `TestException` are not defined in this file (presumably inherited from `TestBase` or test
 * utilities). The comments below assume `expect(n)` / `finish(n)` assert that they are the n-th
 * step reached and that `expectUnreached()` fails the test when executed - confirm against
 * `TestBase`.
 */
class FluxTest : TestBase() {
    /**
     * A value sent from the `flux` block is delivered to the subscriber.
     *
     * Per the step numbering, neither the builder body (4) nor the subscriber callback (5) runs
     * at `subscribe` time; both run only once the test yields.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flux = flux(currentDispatcher()) {
            expect(4)
            send("OK")
        }
        expect(2)
        flux.subscribe { value ->
            expect(5)
            assertEquals("OK", value)
        }
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * An exception thrown inside the `flux` block is delivered to the subscriber's error callback
     * as a `RuntimeException` with message "OK"; the value callback must never be invoked.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val flux = flux<String>(currentDispatcher()) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flux.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        expect(3)
        yield() // to started coroutine
        finish(6)
    }

    /**
     * Disposing the subscription while the producer is suspended in `yield()` cancels the producer:
     * the code after the producer's `yield()` and both subscriber callbacks are expected not to run.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flux = flux<String>(currentDispatcher()) {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        val sub = flux.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * Cancelling the collecting job runs the `doOnCancel` hook (step 10) and then lets the
     * producer observe a `CancellationException` while it is suspended in `delay` (step 11).
     *
     * The single `expect(10)` in `doOnCancel` is what checks that the hook fires once: a second
     * invocation would presumably break the step sequence.
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        expect(1)
        val observable =
            flux(currentDispatcher()) {
                expect(5)
                send("OK")
                try {
                    delay(Long.MAX_VALUE)
                } catch (e: CancellationException) {
                    // Deliberately not rethrown: the test only records that cancellation arrived.
                    expect(11)
                }
            }
            .doOnNext {
                expect(6)
                assertEquals("OK", it)
            }
            .doOnCancel {
                expect(10) // notified once!
            }
        expect(2)
        // UNDISPATCHED so the collector body starts (step 3) before control returns here (step 4).
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(3)
            observable.collect {
                expect(8)
                assertEquals("OK", it)
            }
        }
        expect(4)
        yield() // to observable code
        expect(7)
        yield() // to consuming coroutines
        expect(9)
        job.cancel()
        job.join()
        finish(12)
    }

    /**
     * An exception thrown by the collector propagates out of `collect` as the same
     * `TestException`.
     *
     * Per the step numbering, only the first two iterations of the producer loop are expected to
     * reach `expect` before the test finishes at step 3.
     */
    @Test
    fun testFailingConsumer() = runTest {
        val pub = flux(currentDispatcher()) {
            repeat(3) {
                expect(it + 1) // expect(1), expect(2) *should* be invoked
                send(it)
            }
        }
        try {
            pub.collect {
                throw TestException()
            }
        } catch (e: TestException) {
            finish(3)
        }
    }

    /** Passing a `Job` as the context argument of `flux` is rejected with `IllegalArgumentException`. */
    @Test
    fun testIllegalArgumentException() {
        assertFailsWith<IllegalArgumentException> { flux<Int>(Job()) { } }
    }

    /**
     * Repeatedly combines two flows backed by a `flux` that always throws, swallowing the failure
     * with `catch`. The test passes as long as nothing escapes the loop.
     *
     * NOTE(review): how an exception reaching the global handler would fail this test is not
     * visible here - presumably `TestBase` detects it; verify.
     */
    @Test
    fun testLeakedException() = runBlocking {
        // Test exception is not reported to global handler
        val flow = flux<Unit> { throw TestException() }.asFlow()
        repeat(2000) {
            combine(flow, flow) { _, _ -> Unit }
                .catch {}
                .collect { }
        }
    }

    /** Tests that `trySend` doesn't throw in `flux`. */
    @Test
    fun testTrySendNotThrowing() = runTest {
        // Captured from inside the builder so `trySend` can be called after the consumer is cancelled.
        var producerScope: ProducerScope<Int>? = null
        expect(1)
        val flux = flux<Int>(Dispatchers.Unconfined) {
            producerScope = this
            expect(3)
            delay(Long.MAX_VALUE)
        }
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            flux.awaitFirstOrNull()
            expectUnreached()
        }
        job.cancel()
        expect(4)
        // The awaiting job was cancelled before any value was produced; `trySend` must report
        // failure through its result instead of throwing.
        val result = producerScope!!.trySend(1)
        assertTrue(result.isFailure)
        finish(5)
    }

    /** Tests that all methods on `flux` fail without closing the channel when attempting to emit `null`. */
    @Test
    fun testEmittingNull() = runTest {
        val flux = flux {
            assertFailsWith<NullPointerException> { send(null) }
            assertFailsWith<NullPointerException> { trySend(null) }
            // Still deliverable after the two rejected attempts, i.e. the channel was not closed.
            send("OK")
        }
        assertEquals("OK", flux.awaitFirstOrNull())
    }
}