1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.core.ObservableSource 8 import io.reactivex.rxjava3.disposables.* 9 import kotlinx.coroutines.* 10 import org.junit.Test 11 import kotlin.test.* 12 13 class ObservableCollectTest: TestBase() { 14 15 /** Tests the behavior of [collect] when the publisher raises an error. */ 16 @Test 17 fun testObservableCollectThrowingObservable() = runTest { 18 expect(1) 19 var sum = 0 20 try { 21 rxObservable { 22 for (i in 0..100) { 23 send(i) 24 } 25 throw TestException() 26 }.collect { 27 sum += it 28 } 29 } catch (e: TestException) { 30 assertTrue(sum > 0) 31 finish(2) 32 } 33 } 34 35 @Test 36 fun testObservableCollectThrowingAction() = runTest { 37 expect(1) 38 var sum = 0 39 val expectedSum = 5 40 try { 41 var disposed = false 42 ObservableSource<Int> { observer -> 43 launch(Dispatchers.Default) { 44 observer.onSubscribe(object : Disposable { 45 override fun dispose() { 46 disposed = true 47 expect(expectedSum + 2) 48 } 49 50 override fun isDisposed(): Boolean = disposed 51 }) 52 while (!disposed) { 53 observer.onNext(1) 54 } 55 } 56 }.collect { 57 expect(sum + 2) 58 sum += it 59 if (sum == expectedSum) { 60 throw TestException() 61 } 62 } 63 } catch (e: TestException) { 64 assertEquals(expectedSum, sum) 65 finish(expectedSum + 3) 66 } 67 } 68 }