/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx3

import io.reactivex.rxjava3.core.*
import io.reactivex.rxjava3.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Exercises the suspending `await*` and `collect` extensions on RxJava 3 observables, mostly from inside
 * an [rxObservable] builder whose single emitted value (or error) is then verified.
 */
class ObservableSingleTest : TestBase() {
    /** Registers the RxJava thread-name prefixes that are passed to `ignoreLostThreads` before every test. */
    @Before
    fun setup() {
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /** A builder that sends a constant without suspending on anything yields that constant. */
    @Test
    fun testSingleNoWait() {
        val source = rxObservable {
            send("OK")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitSingle] on a one-element observable returns that element. */
    @Test
    fun testSingleAwait() = runBlocking {
        val head = Observable.just("O").awaitSingle()
        assertEquals("OK", head + "K")
    }

    /** The result of [awaitSingle] can be used to build the value sent from the builder. */
    @Test
    fun testSingleEmitAndAwait() {
        val source = rxObservable {
            val head = Observable.just("O").awaitSingle()
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitSingle] also works when the awaited element is produced by a 50 ms timer. */
    @Test
    fun testSingleWithDelay() {
        val source = rxObservable {
            val delayed = Observable.timer(50, TimeUnit.MILLISECONDS).map { "O" }
            send(delayed.awaitSingle() + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitSingle] on a two-element observable is expected to fail with [IllegalArgumentException]. */
    @Test
    fun testSingleException() {
        val source = rxObservable {
            val head = Observable.just("O", "K").awaitSingle()
            send(head + "K")
        }

        checkErroneous(source) { thrown ->
            assertTrue(thrown is IllegalArgumentException)
        }
    }

    /** [awaitFirst] returns the first of several elements. */
    @Test
    fun testAwaitFirst() {
        val source = rxObservable {
            val head = Observable.just("O", "#").awaitFirst()
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrDefault] falls back to the supplied default for an empty observable. */
    @Test
    fun testAwaitFirstOrDefault() {
        val source = rxObservable {
            val head = Observable.empty<String>().awaitFirstOrDefault("O")
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrDefault] prefers the first emitted element over the default. */
    @Test
    fun testAwaitFirstOrDefaultWithValues() {
        val source = rxObservable {
            val head = Observable.just("O", "#").awaitFirstOrDefault("!")
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrNull] returns `null` for an empty observable, so the elvis fallback is sent. */
    @Test
    fun testAwaitFirstOrNull() {
        val source = rxObservable {
            val head = Observable.empty<String>().awaitFirstOrNull()
            send(head ?: "OK")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrNull] returns the first emitted element when there is one. */
    @Test
    fun testAwaitFirstOrNullWithValues() {
        val source = rxObservable {
            val head = Observable.just("O", "#").awaitFirstOrNull() ?: "!"
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrElse] evaluates its fallback lambda for an empty observable. */
    @Test
    fun testAwaitFirstOrElse() {
        val source = rxObservable {
            val head = Observable.empty<String>().awaitFirstOrElse { "O" }
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitFirstOrElse] returns the first emitted element instead of the fallback. */
    @Test
    fun testAwaitFirstOrElseWithValues() {
        val source = rxObservable {
            val head = Observable.just("O", "#").awaitFirstOrElse { "!" }
            send(head + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [awaitLast] returns the last of several elements. */
    @Test
    fun testAwaitLast() {
        val source = rxObservable {
            val tail = Observable.just("#", "O").awaitLast()
            send(tail + "K")
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /**
     * Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of
     * the subscription when their [Job] is cancelled.
     */
    @Test
    fun testAwaitCancellation() = runTest {
        expect(1)
        // A source that only hands out a disposable and never emits, so the awaiting coroutine stays suspended.
        val neverEmitting = ObservableSource<Int> { observer ->
            val handle = object : Disposable {
                override fun dispose() {
                    expect(4)
                }

                override fun isDisposed(): Boolean {
                    expectUnreached()
                    return false
                }
            }
            observer.onSubscribe(handle)
        }
        val awaiter = launch(start = CoroutineStart.UNDISPATCHED) {
            try {
                expect(2)
                neverEmitting.awaitFirst()
            } catch (cancellation: CancellationException) {
                expect(5)
                throw cancellation
            }
        }
        expect(3)
        awaiter.cancelAndJoin()
        finish(6)
    }

    /** An error signalled by the awaited observable can be caught inside the builder and recovered from. */
    @Test
    fun testExceptionFromObservable() {
        val source = rxObservable {
            try {
                val unreachable = Observable.error<String>(RuntimeException("O")).awaitFirst()
                send(unreachable)
            } catch (failure: RuntimeException) {
                val recovered = Observable.just(failure.message!!).awaitLast()
                send(recovered + "K")
            }
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** An exception thrown by the builder body is delivered as the observable's error, message intact. */
    @Test
    fun testExceptionFromCoroutine() {
        val source = rxObservable<String> {
            val head = Observable.just("O").awaitSingle()
            throw IllegalStateException(head + "K")
        }

        checkErroneous(source) { thrown ->
            assertTrue(thrown is IllegalStateException)
            assertEquals("OK", thrown.message)
        }
    }

    /** [collect] visits every element in order; here the elements are concatenated into one string. */
    @Test
    fun testObservableIteration() {
        val source = rxObservable {
            val joined = StringBuilder()
            Observable.just("O", "K").collect { piece -> joined.append(piece) }
            send(joined.toString())
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }

    /** [collect] on a failing observable throws the observable's error without invoking the action. */
    @Test
    fun testObservableIterationFailure() {
        val source = rxObservable {
            try {
                val failing = Observable.error<String>(RuntimeException("OK"))
                failing.collect { fail("Should not be here") }
                send("Fail")
            } catch (failure: RuntimeException) {
                send(failure.message!!)
            }
        }

        checkSingleValue(source) { value ->
            assertEquals("OK", value)
        }
    }
}