• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.consumeAsFlow
7 import org.junit.Assert
8 import org.junit.Test
9 import kotlin.test.*
10 
/**
 * Tests for the conversions from coroutine primitives (a [Job], a [Deferred], a channel
 * consumed as a flow) to reactive types through `asCompletable`, `asMaybe`, `asSingle`
 * and `asObservable`.
 *
 * The numbered `expect(n)` / `finish(n)` calls spell out the order in which the test
 * expects its steps to be reached.
 */
class ConvertTest : TestBase() {
    /**
     * Converts a normally finishing launched job to a completable.
     *
     * Expected order: test body before the yield (1, 2), the job body (3), the
     * subscriber's completion callback (4), then the end of the test (5).
     */
    @Test
    fun testToCompletableSuccess() = runBlocking {
        expect(1)
        val launched = launch { expect(3) }
        // The conversion is given the current context without its Job element.
        launched.asCompletable(coroutineContext.minusKey(Job)).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Converts a deferred whose body throws to a completable.
     *
     * The expected order is the same as in the successful case: the completion callback
     * passed to `subscribe` is still expected to run as step 4.
     */
    @Test
    fun testToCompletableFail() = runBlocking {
        expect(1)
        // NonCancellable keeps the exception from killing the parent.
        val failing = async(NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        failing.asCompletable(coroutineContext.minusKey(Job)).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Converts the same deferred to a maybe twice; each conversion must yield "OK".
     */
    @Test
    fun testToMaybe() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        // A fresh maybe is created from the same deferred for each of the two checks.
        repeat(2) {
            checkMaybeValue(deferred.asMaybe(Dispatchers.Unconfined)) { value ->
                assertEquals("OK", value)
            }
        }
    }

    /**
     * Converts a deferred that produces `null` to a maybe twice; each check must see `null`.
     */
    @Test
    fun testToMaybeEmpty() {
        val deferred = GlobalScope.async {
            delay(50)
            null
        }
        repeat(2) {
            checkMaybeValue(deferred.asMaybe(Dispatchers.Unconfined), Assert::assertNull)
        }
    }

    /**
     * Converts a failing deferred to a maybe twice; each conversion must report a
     * [TestRuntimeException] with the message "OK".
     */
    @Test
    fun testToMaybeFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            checkErroneous(deferred.asMaybe(Dispatchers.Unconfined)) { error ->
                check(error is TestRuntimeException && error.message == "OK") { "$error" }
            }
        }
    }

    /**
     * Converts the same deferred to a single twice; each conversion must yield "OK".
     */
    @Test
    fun testToSingle() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        repeat(2) {
            checkSingleValue(deferred.asSingle(Dispatchers.Unconfined)) { value ->
                assertEquals("OK", value)
            }
        }
    }

    /**
     * Converts a failing deferred to a single twice; each conversion must report a
     * [TestRuntimeException] with the message "OK".
     */
    @Test
    fun testToSingleFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            checkErroneous(deferred.asSingle(Dispatchers.Unconfined)) { error ->
                check(error is TestRuntimeException && error.message == "OK") { "$error" }
            }
        }
    }

    /**
     * Exposes a produced channel as an observable; concatenating its elements must give "OK".
     */
    @Test
    fun testToObservable() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val joined = channel.consumeAsFlow().asObservable()
            .reduce { acc, next -> acc + next }
            .toSingle()
        checkSingleValue(joined) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * Exposes a channel whose producer sends "O" and then throws `TestException("K")`.
     *
     * The collector appends every received element and then the message of the caught
     * exception, so the combined text must be "OK".
     */
    @Test
    fun testToObservableFail() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val observable = channel.consumeAsFlow().asObservable()
        val single = rxSingle(Dispatchers.Unconfined) {
            val collected = StringBuilder()
            try {
                observable.collect { collected.append(it) }
            } catch (failure: Throwable) {
                check(failure is TestException)
                collected.append(failure.message)
            }
            collected.toString()
        }
        checkSingleValue(single) { value ->
            assertEquals("OK", value)
        }
    }
}
157