1 /* 2 * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. 3 */ 4 5 package kotlinx.coroutines.reactor 6 7 import kotlinx.coroutines.* 8 import kotlinx.coroutines.flow.* 9 import kotlinx.coroutines.reactive.* 10 import org.junit.* 11 import org.junit.Test 12 import reactor.core.publisher.* 13 import kotlin.test.* 14 15 class FluxContextTest : TestBase() { 16 private val dispatcher = newSingleThreadContext("FluxContextTest") 17 18 @After tearDownnull19 fun tearDown() { 20 dispatcher.close() 21 } 22 23 @Test <lambda>null24 fun testFluxCreateAsFlowThread() = runTest { 25 expect(1) 26 val mainThread = Thread.currentThread() 27 val dispatcherThread = withContext(dispatcher) { Thread.currentThread() } 28 assertTrue(dispatcherThread != mainThread) 29 Flux.create<String> { 30 assertEquals(dispatcherThread, Thread.currentThread()) 31 it.next("OK") 32 it.complete() 33 } 34 .asFlow() 35 .flowOn(dispatcher) 36 .collect { 37 expect(2) 38 assertEquals("OK", it) 39 assertEquals(mainThread, Thread.currentThread()) 40 } 41 finish(3) 42 } 43 }