• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.reactor
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.*
9 import kotlinx.coroutines.reactive.*
10 import org.junit.*
11 import org.junit.Test
12 import reactor.core.publisher.*
13 import kotlin.test.*
14 
15 class FluxContextTest : TestBase() {
16     private val dispatcher = newSingleThreadContext("FluxContextTest")
17 
18     @After
tearDownnull19     fun tearDown() {
20         dispatcher.close()
21     }
22 
23     @Test
<lambda>null24     fun testFluxCreateAsFlowThread() = runTest {
25         expect(1)
26         val mainThread = Thread.currentThread()
27         val dispatcherThread = withContext(dispatcher) { Thread.currentThread() }
28         assertTrue(dispatcherThread != mainThread)
29         Flux.create<String> {
30             assertEquals(dispatcherThread, Thread.currentThread())
31             it.next("OK")
32             it.complete()
33         }
34             .asFlow()
35             .flowOn(dispatcher)
36             .collect {
37                 expect(2)
38                 assertEquals("OK", it)
39                 assertEquals(mainThread, Thread.currentThread())
40             }
41         finish(3)
42     }
43 }