/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import org.reactivestreams.*
import kotlin.test.*

/**
 * Tests for flows obtained from a reactive-streams [Publisher] through `asFlow()`.
 *
 * The tests cover cancellation, the `buffer(...)`/`conflate()` operators, `produceIn`, the size of the
 * `request(n)` calls made on the upstream subscription, and exception propagation.
 * Step ordering is asserted with the `expect`/`finish` helpers inherited from `TestBase`.
 */
class PublisherAsFlowTest : TestBase() {
    /**
     * Collects a 100-element publisher with a collector that throws on every element, then checks that
     * exactly one element was delivered, exactly one error was caught, and the publishing job completed
     * with a [CancellationException] exactly once.
     */
    @Test
    fun testCancellation() = runTest {
        var deliveredCount = 0
        var cancelledCount = 0
        var caughtCount = 0

        val source = publish(currentDispatcher()) {
            // Count completions of the publishing coroutine whose cause is a cancellation.
            coroutineContext[Job]?.invokeOnCompletion { cause ->
                if (cause is CancellationException) cancelledCount += 1
            }

            for (value in 0 until 100) {
                send(value)
            }
        }

        val collectorScope = CoroutineScope(Dispatchers.Unconfined)
        val collectorJob = source.asFlow().launchIn(collectorScope) {
            onEach {
                deliveredCount += 1
                throw RuntimeException()
            }
            catch<Throwable> {
                caughtCount += 1
            }
        }
        collectorJob.join()

        assertEquals(1, deliveredCount)
        assertEquals(1, caughtCount)
        assertEquals(1, cancelledCount)
    }

    /**
     * Pins the interleaving of producer steps (1, 2, 4, 6) and collector steps (3, 5, 7) when the flow
     * is collected through `buffer(1)`.
     */
    @Test
    fun testBufferSize1() = runTest {
        val source = publish(currentDispatcher()) {
            expect(1)
            send(3)

            expect(2)
            send(5)

            expect(4)
            send(7)
            expect(6)
        }

        val buffered = source.asFlow().buffer(1)
        buffered.collect { step -> expect(step) }

        finish(8)
    }

    /**
     * With no explicit `buffer(...)`, the producer runs steps 1..64 (one per sent element) and a further
     * `trySend` is asserted not to succeed; the collector then runs steps 65..128.
     */
    @Test
    fun testBufferSizeDefault() = runTest {
        val source = publish(currentDispatcher()) {
            for (step in 1..64) {
                send(step)
                expect(step)
            }
            assertFalse(trySend(-1).isSuccess)
        }

        source.asFlow().collect { value -> expect(value + 64) }

        finish(129)
    }

    /**
     * Same step sequence as [testBufferSize1], but with `flowOn(wrapperDispatcher())` placed between
     * `asFlow()` and `buffer(1)`.
     */
    @Test
    fun testDefaultCapacityIsProperlyOverwritten() = runTest {
        val source = publish(currentDispatcher()) {
            expect(1)
            send(3)
            expect(2)
            send(5)
            expect(4)
            send(7)
            expect(6)
        }

        val buffered = source.asFlow().flowOn(wrapperDispatcher()).buffer(1)
        buffered.collect { step -> expect(step) }

        finish(8)
    }

    /**
     * With `buffer(10)` the producer runs all of its steps (1..4) before the collector runs steps 5..7.
     */
    @Test
    fun testBufferSize10() = runTest {
        val source = publish(currentDispatcher()) {
            expect(1)
            send(5)

            expect(2)
            send(6)

            expect(3)
            send(7)
            expect(4)
        }

        val buffered = source.asFlow().buffer(10)
        buffered.collect { step -> expect(step) }

        finish(8)
    }

    /**
     * A publisher sending 1..5 collected through `conflate()` is expected to yield exactly `[1, 5]`.
     */
    @Test
    fun testConflated() = runTest {
        val source = publish(currentDispatcher()) {
            repeat(5) { index -> send(index + 1) }
        }
        val conflated = source.asFlow().conflate().toList()
        assertEquals(listOf(1, 5), conflated)
    }

    /**
     * Runs `produceIn` over the same publisher-backed flow with the default buffer, `buffer(2)`, an
     * unlimited buffer and `conflate()`, checking the received elements each time.
     */
    @Test
    fun testProduce() = runTest {
        val flow = publish(currentDispatcher()) {
            for (value in 0..9) send(value)
        }.asFlow()
        val everything = (0..9).toList()
        check(everything, flow.produceIn(this))
        check(everything, flow.buffer(2).produceIn(this))
        check(everything, flow.buffer(Channel.UNLIMITED).produceIn(this))
        check(listOf(0, 9), flow.conflate().produceIn(this))
    }

    /**
     * Consumes every element of [channel] and asserts that the received elements equal [expected].
     */
    private suspend fun check(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val received = mutableListOf<Int>()
        channel.consumeEach { element -> received += element }
        assertEquals(expected, received)
    }

