1 /* 2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.rx3 6 7 import io.reactivex.rxjava3.core.* 8 import kotlinx.coroutines.* 9 import kotlinx.coroutines.flow.* 10 import kotlinx.coroutines.reactive.* 11 import org.junit.* 12 import org.junit.Test 13 import kotlin.test.* 14 15 class FlowableContextTest : TestBase() { 16 private val dispatcher = newSingleThreadContext("FlowableContextTest") 17 18 @After tearDownnull19 fun tearDown() { 20 dispatcher.close() 21 } 22 23 @Test <lambda>null24 fun testFlowableCreateAsFlowThread() = runTest { 25 expect(1) 26 val mainThread = Thread.currentThread() 27 val dispatcherThread = withContext(dispatcher) { Thread.currentThread() } 28 assertTrue(dispatcherThread != mainThread) 29 Flowable.create<String>({ 30 assertEquals(dispatcherThread, Thread.currentThread()) 31 it.onNext("OK") 32 it.onComplete() 33 }, BackpressureStrategy.BUFFER) 34 .asFlow() 35 .flowOn(dispatcher) 36 .collect { 37 expect(2) 38 assertEquals("OK", it) 39 assertEquals(mainThread, Thread.currentThread()) 40 } 41 finish(3) 42 } 43 } 44