1 /* <lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import io.reactivex.* 8 import io.reactivex.disposables.* 9 import kotlinx.coroutines.* 10 import org.junit.Test 11 import kotlin.test.* 12 13 class ObservableCollectTest: TestBase() { 14 15 /** Tests the behavior of [collect] when the publisher raises an error. */ 16 @Test 17 fun testObservableCollectThrowingObservable() = runTest { 18 expect(1) 19 var sum = 0 20 try { 21 rxObservable { 22 for (i in 0..100) { 23 send(i) 24 } 25 throw TestException() 26 }.collect { 27 sum += it 28 } 29 } catch (e: TestException) { 30 assertTrue(sum > 0) 31 finish(2) 32 } 33 } 34 35 /** Tests the behavior of [collect] when the action throws. */ 36 @Test 37 fun testObservableCollectThrowingAction() = runTest { 38 expect(1) 39 var sum = 0 40 val expectedSum = 5 41 try { 42 var disposed = false 43 ObservableSource<Int> { observer -> 44 launch(Dispatchers.Default) { 45 observer.onSubscribe(object : Disposable { 46 override fun dispose() { 47 disposed = true 48 expect(expectedSum + 2) 49 } 50 51 override fun isDisposed(): Boolean = disposed 52 }) 53 while (!disposed) { 54 observer.onNext(1) 55 } 56 } 57 }.collect { 58 expect(sum + 2) 59 sum += it 60 if (sum == expectedSum) { 61 throw TestException() 62 } 63 } 64 } catch (e: TestException) { 65 assertEquals(expectedSum, sum) 66 finish(expectedSum + 3) 67 } 68 } 69 }