• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.reactor
2 
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.reactive.*
6 import org.junit.Test
7 import org.reactivestreams.*
8 import reactor.core.publisher.*
9 import reactor.util.context.Context
10 import java.util.concurrent.*
11 import kotlin.test.*
12 
13 @Suppress("ReactiveStreamsSubscriberImplementation")
14 class FlowAsFluxTest : TestBase() {
15     @Test
16     fun testFlowAsFluxContextPropagation() {
17         val flux = flow {
18             (1..4).forEach { i -> emit(createMono(i).awaitSingle()) }
19         }
20             .asFlux()
21             .contextWrite(Context.of(1, "1"))
22             .contextWrite(Context.of(2, "2", 3, "3", 4, "4"))
23         val list = flux.collectList().block()!!
24         assertEquals(listOf("1", "2", "3", "4"), list)
25     }
26 
27     private fun createMono(i: Int): Mono<String> = mono {
28         val ctx = coroutineContext[ReactorContext]!!.context
29         ctx.getOrDefault(i, "noValue")
30     }
31 
32     @Test
33     fun testFluxAsFlowContextPropagationWithFlowOn() = runTest {
34         expect(1)
35         Flux.create<String> {
36             it.next("OK")
37             it.complete()
38         }
39             .contextWrite { ctx ->
40                 expect(2)
41                 assertEquals("CTX", ctx.get(1))
42                 ctx
43             }
44             .asFlow()
45             .flowOn(ReactorContext(Context.of(1, "CTX")))
46             .collect {
47                 expect(3)
48                 assertEquals("OK", it)
49             }
50         finish(4)
51     }
52 
53     @Test
54     fun testFluxAsFlowContextPropagationFromScope() = runTest {
55         expect(1)
56         withContext(ReactorContext(Context.of(1, "CTX"))) {
57             Flux.create<String> {
58                     it.next("OK")
59                     it.complete()
60                 }
61             .contextWrite { ctx ->
62                 expect(2)
63                 assertEquals("CTX", ctx.get(1))
64                 ctx
65             }
66             .asFlow()
67             .collect {
68                 expect(3)
69                 assertEquals("OK", it)
70             }
71         }
72         finish(4)
73     }
74 
75     @Test
76     fun testUnconfinedDefaultContext() {
77         expect(1)
78         val thread = Thread.currentThread()
79         fun checkThread() {
80             assertSame(thread, Thread.currentThread())
81         }
82         flowOf(42).asFlux().subscribe(object : Subscriber<Int> {
83             private lateinit var subscription: Subscription
84 
85             override fun onSubscribe(s: Subscription) {
86                 expect(2)
87                 subscription = s
88                 subscription.request(2)
89             }
90 
91             override fun onNext(t: Int) {
92                 checkThread()
93                 expect(3)
94                 assertEquals(42, t)
95             }
96 
97             override fun onComplete() {
98                 checkThread()
99                 expect(4)
100             }
101 
102             override fun onError(t: Throwable?) {
103                 expectUnreached()
104             }
105         })
106         finish(5)
107     }
108 
109     @Test
110     fun testConfinedContext() {
111         expect(1)
112         val threadName = "FlowAsFluxTest.testConfinedContext"
113         fun checkThread() {
114             val currentThread = Thread.currentThread()
115             assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
116         }
117         val completed = CountDownLatch(1)
118         newSingleThreadContext(threadName).use { dispatcher ->
119             flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber<Int> {
120                 private lateinit var subscription: Subscription
121 
122                 override fun onSubscribe(s: Subscription) {
123                     expect(2)
124                     subscription = s
125                     subscription.request(2)
126                 }
127 
128                 override fun onNext(t: Int) {
129                     checkThread()
130                     expect(3)
131                     assertEquals(42, t)
132                 }
133 
134                 override fun onComplete() {
135                     checkThread()
136                     expect(4)
137                     completed.countDown()
138                 }
139 
140                 override fun onError(t: Throwable?) {
141                     expectUnreached()
142                 }
143             })
144             completed.await()
145         }
146         finish(5)
147     }
148 }
149