• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 package kotlinx.coroutines.rx3
2 
3 import kotlinx.coroutines.testing.*
4 import io.reactivex.rxjava3.core.*
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.flow.*
7 import kotlinx.coroutines.reactive.*
8 import org.junit.*
9 import org.junit.Test
10 import kotlin.test.*
11 
/**
 * Checks which threads are involved when an Rx [Flowable] is consumed as a flow
 * with `flowOn` applied: the `Flowable.create` source callback is asserted to run
 * on the dedicated [dispatcher] thread, while the collector is asserted to run on
 * the thread that started the test body.
 */
class FlowableContextTest : TestBase() {
    // Dedicated single-threaded context so the upstream thread can be identified exactly.
    private val dispatcher = newSingleThreadContext("FlowableContextTest")

    /** Closes the single-threaded [dispatcher] after each test. */
    @After
    fun tearDown() {
        dispatcher.close()
    }

    /**
     * Builds a one-element [Flowable], converts it with `asFlow()`, moves the upstream
     * onto [dispatcher] via `flowOn`, and asserts the thread seen by each side.
     */
    @Test
    fun testFlowableCreateAsFlowThread() = runTest {
        expect(1)
        // Capture both threads up front so they can be compared inside the callbacks.
        val callerThread = Thread.currentThread()
        val workerThread = withContext(dispatcher) { Thread.currentThread() }
        assertTrue(workerThread != callerThread)

        val source = Flowable.create<String>({ emitter ->
            // The source callback must be invoked on the dispatcher's thread.
            assertEquals(workerThread, Thread.currentThread())
            emitter.onNext("OK")
            emitter.onComplete()
        }, BackpressureStrategy.BUFFER)

        source
            .asFlow()
            .flowOn(dispatcher)
            .collect { value ->
                expect(2)
                assertEquals("OK", value)
                // The collector must stay on the thread that started the test body.
                assertEquals(callerThread, Thread.currentThread())
            }
        finish(3)
    }
}
41