• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.rx2
6 
7 import io.reactivex.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.*
11 import org.junit.*
12 import java.util.concurrent.*
13 
/**
 * Stress test that repeatedly collects a fast-ticking Rx [Observable] as a flow
 * and cancels the collecting coroutine while the source is still emitting.
 */
class ObservableSourceAsFlowStressTest : TestBase() {

    // Number of subscribe/cancel rounds: a base of 100 scaled by `stressTestMultiplierSqrt`,
    // which is defined outside this class.
    private val iterations = 100 * stressTestMultiplierSqrt

    @Before
    fun setup() {
        // Thread-name prefixes of RxJava scheduler threads handed to the base class helper.
        // NOTE(review): presumably these threads are excluded from the lost-thread check
        // performed by TestBase — confirm against TestBase.
        ignoreLostThreads(
            "RxComputationThreadPool-",
            "RxCachedWorkerPoolEvictor-",
            "RxSchedulerPurge-"
        )
    }

    /**
     * For each round: starts an interval observable ticking every 100 microseconds,
     * collects it as a flow on [Dispatchers.Default], waits until more than 100 items
     * have been emitted, then cancels the collector and waits for it to finish.
     */
    @Test
    fun testAsFlowCancellation() = runTest {
        repeat(iterations) {
            // Capacity-1 channel used as a one-shot signal; later trySend calls may fail
            // once the buffer is full, and their result is intentionally not inspected.
            val enoughEmitted = Channel<Unit>(1)
            var emitted = 0

            val ticker = Observable.interval(100L, TimeUnit.MICROSECONDS)
            val countedTicker = ticker.doOnNext {
                emitted += 1
                if (emitted > 100) enoughEmitted.trySend(Unit)
            }

            // The collector runs in its own scope, independent of the test coroutine.
            val collectorScope = CoroutineScope(Dispatchers.Default)
            val collector = countedTicker.asFlow().launchIn(collectorScope)

            // Cancel only after the source has demonstrably been emitting for a while.
            enoughEmitted.receive()
            collector.cancelAndJoin()
        }
    }
}
36