1 package kotlinx.coroutines.rx3 2 3 import kotlinx.coroutines.testing.* 4 import io.reactivex.rxjava3.core.* 5 import kotlinx.coroutines.* 6 import kotlinx.coroutines.flow.* 7 import kotlinx.coroutines.reactive.* 8 import org.junit.* 9 import org.junit.Test 10 import kotlin.test.* 11 12 class FlowableContextTest : TestBase() { 13 private val dispatcher = newSingleThreadContext("FlowableContextTest") 14 15 @After tearDownnull16 fun tearDown() { 17 dispatcher.close() 18 } 19 20 @Test <lambda>null21 fun testFlowableCreateAsFlowThread() = runTest { 22 expect(1) 23 val mainThread = Thread.currentThread() 24 val dispatcherThread = withContext(dispatcher) { Thread.currentThread() } 25 assertTrue(dispatcherThread != mainThread) 26 Flowable.create<String>({ 27 assertEquals(dispatcherThread, Thread.currentThread()) 28 it.onNext("OK") 29 it.onComplete() 30 }, BackpressureStrategy.BUFFER) 31 .asFlow() 32 .flowOn(dispatcher) 33 .collect { 34 expect(2) 35 assertEquals("OK", it) 36 assertEquals(mainThread, Thread.currentThread()) 37 } 38 finish(3) 39 } 40 } 41