• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx3
6 
7 import io.reactivex.rxjava3.core.*
8 import io.reactivex.rxjava3.exceptions.*
9 import kotlinx.coroutines.*
10 import kotlinx.coroutines.channels.*
11 import kotlinx.coroutines.flow.*
12 import org.junit.*
13 import java.util.concurrent.*
14 
/**
 * Stress test for collecting an RxJava [Observable] as a flow and cancelling the collection
 * while the source is still emitting.
 *
 * Each round subscribes to a fast periodic source through `asFlow()`, waits until more than
 * 100 items have passed through it, and then cancels the collecting coroutine and joins it.
 */
class ObservableSourceAsFlowStressTest : TestBase() {

    // Number of subscribe/cancel rounds; scaled by the externally defined `stressTestMultiplierSqrt`.
    private val iterations = 100 * stressTestMultiplierSqrt

    @Before
    fun setup() {
        // NOTE(review): presumably tells the test harness to tolerate RxJava's shared scheduler
        // threads that outlive a test — confirm against TestBase.ignoreLostThreads.
        ignoreLostThreads("RxComputationThreadPool-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
    }

    /**
     * Repeatedly launches a flow collection backed by an interval observable and cancels it
     * mid-stream, waiting for the cancelled job to complete before starting the next round.
     */
    @Test
    fun testAsFlowCancellation() = runTest {
        repeat(iterations) {
            // Capacity-1 channel used as a signal: it receives a token once enough items were seen.
            val thresholdReached = Channel<Unit>(1)
            var emitted = 0
            val ticks = Observable.interval(100L, TimeUnit.MICROSECONDS)
            val countedTicks = ticks.doOnNext {
                emitted++
                // trySend never suspends; further signals are simply dropped once the buffer is full.
                if (emitted > 100) thresholdReached.trySend(Unit)
            }
            val collector = countedTicks.asFlow().launchIn(CoroutineScope(Dispatchers.Default))
            // Block this round until the source has produced more than 100 items...
            thresholdReached.receive()
            // ...then cancel the collection while the source is still ticking.
            collector.cancelAndJoin()
        }
    }
}
37