• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.*
7 import kotlinx.coroutines.reactive.*
8 import org.junit.*
9 import org.junit.Test
10 import kotlin.test.*
11 
12 class ConvertTest : TestBase() {
13     @Test
14     fun testJobToMonoSuccess() = runBlocking {
15         expect(1)
16         val job = launch {
17             expect(3)
18         }
19         val mono = job.asMono(coroutineContext.minusKey(Job))
20         mono.subscribe {
21             expect(4)
22         }
23         expect(2)
24         yield()
25         finish(5)
26     }
27 
28     @Test
29     fun testJobToMonoFail() = runBlocking {
30         expect(1)
31         val job = async(NonCancellable) {
32             expect(3)
33             throw RuntimeException("OK")
34         }
35         val mono = job.asMono(coroutineContext.minusKey(Job))
36         mono.subscribe(
37                 { fail("no item should be emitted") },
38                 { expect(4) }
39         )
40         expect(2)
41         yield()
42         finish(5)
43     }
44 
45     @Test
46     fun testDeferredToMono() {
47         val d = GlobalScope.async {
48             delay(50)
49             "OK"
50         }
51         val mono1 = d.asMono(Dispatchers.Unconfined)
52         checkMonoValue(mono1) {
53             assertEquals("OK", it)
54         }
55         val mono2 = d.asMono(Dispatchers.Unconfined)
56         checkMonoValue(mono2) {
57             assertEquals("OK", it)
58         }
59     }
60 
61     @Test
62     fun testDeferredToMonoEmpty() {
63         val d = GlobalScope.async {
64             delay(50)
65             null
66         }
67         val mono1 = d.asMono(Dispatchers.Unconfined)
68         checkMonoValue(mono1, Assert::assertNull)
69         val mono2 = d.asMono(Dispatchers.Unconfined)
70         checkMonoValue(mono2, Assert::assertNull)
71     }
72 
73     @Test
74     fun testDeferredToMonoFail() {
75         val d = GlobalScope.async {
76             delay(50)
77             throw TestRuntimeException("OK")
78         }
79         val mono1 = d.asMono(Dispatchers.Unconfined)
80         checkErroneous(mono1) {
81             check(it is TestRuntimeException && it.message == "OK") { "$it" }
82         }
83         val mono2 = d.asMono(Dispatchers.Unconfined)
84         checkErroneous(mono2) {
85             check(it is TestRuntimeException && it.message == "OK") { "$it" }
86         }
87     }
88 
89     @Test
90     fun testToFlux() {
91         val c = GlobalScope.produce {
92             delay(50)
93             send("O")
94             delay(50)
95             send("K")
96         }
97         val flux = c.consumeAsFlow().asFlux(Dispatchers.Unconfined)
98         checkMonoValue(flux.reduce { t1, t2 -> t1 + t2 }) {
99             assertEquals("OK", it)
100         }
101     }
102 
103     @Test
104     fun testToFluxFail() {
105         val c = GlobalScope.produce {
106             delay(50)
107             send("O")
108             delay(50)
109             throw TestException("K")
110         }
111         val flux = c.consumeAsFlow().asFlux(Dispatchers.Unconfined)
112         val mono = mono(Dispatchers.Unconfined) {
113             var result = ""
114             try {
115                 flux.collect { result += it }
116             } catch(e: Throwable) {
117                 check(e is TestException)
118                 result += e.message
119             }
120             result
121         }
122         checkMonoValue(mono) {
123             assertEquals("OK", it)
124         }
125     }
126 }
127