package kotlinx.coroutines.rx3

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.consumeAsFlow
import org.junit.Assert
import org.junit.Test
import kotlin.test.*

/**
 * Exercises the conversions from coroutine primitives ([Job], [Deferred] and a produced channel
 * viewed as a flow) to their reactive counterparts via `asCompletable`, `asMaybe`, `asSingle`
 * and `asObservable`.
 *
 * The `Deferred`-based tests convert the same deferred twice and apply the same check to both
 * results.
 */
class ConvertTest : TestBase() {
    /**
     * A launched job is converted with `asCompletable` and subscribed to; the subscriber callback
     * is expected to run as step 4, after the job body (step 3) has executed.
     */
    @Test
    fun testToCompletableSuccess(): Unit = runBlocking {
        expect(1)
        val child = launch { expect(3) }
        child.asCompletable(coroutineContext.minusKey(Job)).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /**
     * Same sequence as [testToCompletableSuccess], but the converted job is an `async` whose body
     * throws. The callback passed to `subscribe` is still expected to run as step 4.
     */
    @Test
    fun testToCompletableFail(): Unit = runBlocking {
        expect(1)
        val failing = async(NonCancellable) { // don't kill parent on exception
            expect(3)
            throw RuntimeException("OK")
        }
        failing.asCompletable(coroutineContext.minusKey(Job)).subscribe { expect(4) }
        expect(2)
        yield()
        finish(5)
    }

    /** A deferred producing "OK" is converted with `asMaybe` twice; each maybe must carry "OK". */
    @Test
    fun testToMaybe() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkMaybeValue(maybe) { value -> assertEquals("OK", value) }
        }
    }

    /** A deferred producing `null` is converted with `asMaybe` twice; each value is checked to be null. */
    @Test
    fun testToMaybeEmpty() {
        val deferred = GlobalScope.async {
            delay(50)
            null
        }
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkMaybeValue(maybe, Assert::assertNull)
        }
    }

    /**
     * A deferred that throws [TestRuntimeException] is converted with `asMaybe` twice; each maybe
     * must report that exception with the message "OK".
     */
    @Test
    fun testToMaybeFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            val maybe = deferred.asMaybe(Dispatchers.Unconfined)
            checkErroneous(maybe) { error ->
                check(error is TestRuntimeException && error.message == "OK") { "$error" }
            }
        }
    }

    /** A deferred producing "OK" is converted with `asSingle` twice; each single must carry "OK". */
    @Test
    fun testToSingle() {
        val deferred = GlobalScope.async {
            delay(50)
            "OK"
        }
        repeat(2) {
            val single = deferred.asSingle(Dispatchers.Unconfined)
            checkSingleValue(single) { value -> assertEquals("OK", value) }
        }
    }

    /**
     * A deferred that throws [TestRuntimeException] is converted with `asSingle` twice; each single
     * must report that exception with the message "OK".
     */
    @Test
    fun testToSingleFail() {
        val deferred = GlobalScope.async {
            delay(50)
            throw TestRuntimeException("OK")
        }
        repeat(2) {
            val single = deferred.asSingle(Dispatchers.Unconfined)
            checkErroneous(single) { error ->
                check(error is TestRuntimeException && error.message == "OK") { "$error" }
            }
        }
    }

    /**
     * A channel sending "O" then "K" is exposed as a flow, converted with `asObservable`, and its
     * elements are concatenated with `reduce`; the result must be "OK".
     */
    @Test
    fun testToObservable() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            send("K")
        }
        val observable = channel.consumeAsFlow().asObservable()
        val joined = observable.reduce { acc, next -> acc + next }.toSingle()
        checkSingleValue(joined) { value -> assertEquals("OK", value) }
    }

    /**
     * A channel sending "O" and then throwing `TestException("K")` is exposed as a flow and
     * converted with `asObservable`. The collector appends each element and then the message of
     * the caught exception, so the expected result is "OK".
     */
    @Test
    fun testToObservableFail() {
        val channel = GlobalScope.produce {
            delay(50)
            send("O")
            delay(50)
            throw TestException("K")
        }
        val observable = channel.consumeAsFlow().asObservable()
        val single = rxSingle(Dispatchers.Unconfined) {
            var collected = ""
            try {
                observable.collect { item -> collected += item }
            } catch (failure: Throwable) {
                // Only the exception thrown by the producer is acceptable here.
                check(failure is TestException)
                collected += failure.message
            }
            collected
        }
        checkSingleValue(single) { value -> assertEquals("OK", value) }
    }
}