/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import kotlin.test.*

/**
 * Tests for converting a reactive-streams [Publisher] into a flow via `asFlow()`.
 *
 * Covered here: cancellation of the publisher on downstream failure, interaction with
 * `buffer`/`conflate`/`produceIn`, the sizes passed to `Subscription.request`, and
 * propagation of publisher exceptions.
 *
 * The numbered `expect(n)` / `finish(n)` calls encode the order in which the statements
 * of each test are supposed to run, so statement order in these tests is significant.
 */
class PublisherAsFlowTest : TestBase() {
    /**
     * A failure thrown from `onEach` on the first element must be delivered to `catch`
     * exactly once, and the publisher's job must complete with a [CancellationException]
     * exactly once.
     */
    @Test
    fun testCancellation() = runTest {
        var onNext = 0
        var onCancelled = 0
        var onError = 0

        val publisher = publish(currentDispatcher()) {
            // Count completions of the publisher coroutine that were caused by cancellation.
            coroutineContext[Job]?.invokeOnCompletion {
                if (it is CancellationException) ++onCancelled
            }

            repeat(100) {
                send(it)
            }
        }

        publisher.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
            onEach {
                ++onNext
                throw RuntimeException()
            }
            catch<Throwable> {
                ++onError
            }
        }.join()

        assertEquals(1, onNext)
        assertEquals(1, onError)
        assertEquals(1, onCancelled)
    }

    /**
     * Checks the producer/collector interleaving with `buffer(1)`.
     *
     * The publisher performs steps 1, 2, 4 and 6 and sends the values 3, 5 and 7; the
     * collector calls `expect` with each received value, so the values themselves are the
     * remaining step numbers.
     */
    @Test
    fun testBufferSize1() = runTest {
        val publisher = publish(currentDispatcher()) {
            expect(1)
            send(3)

            expect(2)
            send(5)

            expect(4)
            send(7)
            expect(6)
        }

        publisher.asFlow().buffer(1).collect {
            expect(it)
        }

        finish(8)
    }

    /**
     * Checks the behaviour without an explicit `buffer` operator.
     *
     * The publisher is expected to send all 64 elements (steps 1..64) before any of them is
     * collected (steps 65..128), and an additional `trySend` must not succeed.
     */
    @Test
    fun testBufferSizeDefault() = runTest {
        val publisher = publish(currentDispatcher()) {
            repeat(64) {
                send(it + 1)
                expect(it + 1)
            }
            assertFalse { trySend(-1).isSuccess }
        }

        publisher.asFlow().collect {
            // Received values are 1..64, collection steps are 65..128.
            expect(64 + it)
        }

        finish(129)
    }

    /**
     * Same step sequence as [testBufferSize1], but with `flowOn(wrapperDispatcher())` applied
     * before `buffer(1)`: the explicit capacity of 1 must still determine the interleaving.
     */
    @Test
    fun testDefaultCapacityIsProperlyOverwritten() = runTest {
        val publisher = publish(currentDispatcher()) {
            expect(1)
            send(3)
            expect(2)
            send(5)
            expect(4)
            send(7)
            expect(6)
        }

        publisher.asFlow().flowOn(wrapperDispatcher()).buffer(1).collect {
            expect(it)
        }

        finish(8)
    }

    /**
     * Checks the producer/collector interleaving with `buffer(10)`.
     *
     * The publisher is expected to run through all of its steps (1..4) and send the values
     * 5, 6 and 7 before the collector processes them as steps 5..7.
     */
    @Test
    fun testBufferSize10() = runTest {
        val publisher = publish(currentDispatcher()) {
            expect(1)
            send(5)

            expect(2)
            send(6)

            expect(3)
            send(7)
            expect(4)
        }

        publisher.asFlow().buffer(10).collect {
            expect(it)
        }

        finish(8)
    }

    /**
     * A conflated flow over a publisher of 1..5 must deliver only the first and the last element.
     */
    @Test
    fun testConflated() = runTest {
        val publisher = publish(currentDispatcher()) {
            for (i in 1..5) send(i)
        }
        val list = publisher.asFlow().conflate().toList()
        assertEquals(listOf(1, 5), list)
    }

    /**
     * Checks `produceIn` over the publisher-backed flow with the default buffer, `buffer(2)`,
     * an unlimited buffer (all of 0..9 are expected) and `conflate()` (only 0 and 9 are expected).
     */
    @Test
    fun testProduce() = runTest {
        val flow = publish(currentDispatcher()) { repeat(10) { send(it) } }.asFlow()
        check((0..9).toList(), flow.produceIn(this))
        check((0..9).toList(), flow.buffer(2).produceIn(this))
        check((0..9).toList(), flow.buffer(Channel.UNLIMITED).produceIn(this))
        check(listOf(0, 9), flow.conflate().produceIn(this))
    }

    /**
     * Consumes [channel] until it is closed and asserts that the received elements equal [expected].
     */
    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val result = ArrayList<Int>(10)
        channel.consumeEach { result.add(it) }
        assertEquals(expected, result)
    }

    /**
     * Checks that a failure in the consumer of `produceIn` cancels the publisher.
     *
     * The consumer throws [TestException] when it receives the value 5 (step 4). The publisher
     * is allowed to get ahead of the consumer, but it must observe a [CancellationException]
     * in `send(7)` (step 5) and must never attempt to send 8 or 9.
     */
    @Test
    fun testProduceCancellation() = runTest {
        expect(1)
        // publisher is an async coroutine, so it overproduces to the channel, but still gets cancelled
        val flow = publish(currentDispatcher()) {
            expect(3)
            repeat(10) { value ->
                when (value) {
                    in 0..6 -> send(value)
                    7 -> try {
                        send(value)
                    } catch (e: CancellationException) {
                        expect(5)
                        throw e
                    }
                    else -> expectUnreached()
                }
            }
        }.asFlow().buffer(1)
        assertFailsWith<TestException> {
            coroutineScope {
                expect(2)
                val channel = flow.produceIn(this)
                channel.consumeEach { value ->
                    when (value) {
                        in 0..4 -> {}
                        5 -> {
                            expect(4)
                            throw TestException()
                        }
                        else -> expectUnreached()
                    }
                }
            }
        }
        finish(6)
    }

    /** Rendezvous capacity with `SUSPEND` overflow: every request is expected to be of size 1. */
    @Test
    fun testRequestRendezvous() =
        testRequestSizeWithBuffer(Channel.RENDEZVOUS, BufferOverflow.SUSPEND, 1)

    /** Capacity 1 with `SUSPEND` overflow: every request is expected to be of size 1. */
    @Test
    fun testRequestBuffer1() =
        testRequestSizeWithBuffer(1, BufferOverflow.SUSPEND, 1)

    /** Capacity 10 with `SUSPEND` overflow: every request is expected to be of size 10. */
    @Test
    fun testRequestBuffer10() =
        testRequestSizeWithBuffer(10, BufferOverflow.SUSPEND, 10)

    /** Unlimited capacity with `SUSPEND` overflow: the request is expected to be `Long.MAX_VALUE`. */
    @Test
    fun testRequestBufferUnlimited() =
        testRequestSizeWithBuffer(Channel.UNLIMITED, BufferOverflow.SUSPEND, Long.MAX_VALUE)

    /** `Channel.BUFFERED` capacity with `SUSPEND` overflow: every request is expected to be of size 64. */
    @Test
    fun testRequestBufferOverflowSuspend() =
        testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.SUSPEND, 64)

    /** `Channel.BUFFERED` capacity with `DROP_OLDEST` overflow: the request is expected to be `Long.MAX_VALUE`. */
    @Test
    fun testRequestBufferOverflowDropOldest() =
        testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.DROP_OLDEST, Long.MAX_VALUE)

    /** `Channel.BUFFERED` capacity with `DROP_LATEST` overflow: the request is expected to be `Long.MAX_VALUE`. */
    @Test
    fun testRequestBufferOverflowDropLatest() =
        testRequestSizeWithBuffer(Channel.BUFFERED, BufferOverflow.DROP_LATEST, Long.MAX_VALUE)

    /** Capacity 10 with `DROP_OLDEST` overflow: the request is expected to be `Long.MAX_VALUE`. */
    @Test
    fun testRequestBuffer10OverflowDropOldest() =
        testRequestSizeWithBuffer(10, BufferOverflow.DROP_OLDEST, Long.MAX_VALUE)

    /** Capacity 10 with `DROP_LATEST` overflow: the request is expected to be `Long.MAX_VALUE`. */
    @Test
    fun testRequestBuffer10OverflowDropLatest() =
        testRequestSizeWithBuffer(10, BufferOverflow.DROP_LATEST, Long.MAX_VALUE)

    /**
     * Tests the `publisher.asFlow().buffer(...)` chain, verifying the size of every request made
     * to the subscription and that only the expected values are delivered.
     *
     * The publisher used here synchronously emits the numbers `1..50` from inside
     * `Subscription.request`, as far as the accumulated demand allows.
     *
     * @param capacity the capacity passed to `buffer`.
     * @param onBufferOverflow the overflow strategy passed to `buffer`.
     * @param expectedRequestSize the value every `Subscription.request(n)` call is asserted to receive.
     */
    private fun testRequestSizeWithBuffer(
        capacity: Int,
        onBufferOverflow: BufferOverflow,
        expectedRequestSize: Long
    ) = runTest {
        val m = 50
        // publishes numbers from 1 to m
        val publisher = Publisher<Int> { s ->
            s.onSubscribe(object : Subscription {
                var lastSent = 0
                var remaining = 0L
                override fun request(n: Long) {
                    assertEquals(expectedRequestSize, n)
                    remaining += n
                    // Guards against the accumulated demand overflowing into a negative value.
                    check(remaining >= 0)
                    while (lastSent < m && remaining > 0) {
                        s.onNext(++lastSent)
                        remaining--
                    }
                    if (lastSent == m) s.onComplete()
                }

                override fun cancel() {}
            })
        }
        val flow = publisher
            .asFlow()
            .buffer(capacity, onBufferOverflow)
        val list = flow.toList()
        // For the dropping strategies Channel.BUFFERED is treated as keeping a single element.
        val runSize = if (capacity == Channel.BUFFERED) 1 else capacity
        val expected = when (onBufferOverflow) {
            // Everything is expected to be delivered
            BufferOverflow.SUSPEND -> (1..m).toList()
            // Only the last one (by default) or the last "capacity" items delivered
            BufferOverflow.DROP_OLDEST -> (m - runSize + 1..m).toList()
            // Only the first one (by default) or the first "capacity" items delivered
            BufferOverflow.DROP_LATEST -> (1..runSize).toList()
        }
        assertEquals(expected, list)
    }

    /**
     * An exception thrown by the publisher's body must reach the flow's `catch` operator
     * as a [TestException].
     */
    @Test
    fun testException() = runTest {
        expect(1)
        val p = publish<Int> { throw TestException() }.asFlow()
        p.catch {
            assertTrue { it is TestException }
            finish(2)
        }.collect()
    }
}