/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import kotlinx.coroutines.*
import org.junit.*
// Explicit import: takes precedence over the star imports, so `@Test` below is JUnit's annotation.
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for the suspending `await*` extensions and `collect` on [Observable].
 *
 * Most tests await inside an `rxObservable { ... }` builder and then check the single value
 * (or the error) that the resulting observable produces.
 *
 * NOTE(review): `TestBase`, `ignoreLostThreads`, `checkSingleValue` and `checkErroneous` are declared
 * outside this file; the comments below describe only how they are used here.
 */
class ObservableSingleTest : TestBase() {
    /**
     * Registers the thread-name prefixes of RxJava's pools with `ignoreLostThreads` before each test.
     * Presumably this keeps those long-lived scheduler threads from being reported as leaked — confirm
     * against `TestBase`.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /** An observable built by `rxObservable` that sends one value without suspending yields that value. */
    @Test
    fun testSingleNoWait() {
        val observable = rxObservable {
            send("OK")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` called directly from `runBlocking` returns the only element of `Observable.just`. */
    @Test
    fun testSingleAwait() = runBlocking {
        assertEquals("OK", Observable.just("O").awaitSingle() + "K")
    }

    /** The result of `awaitSingle` can be combined and re-emitted from inside `rxObservable`. */
    @Test
    fun testSingleEmitAndAwait() {
        val observable = rxObservable {
            send(Observable.just("O").awaitSingle() + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitSingle` works when the awaited element is produced by a 50 ms timer rather than immediately. */
    @Test
    fun testSingleWithDelay() {
        val observable = rxObservable {
            send(Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }.awaitSingle() + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * `awaitSingle` on a two-element observable fails; the test expects the outer observable
     * to terminate with an [IllegalArgumentException].
     */
    @Test
    fun testSingleException() {
        val observable = rxObservable {
            send(Observable.just("O", "K").awaitSingle() + "K")
        }

        checkErroneous(observable) {
            assertTrue(it is IllegalArgumentException)
        }
    }

    /** `awaitFirst` returns the first of several elements ("O", not "#"). */
    @Test
    fun testAwaitFirst() {
        val observable = rxObservable {
            send(Observable.just("O", "#").awaitFirst() + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` returns the supplied default when the observable is empty. */
    @Test
    fun testAwaitFirstOrDefault() {
        val observable = rxObservable {
            send(Observable.empty<String>().awaitFirstOrDefault("O") + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrDefault` returns the first element, not the default, when elements are present. */
    @Test
    fun testAwaitFirstOrDefaultWithValues() {
        val observable = rxObservable {
            send(Observable.just("O", "#").awaitFirstOrDefault("!") + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` returns `null` for an empty observable, so the elvis fallback "OK" is sent. */
    @Test
    fun testAwaitFirstOrNull() {
        val observable = rxObservable<String> {
            send(Observable.empty<String>().awaitFirstOrNull() ?: "OK")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrNull` returns the first element when elements are present, so the fallback "!" is unused. */
    @Test
    fun testAwaitFirstOrNullWithValues() {
        val observable = rxObservable {
            send((Observable.just("O", "#").awaitFirstOrNull() ?: "!") + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` returns the lambda's result when the observable is empty. */
    @Test
    fun testAwaitFirstOrElse() {
        val observable = rxObservable {
            send(Observable.empty<String>().awaitFirstOrElse { "O" } + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitFirstOrElse` returns the first element, not the lambda's "!" result, when elements are present. */
    @Test
    fun testAwaitFirstOrElseWithValues() {
        val observable = rxObservable {
            send(Observable.just("O", "#").awaitFirstOrElse { "!" } + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /** `awaitLast` returns the last of several elements ("O", not "#"). */
    @Test
    fun testAwaitLast() {
        val observable = rxObservable {
            send(Observable.just("#", "O").awaitLast() + "K")
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * An error emitted by the awaited observable is thrown from `awaitFirst` and can be caught
     * inside the builder; the handler then emits a value derived from the exception message.
     */
    @Test
    fun testExceptionFromObservable() {
        val observable = rxObservable {
            try {
                send(Observable.error<String>(RuntimeException("O")).awaitFirst())
            } catch (e: RuntimeException) {
                // The message is the non-null "O" passed to the RuntimeException above.
                send(Observable.just(e.message!!).awaitLast() + "K")
            }
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * An exception thrown from the `rxObservable` body after a successful await terminates the
     * observable with that exception, with type and message preserved.
     */
    @Test
    fun testExceptionFromCoroutine() {
        val observable = rxObservable<String> {
            throw IllegalStateException(Observable.just("O").awaitSingle() + "K")
        }

        checkErroneous(observable) {
            assertTrue(it is IllegalStateException)
            assertEquals("OK", it.message)
        }
    }

    /** `collect` visits every element in order: "O" then "K" are concatenated into "OK". */
    @Test
    fun testObservableIteration() {
        val observable = rxObservable {
            var result = ""
            Observable.just("O", "K").collect { result += it }
            send(result)
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }

    /**
     * `collect` on a failing observable throws the source error without invoking the action;
     * the handler emits the exception message.
     */
    @Test
    fun testObservableIterationFailure() {
        val observable = rxObservable {
            try {
                Observable.error<String>(RuntimeException("OK")).collect { fail("Should not be here") }
                // Reached only if collect returned normally, which would make the assertion below fail.
                send("Fail")
            } catch (e: RuntimeException) {
                // The message is the non-null "OK" passed to the RuntimeException above.
                send(e.message!!)
            }
        }

        checkSingleValue(observable) {
            assertEquals("OK", it)
        }
    }
}