1 /* <lambda>null2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.channels.* 9 import org.junit.* 10 import org.junit.Assert.* 11 12 class ConvertTest : TestBase() { 13 @Test 14 fun testToCompletableSuccess() = runBlocking { 15 expect(1) 16 val job = launch { 17 expect(3) 18 } 19 val completable = job.asCompletable(coroutineContext.minusKey(Job)) 20 completable.subscribe { 21 expect(4) 22 } 23 expect(2) 24 yield() 25 finish(5) 26 } 27 28 @Test 29 fun testToCompletableFail() = runBlocking { 30 expect(1) 31 val job = async(NonCancellable) { // don't kill parent on exception 32 expect(3) 33 throw RuntimeException("OK") 34 } 35 val completable = job.asCompletable(coroutineContext.minusKey(Job)) 36 completable.subscribe { 37 expect(4) 38 } 39 expect(2) 40 yield() 41 finish(5) 42 } 43 44 @Test 45 fun testToMaybe() { 46 val d = GlobalScope.async { 47 delay(50) 48 "OK" 49 } 50 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 51 checkMaybeValue(maybe1) { 52 assertEquals("OK", it) 53 } 54 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 55 checkMaybeValue(maybe2) { 56 assertEquals("OK", it) 57 } 58 } 59 60 @Test 61 fun testToMaybeEmpty() { 62 val d = GlobalScope.async { 63 delay(50) 64 null 65 } 66 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 67 checkMaybeValue(maybe1, ::assertNull) 68 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 69 checkMaybeValue(maybe2, ::assertNull) 70 } 71 72 @Test 73 fun testToMaybeFail() { 74 val d = GlobalScope.async { 75 delay(50) 76 throw TestRuntimeException("OK") 77 } 78 val maybe1 = d.asMaybe(Dispatchers.Unconfined) 79 checkErroneous(maybe1) { 80 check(it is TestRuntimeException && it.message == "OK") { "$it" } 81 } 82 val maybe2 = d.asMaybe(Dispatchers.Unconfined) 83 checkErroneous(maybe2) { 84 check(it is TestRuntimeException && it.message == "OK") { "$it" } 85 } 86 } 87 88 @Test 89 fun testToSingle() { 90 val d = GlobalScope.async { 91 delay(50) 92 "OK" 93 } 94 val single1 = d.asSingle(Dispatchers.Unconfined) 95 checkSingleValue(single1) { 96 assertEquals("OK", it) 97 } 98 val single2 = d.asSingle(Dispatchers.Unconfined) 99 checkSingleValue(single2) { 100 assertEquals("OK", it) 101 } 102 } 103 104 @Test 105 fun testToSingleFail() { 106 val d = GlobalScope.async { 107 delay(50) 108 throw TestRuntimeException("OK") 109 } 110 val single1 = d.asSingle(Dispatchers.Unconfined) 111 checkErroneous(single1) { 112 check(it is TestRuntimeException && it.message == "OK") { "$it" } 113 } 114 val single2 = d.asSingle(Dispatchers.Unconfined) 115 checkErroneous(single2) { 116 check(it is TestRuntimeException && it.message == "OK") { "$it" } 117 } 118 } 119 120 @Test 121 fun testToObservable() { 122 val c = GlobalScope.produce { 123 delay(50) 124 send("O") 125 delay(50) 126 send("K") 127 } 128 val observable = c.asObservable(Dispatchers.Unconfined) 129 checkSingleValue(observable.reduce { t1, t2 -> t1 + t2 }.toSingle()) { 130 assertEquals("OK", it) 131 } 132 } 133 134 @Test 135 fun testToObservableFail() { 136 val c = GlobalScope.produce { 137 delay(50) 138 send("O") 139 delay(50) 140 throw TestException("K") 141 } 142 val observable = c.asObservable(Dispatchers.Unconfined) 143 val single = rxSingle(Dispatchers.Unconfined) { 144 var result = "" 145 try { 146 observable.collect { result += it } 147 } catch(e: Throwable) { 148 check(e is TestException) 149 result += e.message 150 } 151 result 152 } 153 checkSingleValue(single) { 154 assertEquals("OK", it) 155 } 156 } 157 }