<lambda>null1 package kotlinx.coroutines.reactor
2
3 import kotlinx.coroutines.*
4 import kotlinx.coroutines.flow.*
5 import kotlinx.coroutines.reactive.*
6 import org.junit.Test
7 import org.reactivestreams.*
8 import reactor.core.publisher.*
9 import reactor.util.context.Context
10 import java.util.concurrent.*
11 import kotlin.test.*
12
/**
 * Tests for the bridge between [Flow] and Reactor's [Flux] (`asFlux` / `asFlow`): propagation of the
 * Reactor [Context] through the conversion and the thread on which subscriber signals are delivered.
 */
@Suppress("ReactiveStreamsSubscriberImplementation")
class FlowAsFluxTest : TestBase() {
    /**
     * Builds a flow whose elements are the Reactor context values stored under keys 1..4 (read through
     * [createMono]), converts it with `asFlux()`, supplies the values via two `contextWrite` calls and
     * checks that all four values were visible to the code running inside the flow.
     */
    @Test
    fun testFlowAsFluxContextPropagation() {
        val flux = flow {
            (1..4).forEach { i -> emit(createMono(i).awaitSingle()) }
        }
            .asFlux()
            .contextWrite(Context.of(1, "1"))
            .contextWrite(Context.of(2, "2", 3, "3", 4, "4"))
        val list = flux.collectList().block()!!
        assertEquals(listOf("1", "2", "3", "4"), list)
    }

    /**
     * Creates a [Mono] that reads the [ReactorContext] element of its coroutine context and produces the
     * value stored under key [i], or `"noValue"` when the key is absent.
     *
     * Fails with an [IllegalStateException] if no [ReactorContext] element is present at all.
     */
    private fun createMono(i: Int): Mono<String> = mono {
        val reactorContext = checkNotNull(coroutineContext[ReactorContext]) {
            "ReactorContext is missing from the coroutine context"
        }
        reactorContext.context.getOrDefault(i, "noValue")
    }

    /**
     * Collects a single-element [Flux] as a flow while the Reactor context is supplied through
     * `flowOn(ReactorContext(...))`; the `contextWrite` callback asserts that key 1 maps to `"CTX"`.
     */
    @Test
    fun testFluxAsFlowContextPropagationWithFlowOn() = runTest {
        expect(1)
        Flux.create<String> {
            it.next("OK")
            it.complete()
        }
            .contextWrite { ctx ->
                expect(2)
                assertEquals("CTX", ctx.get(1))
                ctx
            }
            .asFlow()
            .flowOn(ReactorContext(Context.of(1, "CTX")))
            .collect {
                expect(3)
                assertEquals("OK", it)
            }
        finish(4)
    }

    /**
     * Same scenario as the `flowOn` variant, but the Reactor context comes from the enclosing
     * `withContext(ReactorContext(...))` scope in which the flow is collected.
     */
    @Test
    fun testFluxAsFlowContextPropagationFromScope() = runTest {
        expect(1)
        withContext(ReactorContext(Context.of(1, "CTX"))) {
            Flux.create<String> {
                it.next("OK")
                it.complete()
            }
                .contextWrite { ctx ->
                    expect(2)
                    assertEquals("CTX", ctx.get(1))
                    ctx
                }
                .asFlow()
                .collect {
                    expect(3)
                    assertEquals("OK", it)
                }
        }
        finish(4)
    }

    /**
     * Subscribes to `flowOf(42).asFlux()` (no explicit context argument) and asserts that `onNext` and
     * `onComplete` are invoked on the very thread that called `subscribe`, before `subscribe` returns.
     */
    @Test
    fun testUnconfinedDefaultContext() {
        expect(1)
        val thread = Thread.currentThread()
        fun checkThread() {
            assertSame(thread, Thread.currentThread())
        }
        flowOf(42).asFlux().subscribe(object : Subscriber<Int> {
            private lateinit var subscription: Subscription

            override fun onSubscribe(s: Subscription) {
                expect(2)
                subscription = s
                // Request more than the single element the flow emits.
                subscription.request(2)
            }

            override fun onNext(t: Int) {
                checkThread()
                expect(3)
                assertEquals(42, t)
            }

            override fun onComplete() {
                checkThread()
                expect(4)
            }

            override fun onError(t: Throwable?) {
                expectUnreached()
            }
        })
        finish(5)
    }

    /**
     * Subscribes to `flowOf(42).asFlux(dispatcher)` where `dispatcher` is a single-thread context and
     * asserts that `onNext` and `onComplete` are invoked on a thread whose name starts with the
     * dispatcher's thread name. The test thread waits on a latch for the terminal signal.
     */
    @Test
    fun testConfinedContext() {
        expect(1)
        val threadName = "FlowAsFluxTest.testConfinedContext"
        fun checkThread() {
            val currentThread = Thread.currentThread()
            assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
        }
        val completed = CountDownLatch(1)
        newSingleThreadContext(threadName).use { dispatcher ->
            flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber<Int> {
                private lateinit var subscription: Subscription

                override fun onSubscribe(s: Subscription) {
                    expect(2)
                    subscription = s
                    // Request more than the single element the flow emits.
                    subscription.request(2)
                }

                override fun onNext(t: Int) {
                    checkThread()
                    expect(3)
                    assertEquals(42, t)
                }

                override fun onComplete() {
                    try {
                        checkThread()
                        expect(4)
                    } finally {
                        // Always release the waiting test thread, even if a check above throws;
                        // otherwise the test would hang on the latch instead of failing.
                        completed.countDown()
                    }
                }

                override fun onError(t: Throwable?) {
                    try {
                        expectUnreached()
                    } finally {
                        // An unexpected error is also a terminal signal: unblock the test thread so
                        // the failure surfaces (presumably via finish(5) / TestBase checks — confirm)
                        // rather than blocking forever on the latch.
                        completed.countDown()
                    }
                }
            })
            completed.await()
        }
        finish(5)
    }
}
149