1 package kotlinx.coroutines.rx2 2 3 import kotlinx.coroutines.testing.* 4 import io.reactivex.Observable 5 import io.reactivex.ObservableSource 6 import io.reactivex.Observer 7 import io.reactivex.disposables.Disposables 8 import io.reactivex.subjects.PublishSubject 9 import kotlinx.coroutines.* 10 import kotlinx.coroutines.channels.* 11 import kotlinx.coroutines.flow.* 12 import kotlinx.coroutines.testing.flow.* 13 import kotlin.test.* 14 15 class ObservableAsFlowTest : TestBase() { 16 @Test testCancellationnull17 fun testCancellation() = runTest { 18 var onNext = 0 19 var onCancelled = 0 20 var onError = 0 21 22 val source = rxObservable(currentDispatcher()) { 23 coroutineContext[Job]?.invokeOnCompletion { 24 if (it is CancellationException) ++onCancelled 25 } 26 27 repeat(100) { 28 send(it) 29 } 30 } 31 32 source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) { 33 onEach { 34 ++onNext 35 throw RuntimeException() 36 } 37 catch<Throwable> { 38 ++onError 39 } 40 }.join() 41 42 43 assertEquals(1, onNext) 44 assertEquals(1, onError) 45 assertEquals(1, onCancelled) 46 } 47 48 @Test testImmediateCollectionnull49 fun testImmediateCollection() { 50 val source = PublishSubject.create<Int>() 51 val flow = source.asFlow() 52 GlobalScope.launch(Dispatchers.Unconfined) { 53 expect(1) 54 flow.collect { expect(it) } 55 expect(6) 56 } 57 expect(2) 58 source.onNext(3) 59 expect(4) 60 source.onNext(5) 61 source.onComplete() 62 finish(7) 63 } 64 65 @Test testOnErrorCancellationnull66 fun testOnErrorCancellation() { 67 val source = PublishSubject.create<Int>() 68 val flow = source.asFlow() 69 val exception = RuntimeException() 70 GlobalScope.launch(Dispatchers.Unconfined) { 71 try { 72 expect(1) 73 flow.collect { expect(it) } 74 expectUnreached() 75 } 76 catch (e: Exception) { 77 assertSame(exception, e.cause) 78 expect(5) 79 } 80 expect(6) 81 } 82 expect(2) 83 source.onNext(3) 84 expect(4) 85 source.onError(exception) 86 finish(7) 87 } 88 89 @Test testUnsubscribeOnCollectionExceptionnull90 fun testUnsubscribeOnCollectionException() { 91 val source = PublishSubject.create<Int>() 92 val flow = source.asFlow() 93 val exception = RuntimeException() 94 GlobalScope.launch(Dispatchers.Unconfined) { 95 try { 96 expect(1) 97 flow.collect { 98 expect(it) 99 if (it == 3) throw exception 100 } 101 expectUnreached() 102 } 103 catch (e: Exception) { 104 assertSame(exception, e.cause) 105 expect(4) 106 } 107 expect(5) 108 } 109 expect(2) 110 assertTrue(source.hasObservers()) 111 source.onNext(3) 112 assertFalse(source.hasObservers()) 113 finish(6) 114 } 115 116 @Test testLateOnSubscribenull117 fun testLateOnSubscribe() { 118 var observer: Observer<in Int>? = null 119 val source = ObservableSource<Int> { observer = it } 120 val flow = source.asFlow() 121 assertNull(observer) 122 val job = GlobalScope.launch(Dispatchers.Unconfined) { 123 expect(1) 124 flow.collect { expectUnreached() } 125 expectUnreached() 126 } 127 expect(2) 128 assertNotNull(observer) 129 job.cancel() 130 val disposable = Disposables.empty() 131 observer!!.onSubscribe(disposable) 132 assertTrue(disposable.isDisposed) 133 finish(3) 134 } 135 136 @Test <lambda>null137 fun testBufferUnlimited() = runTest { 138 val source = rxObservable(currentDispatcher()) { 139 expect(1); send(10) 140 expect(2); send(11) 141 expect(3); send(12) 142 expect(4); send(13) 143 expect(5); send(14) 144 expect(6); send(15) 145 expect(7); send(16) 146 expect(8); send(17) 147 expect(9) 148 } 149 source.asFlow().buffer(Channel.UNLIMITED).collect { expect(it) } 150 finish(18) 151 } 152 153 @Test <lambda>null154 fun testConflated() = runTest { 155 val source = Observable.range(1, 5) 156 val list = source.asFlow().conflate().toList() 157 assertEquals(listOf(1, 5), list) 158 } 159 160 @Test <lambda>null161 fun testLongRange() = runTest { 162 val source = Observable.range(1, 10_000) 163 val count = source.asFlow().count() 164 assertEquals(10_000, count) 165 } 166 167 @Test <lambda>null168 fun testProduce() = runTest { 169 val source = Observable.range(0, 10) 170 val flow = source.asFlow() 171 check((0..9).toList(), flow.produceIn(this)) 172 check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this)) 173 check((0..9).toList(), flow.buffer(2).produceIn(this)) 174 check((0..9).toList(), flow.buffer(0).produceIn(this)) 175 check(listOf(0, 9), flow.conflate().produceIn(this)) 176 } 177 checknull178 private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) { 179 val result = ArrayList<Int>(10) 180 channel.consumeEach { result.add(it) } 181 assertEquals(expected, result) 182 } 183 } 184