• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.reactive.*
10 import org.junit.*
11 import org.junit.Assert.*
12 
/**
 * Tests for converting coroutine primitives (jobs, deferred values and channels)
 * into Reactor publishers through `asMono` and `asFlux`.
 *
 * The `expect(n)` / `finish(n)` calls are inherited from [TestBase]; they are
 * numbered in the order the checkpoints are presumably required to be reached
 * (see [TestBase] for the exact contract).
 */
class ConvertTest : TestBase() {
    /**
     * Converts a launched job to a mono and subscribes to it; the job body is
     * checkpoint 3 and the subscriber's consumer is checkpoint 4.
     */
    @Test
    fun testJobToMonoSuccess() = runBlocking {
        expect(1)
        val launched = launch { expect(3) }
        // The mono is created in this coroutine's context with the Job element removed.
        val completion = launched.asMono(coroutineContext.minusKey(Job))
        completion.subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Converts a job whose body throws to a mono; the value consumer must never
     * be invoked and the error consumer is checkpoint 4.
     */
    @Test
    fun testJobToMonoFail() = runBlocking {
        expect(1)
        val failing = async(NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        val failure = failing.asMono(coroutineContext.minusKey(Job))
        failure.subscribe(
            { fail("no item should be emitted") },
            { expect(4) }
        )
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Converts the same deferred value to a mono twice and checks that each
     * conversion yields "OK".
     */
    @Test
    fun testDeferredToMono() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        // Two independent conversions of one deferred, checked one after another.
        repeat(2) {
            val converted = deferred.asMono(Dispatchers.Unconfined)
            checkMonoValue(converted) { value ->
                assertEquals("OK", value)
            }
        }
    }

    /**
     * Converts a deferred that completes with `null` to a mono twice and checks
     * that each conversion yields `null`.
     */
    @Test
    fun testDeferredToMonoEmpty() {
        val deferred = GlobalScope.async {
            delay(50)
            null
        }
        repeat(2) {
            val converted = deferred.asMono(Dispatchers.Unconfined)
            checkMonoValue(converted, ::assertNull)
        }
    }

    /**
     * Converts a deferred that fails with [TestRuntimeException] to a mono twice
     * and checks that each conversion reports that same exception type and message.
     */
    @Test
    fun testDeferredToMonoFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            val converted = deferred.asMono(Dispatchers.Unconfined)
            checkErroneous(converted) { cause ->
                check(cause is TestRuntimeException && cause.message == "OK") { "$cause" }
            }
        }
    }

    /**
     * Converts a produced channel sending "O" then "K" to a flux and checks that
     * concatenating its elements gives "OK".
     */
    @Test
    fun testToFlux() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val letters = channel.asFlux(Dispatchers.Unconfined)
        val joined = letters.reduce { acc, next -> acc + next }
        checkMonoValue(joined) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * Converts a produced channel that sends "O" and then throws
     * `TestException("K")` to a flux; the collected element followed by the
     * exception message must read "OK".
     */
    @Test
    fun testToFluxFail() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val letters = channel.asFlux(Dispatchers.Unconfined)
        val outcome = mono(Dispatchers.Unconfined) {
            var seen = ""
            try {
                letters.collect { item -> seen += item }
            } catch (failure: Throwable) {
                // Only the producer's own exception is acceptable here.
                check(failure is TestException)
                seen += failure.message
            }
            seen
        }
        checkMonoValue(outcome) { value ->
            assertEquals("OK", value)
        }
    }
}