1 /* 2 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx2 6 7 import io.reactivex.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.channels.* 10 import kotlinx.coroutines.flow.* 11 import org.junit.* 12 import java.util.concurrent.* 13 14 class ObservableSourceAsFlowStressTest : TestBase() { 15 16 private val iterations = 100 * stressTestMultiplierSqrt 17 18 @Before setupnull19 fun setup() { 20 ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") 21 } 22 23 @Test <lambda>null24 fun testAsFlowCancellation() = runTest { 25 repeat(iterations) { 26 val latch = Channel<Unit>(1) 27 var i = 0 28 val observable = Observable.interval(100L, TimeUnit.MICROSECONDS) 29 .doOnNext { if (++i > 100) latch.trySend(Unit) } 30 val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default)) 31 latch.receive() 32 job.cancelAndJoin() 33 } 34 } 35 } 36