1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.core.* 8 import io.reactivex.rxjava3.exceptions.* 9 import kotlinx.coroutines.* 10 import kotlinx.coroutines.channels.* 11 import kotlinx.coroutines.flow.* 12 import org.junit.* 13 import java.util.concurrent.* 14 15 class ObservableSourceAsFlowStressTest : TestBase() { 16 17 private val iterations = 100 * stressTestMultiplierSqrt 18 19 @Before setupnull20 fun setup() { 21 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 22 } 23 24 @Test <lambda>null25 fun testAsFlowCancellation() = runTest { 26 repeat(iterations) { 27 val latch = Channel<Unit>(1) 28 var i = 0 29 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) 30 .doOnNext { if (++i > 100) latch.trySend(Unit) } 31 val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) 32 latch.receive() 33 job.cancelAndJoin() 34 } 35 } 36 } 37