    /**
     * The consumer throws [TestException] on element 5; the test expects the publisher to observe a
     * [CancellationException] while sending element 7 and never to reach elements 8 and 9.
     */
    @Test
    fun testProduceCancellation() = runTest {
        expect(1)
        // The publisher is an async coroutine, so it overproduces to the channel, but it still gets cancelled.
        val flow = publish(currentDispatcher()) {
            expect(3)
            for (value in 0 until 10) {
                when {
                    value in 0..6 -> send(value)
                    value == 7 -> {
                        try {
                            send(value)
                        } catch (cancellation: CancellationException) {
                            expect(5)
                            throw cancellation
                        }
                    }
                    else -> expectUnreached()
                }
            }
        }.asFlow().buffer(1)
        assertFailsWith<TestException> {
            coroutineScope {
                expect(2)
                val produced = flow.produceIn(this)
                produced.consumeEach { element ->
                    when {
                        element in 0..4 -> Unit
                        element == 5 -> {
                            expect(4)
                            throw TestException()
                        }
                        else -> expectUnreached()
                    }
                }
            }
        }
        finish(6)
    }

    @Test
    fun testRequestRendezvous() =
        testRequestSizeWithBuffer(
            capacity = Channel.RENDEZVOUS,
            onBufferOverflow = BufferOverflow.SUSPEND,
            expectedRequestSize = 1
        )

    @Test
    fun testRequestBuffer1() =
        testRequestSizeWithBuffer(
            capacity = 1,
            onBufferOverflow = BufferOverflow.SUSPEND,
            expectedRequestSize = 1
        )

    @Test
    fun testRequestBuffer10() =
        testRequestSizeWithBuffer(
            capacity = 10,
            onBufferOverflow = BufferOverflow.SUSPEND,
            expectedRequestSize = 10
        )

    @Test
    fun testRequestBufferUnlimited() =
        testRequestSizeWithBuffer(
            capacity = Channel.UNLIMITED,
            onBufferOverflow = BufferOverflow.SUSPEND,
            expectedRequestSize = Long.MAX_VALUE
        )

    @Test
    fun testRequestBufferOverflowSuspend() =
        testRequestSizeWithBuffer(
            capacity = Channel.BUFFERED,
            onBufferOverflow = BufferOverflow.SUSPEND,
            expectedRequestSize = 64
        )

    @Test
    fun testRequestBufferOverflowDropOldest() =
        testRequestSizeWithBuffer(
            capacity = Channel.BUFFERED,
            onBufferOverflow = BufferOverflow.DROP_OLDEST,
            expectedRequestSize = Long.MAX_VALUE
        )

    @Test
    fun testRequestBufferOverflowDropLatest() =
        testRequestSizeWithBuffer(
            capacity = Channel.BUFFERED,
            onBufferOverflow = BufferOverflow.DROP_LATEST,
            expectedRequestSize = Long.MAX_VALUE
        )

    @Test
    fun testRequestBuffer10OverflowDropOldest() =
        testRequestSizeWithBuffer(
            capacity = 10,
            onBufferOverflow = BufferOverflow.DROP_OLDEST,
            expectedRequestSize = Long.MAX_VALUE
        )

    @Test
    fun testRequestBuffer10OverflowDropLatest() =
        testRequestSizeWithBuffer(
            capacity = 10,
            onBufferOverflow = BufferOverflow.DROP_LATEST,
            expectedRequestSize = Long.MAX_VALUE
        )

    /**
     * Exercises a `publisher.asFlow().buffer(capacity, onBufferOverflow)` chain against a hand-written
     * publisher of the numbers 1..50.
     *
     * Every `request(n)` call received by the subscription must carry exactly [expectedRequestSize], and
     * the collected list must match what the overflow strategy is expected to let through.
     *
     * @param capacity buffer capacity passed to `buffer`
     * @param onBufferOverflow overflow strategy passed to `buffer`
     * @param expectedRequestSize the `n` that every `Subscription.request` call is asserted to receive
     */
    private fun testRequestSizeWithBuffer(
        capacity: Int,
        onBufferOverflow: BufferOverflow,
        expectedRequestSize: Long
    ) = runTest {
        val total = 50
        // Publishes the numbers from 1 to total, honouring the requested demand.
        val publisher = Publisher<Int> { subscriber ->
            subscriber.onSubscribe(object : Subscription {
                var emitted = 0
                var demand = 0L

                override fun request(n: Long) {
                    assertEquals(expectedRequestSize, n)
                    demand += n
                    // Fails if the accumulated demand has wrapped around to a negative value.
                    check(demand >= 0)
                    while (emitted < total && demand > 0) {
                        // Advance the counter before onNext and reduce the demand only afterwards.
                        emitted += 1
                        subscriber.onNext(emitted)
                        demand -= 1
                    }
                    if (emitted == total) subscriber.onComplete()
                }

                override fun cancel() {}
            })
        }
        val delivered = publisher.asFlow().buffer(capacity, onBufferOverflow).toList()
        val runSize = when (capacity) {
            Channel.BUFFERED -> 1
            else -> capacity
        }
        val expected = when (onBufferOverflow) {
            // Nothing is dropped: every element is delivered.
            BufferOverflow.SUSPEND -> (1..total).toList()
            // Only the last element (by default) or the last "capacity" elements are delivered.
            BufferOverflow.DROP_OLDEST -> ((total - runSize + 1)..total).toList()
            // Only the first element (by default) or the first "capacity" elements are delivered.
            BufferOverflow.DROP_LATEST -> (1..runSize).toList()
        }
        assertEquals(expected, delivered)
    }

    /**
     * A publisher that throws [TestException] must surface that exception in the flow's `catch` operator.
     */
    @Test
    fun testException() = runTest {
        expect(1)
        val failing = publish<Int> { throw TestException() }.asFlow()
        failing.catch { cause ->
            assertTrue(cause is TestException)
            finish(2)
        }.collect()
    }
}