<lambda>null1package kotlinx.coroutines.rx3 2 3 import kotlinx.coroutines.testing.* 4 import io.reactivex.rxjava3.core.ObservableSource 5 import io.reactivex.rxjava3.disposables.* 6 import kotlinx.coroutines.* 7 import org.junit.Test 8 import kotlin.test.* 9 10 class ObservableCollectTest: TestBase() { 11 12 /** Tests the behavior of [collect] when the publisher raises an error. */ 13 @Test 14 fun testObservableCollectThrowingObservable() = runTest { 15 expect(1) 16 var sum = 0 17 try { 18 rxObservable { 19 for (i in 0..100) { 20 send(i) 21 } 22 throw TestException() 23 }.collect { 24 sum += it 25 } 26 } catch (e: TestException) { 27 assertTrue(sum > 0) 28 finish(2) 29 } 30 } 31 32 @Test 33 fun testObservableCollectThrowingAction() = runTest { 34 expect(1) 35 var sum = 0 36 val expectedSum = 5 37 try { 38 var disposed = false 39 ObservableSource<Int> { observer -> 40 launch(Dispatchers.Default) { 41 observer.onSubscribe(object : Disposable { 42 override fun dispose() { 43 disposed = true 44 expect(expectedSum + 2) 45 } 46 47 override fun isDisposed(): Boolean = disposed 48 }) 49 while (!disposed) { 50 observer.onNext(1) 51 } 52 } 53 }.collect { 54 expect(sum + 2) 55 sum += it 56 if (sum == expectedSum) { 57 throw TestException() 58 } 59 } 60 } catch (e: TestException) { 61 assertEquals(expectedSum, sum) 62 finish(expectedSum + 3) 63 } 64 } 65 }