• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.flow.*
8 import org.junit.*
9 import java.util.concurrent.*
10 
/**
 * Stress test for cancelling the collection of an [Observable] that was converted to a flow
 * with `asFlow()`.
 *
 * Each round subscribes to a fast-ticking interval observable, waits until it has demonstrably
 * emitted a batch of items, and then cancels the collecting coroutine while emissions are
 * still in flight.
 */
class ObservableSourceAsFlowStressTest : TestBase() {

    // Number of subscribe/cancel rounds; scaled by the stress-test multiplier declared outside this file.
    private val iterations = 100 * stressTestMultiplierSqrt

    @Before
    fun setup() {
        // NOTE(review): presumably tells TestBase not to report these RxJava scheduler threads
        // as leaked once the test finishes -- confirm against TestBase.
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Repeats [iterations] times: collect the observable as a flow in a standalone scope on
     * [Dispatchers.Default], wait for more than 100 emissions, then cancel and join the collector.
     */
    @Test
    fun testAsFlowCancellation() = runTest {
        repeat(iterations) {
            // Capacity of 1 plus trySend: the emitting side never blocks, extra signals are dropped.
            val latch = Channel<Unit>(1)
            var i = 0
            val observable = Observable.interval(100L, TimeUnit.MICROSECONDS)
                .doOnNext { if (++i > 100) latch.trySend(Unit) }
            val job = observable.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
            // Wait until the source is actively emitting before cancelling.
            latch.receive()
            job.cancelAndJoin()
        }
    }
}
33