package kotlinx.coroutines.rx3

import kotlinx.coroutines.testing.*
import io.reactivex.rxjava3.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.junit.*
import java.util.concurrent.*

/**
 * Stress test that repeatedly collects an interval-based [Observable] through `asFlow()`
 * and cancels the collecting coroutine while the source is still emitting.
 */
class ObservableSourceAsFlowStressTest : TestBase() {

    /** Number of start/cancel rounds; scaled by the stress-test multiplier inherited from the test base. */
    private val iterations = 100 * stressTestMultiplierSqrt

    /**
     * Registers the name prefixes of RxJava scheduler threads via `ignoreLostThreads`.
     *
     * NOTE(review): presumably this stops the test base from reporting these long-lived
     * Rx threads as leaked once the test finishes — confirm against `TestBase`.
     */
    @Before
    fun setup() {
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * For each iteration: starts collecting a 100-microsecond interval observable as a flow in a
     * fresh scope on [Dispatchers.Default], waits until the source has produced more than 100
     * items, then cancels the collector and waits for it to complete.
     */
    @Test
    fun testAsFlowCancellation() = runTest {
        repeat(iterations) {
            // Capacity-1 channel used as a one-shot "source is running" signal.
            val latch = Channel<Unit>(1)
            // Emission counter; it is only touched from the doOnNext callback below.
            var i = 0
            val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
                // trySend never suspends, so it is safe to call from the Rx callback;
                // its result is deliberately ignored once the signal has been delivered.
                .doOnNext { if (++i > 100) latch.trySend(Unit) }
            val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
            // Cancel only after the source is known to be actively emitting.
            latch.receive()
            job.cancelAndJoin()
        }
    }
}