/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Test
import reactor.core.publisher.*
import java.time.Duration.*
import kotlin.test.*

/**
 * Tests for the suspending `await*` and `collect` extensions applied to a Reactor [Flux],
 * used both directly from `runBlocking` and from inside the `flux { }` coroutine builder.
 *
 * Most tests build a flux and pass it to `checkSingleValue` or `checkErroneous`. Those helpers are
 * declared outside this file; presumably they subscribe to the flux and run the given checker on
 * the single emitted value or on the terminal error, respectively -- confirm against their definitions.
 */
class FluxSingleTest : TestBase() {

    /**
     * Runs before every test and calls `ignoreLostThreads` with the `"parallel-"` prefix.
     *
     * NOTE(review): presumably this excludes Reactor's parallel scheduler threads (started by
     * `delayElements` in [testSingleWithDelay]) from the leaked-thread check in `TestBase` -- confirm.
     */
    @Before
    fun setup() {
        ignoreLostThreads("parallel-")
    }

    /** A flux that sends one value without suspending is expected to yield exactly that value. */
    @Test
    fun testSingleNoWait() {
        val flux = flux {
            send("OK")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * Awaits the only element of a one-element flux directly from `runBlocking`.
     *
     * The return type is declared as `Unit` explicitly: JUnit 4 requires test methods to return
     * void, and an inferred type would silently follow the last expression of the lambda.
     */
    @Test
    fun testSingleAwait(): Unit = runBlocking {
        assertEquals("OK", Flux.just("O").awaitSingle() + "K")
    }

    /** Awaits the only element of another flux inside the builder and sends the combined result. */
    @Test
    fun testSingleEmitAndAwait() {
        val flux = flux {
            send(Flux.just("O").awaitSingle() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** Same as [testSingleEmitAndAwait], but the awaited element is delayed by 50 ms. */
    @Test
    fun testSingleWithDelay() {
        val flux = flux {
            send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * Calls `awaitSingle` on a flux with two elements; the resulting flux is expected to fail
     * with an [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val flux = flux {
            send(Flux.just("O", "K").awaitSingle() + "K")
        }

        checkErroneous(flux) {
            // assertTrue (kotlin.test) always runs; the stdlib assert() is skipped unless the JVM has -ea.
            assertTrue(it is IllegalArgumentException, "Expected IllegalArgumentException, but got $it")
        }
    }

    /** `awaitFirst` on a two-element flux is expected to return the first element. */
    @Test
    fun testAwaitFirst() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirst() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` on an empty flux is expected to return the supplied default. */
    @Test
    fun testAwaitFirstOrDefault() {
        val flux = flux {
            send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` on a non-empty flux is expected to return the first element, not the default. */
    @Test
    fun testAwaitFirstOrDefaultWithValues() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` on an empty flux is expected to return `null`, so the elvis fallback is sent. */
    @Test
    fun testAwaitFirstOrNull() {
        val flux = flux<String?> {
            send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` on a non-empty flux is expected to return the first element. */
    @Test
    fun testAwaitFirstOrNullWithValues() {
        val flux = flux {
            send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` on an empty flux is expected to return the value produced by the lambda. */
    @Test
    fun testAwaitFirstOrElse() {
        val flux = flux {
            send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` on a non-empty flux is expected to return the first element, not the lambda result. */
    @Test
    fun testAwaitFirstOrElseWithValues() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` on a two-element flux is expected to return the last element. */
    @Test
    fun testAwaitLast() {
        val flux = flux {
            send(Flux.just("#", "O").awaitLast() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * Awaits a flux that terminates with a [RuntimeException]; the exception is expected to be
     * catchable inside the builder, and its message is then used to build the value that is sent.
     */
    @Test
    fun testExceptionFromObservable() {
        val flux = flux {
            try {
                send(Flux.error<String>(RuntimeException("O")).awaitFirst())
            } catch (e: RuntimeException) {
                // The message is the "O" passed to the RuntimeException constructor above.
                send(Flux.just(e.message!!).awaitLast() + "K")
            }
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * Throws an [IllegalStateException] from the builder body; the flux is expected to fail
     * with that exception and its message.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val flux = flux<String> {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }

        checkErroneous(flux) {
            // assertTrue (kotlin.test) always runs; the stdlib assert() is skipped unless the JVM has -ea.
            assertTrue(it is IllegalStateException, "Expected IllegalStateException, but got $it")
            assertEquals("OK", it.message)
        }
    }

    /** Collects every element of a flux with the suspending `collect` and sends the concatenation. */
    @Test
    fun testFluxIteration() {
        val flux = flux {
            var result = ""
            Flux.just("O", "K").collect { result += it }
            send(result)
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * Collects a flux that terminates with a [RuntimeException]; the collector lambda must not be
     * invoked and the exception is expected to be catchable inside the builder.
     */
    @Test
    fun testFluxIterationFailure() {
        val flux = flux {
            try {
                Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
                send("Fail")
            } catch (e: RuntimeException) {
                // The message is the "OK" passed to the RuntimeException constructor above.
                send(e.message!!)
            }
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }
}