• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.reactive.*
9 import org.junit.*
10 import org.junit.Test
11 import reactor.core.publisher.*
12 import java.time.Duration.*
13 import kotlin.test.*
14 
/**
 * Tests for awaiting values of a Reactor [Flux] from coroutines (`awaitSingle`, `awaitFirst*`,
 * `awaitLast`) and for iterating a [Flux] with `collect`.
 *
 * Most tests build a publisher with the `flux { }` coroutine builder that is expected either to emit
 * the single value `"OK"` or to fail with a specific exception, and hand it to `checkSingleValue` /
 * `checkErroneous` (helpers declared outside this file) together with the assertions to run on the
 * outcome.
 */
class FluxSingleTest : TestBase() {

    /**
     * Registers the `parallel-` thread-name prefix with `ignoreLostThreads` before each test.
     *
     * NOTE(review): presumably this keeps Reactor's parallel scheduler threads (e.g. the ones behind
     * `delayElements` in [testSingleWithDelay]) from being reported as leaked - confirm against TestBase.
     */
    @Before
    fun setup() {
        ignoreLostThreads("parallel-")
    }

    /** A `flux { }` block that sends one value without suspending on another publisher yields that value. */
    @Test
    fun testSingleNoWait() {
        val flux = flux {
            send("OK")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle` on a one-element [Flux] returns that element when called directly from `runBlocking`.
     *
     * The return type is pinned to `Unit` so the method stays `void` for JUnit 4 regardless of what the
     * last expression of the lambda evaluates to.
     */
    @Test
    fun testSingleAwait(): Unit = runBlocking {
        assertEquals("OK", Flux.just("O").awaitSingle() + "K")
    }

    /** The result of `awaitSingle` can be combined and re-emitted from inside a `flux { }` block. */
    @Test
    fun testSingleEmitAndAwait() {
        val flux = flux {
            send(Flux.just("O").awaitSingle() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` still returns the element when the source delivers it after a 50 ms delay. */
    @Test
    fun testSingleWithDelay() {
        val flux = flux {
            send(Flux.just("O").delayElements(ofMillis(50)).awaitSingle() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` on a two-element [Flux] is expected to fail the publisher with [IllegalArgumentException]. */
    @Test
    fun testSingleException() {
        val flux = flux {
            send(Flux.just("O", "K").awaitSingle() + "K")
        }

        checkErroneous(flux) {
            // kotlin.test assertion instead of stdlib `assert`, which is skipped when the JVM runs without -ea.
            assertTrue(it is IllegalArgumentException, "Expected IllegalArgumentException but got $it")
        }
    }

    /** `awaitFirst` returns the first of several elements. */
    @Test
    fun testAwaitFirst() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirst() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` returns the supplied default for an empty [Flux]. */
    @Test
    fun testAwaitFirstOrDefault() {
        val flux = flux {
            send(Flux.empty<String>().awaitFirstOrDefault("O") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` returns the first element, not the default, when the [Flux] has values. */
    @Test
    fun testAwaitFirstOrDefaultWithValues() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirstOrDefault("!") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` returns `null` for an empty [Flux], so the elvis fallback `"OK"` is sent. */
    @Test
    fun testAwaitFirstOrNull() {
        val flux = flux<String?> {
            send(Flux.empty<String>().awaitFirstOrNull() ?: "OK")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` returns the first element when the [Flux] has values, so the fallback is unused. */
    @Test
    fun testAwaitFirstOrNullWithValues() {
        val flux = flux {
            send((Flux.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` returns the value produced by its lambda for an empty [Flux]. */
    @Test
    fun testAwaitFirstOrElse() {
        val flux = flux {
            send(Flux.empty<String>().awaitFirstOrElse { "O" } + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` returns the first element, not the lambda's value, when the [Flux] has values. */
    @Test
    fun testAwaitFirstOrElseWithValues() {
        val flux = flux {
            send(Flux.just("O", "#").awaitFirstOrElse { "!" } + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last of several elements. */
    @Test
    fun testAwaitLast() {
        val flux = flux {
            send(Flux.just("#", "O").awaitLast() + "K")
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error signalled by the awaited [Flux] surfaces as an exception at the `awaitFirst` call site,
     * where it can be caught and its message reused to build the emitted value.
     */
    @Test
    fun testExceptionFromObservable() {
        val flux = flux {
            try {
                send(Flux.error<String>(RuntimeException("O")).awaitFirst())
            } catch (e: RuntimeException) {
                send(Flux.just(e.message!!).awaitLast() + "K")
            }
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /** An exception thrown from the `flux { }` block fails the publisher with that exception and message. */
    @Test
    fun testExceptionFromCoroutine() {
        val flux = flux<String> {
            throw IllegalStateException(Flux.just("O").awaitSingle() + "K")
        }

        checkErroneous(flux) {
            // kotlin.test assertion instead of stdlib `assert`, which is skipped when the JVM runs without -ea.
            assertTrue(it is IllegalStateException, "Expected IllegalStateException but got $it")
            assertEquals("OK", it.message)
        }
    }

    /** `collect` visits every element of the [Flux] in order, so the concatenation is `"OK"`. */
    @Test
    fun testFluxIteration() {
        val flux = flux {
            var result = ""
            Flux.just("O", "K").collect { result += it }
            send(result)
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }

    /**
     * `collect` on a failing [Flux] throws the source's exception without invoking the element action,
     * so the `catch` branch sends the exception message.
     */
    @Test
    fun testFluxIterationFailure() {
        val flux = flux {
            try {
                Flux.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
                send("Fail")
            } catch (e: RuntimeException) {
                send(e.message!!)
            }
        }

        checkSingleValue(flux) {
            assertEquals("OK", it)
        }
    }
}
215