• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import org.junit.*
10 import org.junit.Assert.*
11 
/**
 * Exercises the adapters that expose coroutine primitives as RxJava 2 types:
 * `asCompletable` on a job, `asMaybe` / `asSingle` on a deferred value and
 * `asObservable` on a produced channel.
 *
 * The numbered `expect(n)` / `finish(n)` calls come from [TestBase]; the numbers
 * reflect the sequence in which the test intends the steps to run.
 */
class ConvertTest : TestBase() {
    /**
     * A job that finishes normally, converted to a completable: the subscriber's
     * completion callback (step 4) is expected after the job body (step 3).
     */
    @Test
    fun testToCompletableSuccess() = runBlocking {
        expect(1)
        val child = launch { expect(3) }
        // The conversion runs in this scope's context with the Job element removed.
        val contextWithoutJob = coroutineContext.minusKey(Job)
        child.asCompletable(contextWithoutJob).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Same sequence as the success case, but the converted job throws from its body.
     * The completion callback (step 4) is still expected to run.
     */
    @Test
    fun testToCompletableFail() = runBlocking {
        expect(1)
        // don't kill parent on exception
        val failing = async(NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        val contextWithoutJob = coroutineContext.minusKey(Job)
        failing.asCompletable(contextWithoutJob).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /** A deferred producing "OK" is converted to a maybe twice; each conversion must yield "OK". */
    @Test
    fun testToMaybe() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        // Convert the same deferred twice and verify each result independently.
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkMaybeValue(maybe) { value -> assertEquals("OK", value) }
        }
    }

    /** A deferred producing `null` is converted to a maybe twice; each conversion must yield no value. */
    @Test
    fun testToMaybeEmpty() {
        val deferred = GlobalScope.async {
            delay(50)
            null
        }
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkMaybeValue(maybe, ::assertNull)
        }
    }

    /** A deferred that throws is converted to a maybe twice; each conversion must surface that exception. */
    @Test
    fun testToMaybeFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkErroneous(maybe) {
                val isExpectedFailure = it is TestRuntimeException && it.message == "OK"
                check(isExpectedFailure) { "$it" }
            }
        }
    }

    /** A deferred producing "OK" is converted to a single twice; each conversion must yield "OK". */
    @Test
    fun testToSingle() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        repeat(2) {
            val single = deferred.asSingle(Dispatchers.Unconfined)
            checkSingleValue(single) { value -> assertEquals("OK", value) }
        }
    }

    /** A deferred that throws is converted to a single twice; each conversion must surface that exception. */
    @Test
    fun testToSingleFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            val single = deferred.asSingle(Dispatchers.Unconfined)
            checkErroneous(single) {
                val isExpectedFailure = it is TestRuntimeException && it.message == "OK"
                check(isExpectedFailure) { "$it" }
            }
        }
    }

    /** A channel sending "O" then "K" is converted to an observable whose items concatenate to "OK". */
    @Test
    fun testToObservable() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        // Fold all emitted items into one string so a single assertion covers the whole stream.
        val concatenated = channel
            .asObservable(Dispatchers.Unconfined)
            .reduce { acc, next -> acc + next }
            .toSingle()
        checkSingleValue(concatenated) { value -> assertEquals("OK", value) }
    }

    /**
     * A channel that sends "O" and then throws `TestException("K")` is converted to an
     * observable. The collector appends the exception message to what it received, so the
     * combined text must be "OK".
     */
    @Test
    fun testToObservableFail() {
        val failingChannel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val source = failingChannel.asObservable(Dispatchers.Unconfined)
        val single = rxSingle(Dispatchers.Unconfined) {
            var collected = ""
            try {
                source.collect { item -> collected += item }
            } catch (failure: Throwable) {
                // Only the producer's own exception is acceptable here.
                check(failure is TestException)
                collected += failure.message
            }
            collected
        }
        checkSingleValue(single) { value -> assertEquals("OK", value) }
    }
}