/*
 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.rx2

import io.reactivex.*
import io.reactivex.disposables.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

/**
 * Tests for converting a [Flow] to an RxJava 2 [Observable] with `asObservable()`.
 *
 * The `expect(n)` / `finish(n)` / `expectUnreached()` / `hang { }` helpers are inherited from
 * `TestBase`, which is not part of this file. The numbers passed to `expect`/`finish` presumably
 * pin the order in which the numbered points must be reached -- confirm against `TestBase`.
 */
class FlowAsObservableTest : TestBase() {
    /**
     * A flow that emits a single value: the subscriber's `onNext` callback receives `"OK"`.
     */
    @Test
    fun testBasicSuccess() = runTest {
        expect(1)
        val observable = flow {
            expect(3)
            emit("OK")
        }.asObservable()

        expect(2)
        observable.subscribe { value ->
            expect(4)
            assertEquals("OK", value)
        }

        finish(5)
    }

    /**
     * A flow that throws: the subscriber's error callback receives the [RuntimeException]
     * with its original message, and the value callback is never invoked.
     */
    @Test
    fun testBasicFailure() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            throw RuntimeException("OK")
        }.asObservable()

        expect(2)
        observable.subscribe({ expectUnreached() }, { error ->
            expect(4)
            assertTrue(error is RuntimeException)
            assertEquals("OK", error.message)
        })
        finish(5)
    }

    /**
     * Disposing the subscription while the flow body is inside `hang { }`: neither the value
     * nor the error callback may be invoked, and the block passed to `hang` is expected to
     * run (point 4) before the test finishes.
     */
    @Test
    fun testBasicUnsubscribe() = runTest {
        expect(1)
        val observable = flow<Int> {
            expect(3)
            hang {
                expect(4)
            }
        }.asObservable()

        expect(2)
        val sub = observable.subscribe({ expectUnreached() }, { expectUnreached() })
        sub.dispose() // will cancel coroutine
        finish(5)
    }

    /**
     * Cancelling the coroutine that collects the observable: `doOnDispose` is expected to be
     * reached exactly once (point 6), after the single value was seen by both `doOnNext`
     * and the collector.
     */
    @Test
    fun testNotifyOnceOnCancellation() = runTest {
        val observable =
            flow {
                expect(3)
                emit("OK")
                hang {
                    expect(7)
                }
            }.asObservable()
                .doOnNext {
                    expect(4)
                    assertEquals("OK", it)
                }
                .doOnDispose {
                    expect(6) // notified once!
                }

        expect(1)
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            expect(2)
            observable.collect {
                expect(5)
                assertEquals("OK", it)
            }
        }

        yield()
        job.cancelAndJoin()
        finish(8)
    }

    /**
     * A collector that throws [TestException] on the first value: the exception propagates
     * out of `collect` to the caller, and the block passed to `hang` in the flow is expected
     * to run (point 4) before the exception is caught.
     */
    @Test
    fun testFailingConsumer() = runTest {
        expect(1)
        val observable = flow {
            expect(2)
            emit("OK")
            hang {
                expect(4)
            }

        }.asObservable()

        try {
            observable.collect {
                expect(3)
                throw TestException()
            }
        } catch (e: TestException) {
            finish(5)
        }
    }

    /**
     * Subscribing under [Dispatchers.Unconfined] and disposing immediately afterwards: the
     * flow body is expected to be entered (point 1), while none of the three subscriber
     * callbacks (value, error, completion) may be invoked.
     */
    @Test
    fun testNonAtomicStart() = runTest {
        withContext(Dispatchers.Unconfined) {
            val observable = flow<Int> {
                expect(1)
            }.asObservable()

            val disposable = observable.subscribe({ expectUnreached() }, { expectUnreached() }, { expectUnreached() })
            disposable.dispose()
        }
        finish(2)
    }

    /**
     * A flow that cancels its own coroutine context after emitting one value: the subscriber
     * sees the value (point 2) and then the completion callback (point 3); the error callback
     * must not be invoked and the code after `ensureActive()` must not be reached.
     */
    @Test
    fun testFlowCancelledFromWithin() = runTest {
        val observable = flow {
            expect(1)
            emit(1)
            kotlin.coroutines.coroutineContext.cancel()
            kotlin.coroutines.coroutineContext.ensureActive()
            expectUnreached()
        }.asObservable()

        observable.subscribe({ expect(2) }, { expectUnreached() }, { finish(3) })
    }

    /**
     * `asObservable()` called without a context argument: `onNext` and `onComplete` must run
     * on the very thread that called `subscribe`.
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asObservable().subscribe(object : Observer<Int> {
            override fun onSubscribe(d: Disposable) {
                expect(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(t: Throwable) {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * `asObservable(dispatcher)` with a single-thread dispatcher: `onNext` and `onComplete`
     * must run on a thread whose name starts with the dispatcher's name. The test thread
     * blocks on a latch until the observer reports a terminal event.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsObservableTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asObservable(dispatcher).subscribe(object : Observer<Int> {
                override fun onSubscribe(d: Disposable) {
                    expect(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    checkThread()
                    expect(4)
                    completed.countDown()
                }

                override fun onError(e: Throwable) {
                    try {
                        expectUnreached()
                    } finally {
                        // onComplete is the only other place that releases the latch, so
                        // without this an unexpected error would leave completed.await()
                        // below blocked forever and the test would hang instead of failing.
                        completed.countDown()
                    }
                }
            })
            completed.await()
        }
        finish(5)
    }
}