/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

/**
 * Tests for flows obtained by calling `asFlow()` on the result of `flowPublish`.
 *
 * Most tests pin an exact interleaving between the publishing coroutine and the collector using the
 * numbered `expect(n)` / `finish(n)` calls, so the relative order of statements in each test matters.
 */
class PublisherAsFlowTest : TestBase() {
    /**
     * The collector throws on the first element. The test expects exactly one element to be seen,
     * exactly one error to reach the `catch` handler, and the publisher's job to complete exactly once
     * with a [CancellationException].
     */
    @Test
    fun testCancellation() = runTest {
        var seenElements = 0
        var cancellations = 0
        var caughtErrors = 0

        val source = flowPublish(currentDispatcher()) {
            // Record how the publishing job finishes.
            coroutineContext[Job]?.invokeOnCompletion { cause ->
                if (cause is CancellationException) cancellations += 1
            }

            for (element in 0 until 100) {
                send(element)
            }
        }

        val collectingJob = source.asFlow().launchIn(CoroutineScope(Dispatchers.Unconfined)) {
            onEach {
                seenElements += 1
                throw RuntimeException()
            }
            catch<Throwable> {
                caughtErrors += 1
            }
        }
        collectingJob.join()

        assertEquals(1, seenElements)
        assertEquals(1, caughtErrors)
        assertEquals(1, cancellations)
    }

    /**
     * With `buffer(1)` the expected order is: publisher steps 1 and 2, collector 3, publisher 4,
     * collector 5, publisher 6, collector 7, then `finish(8)`.
     * The emitted values double as the collector's expected step numbers.
     */
    @Test
    fun testBufferSize1() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(3)

            expect(2)
            send(5)

            expect(4)
            send(7)
            expect(6)
        }

        source.asFlow().buffer(1).collect { step ->
            expect(step)
        }

        finish(8)
    }

    /**
     * Without an explicit `buffer`, the publisher is expected to get through 64 `send` calls (steps 1..64)
     * before the collector sees anything, and a subsequent non-suspending `trySend` is expected to fail.
     * The collector then accounts for steps 65..128.
     */
    @Test
    fun testBufferSizeDefault() = runTest {
        val source = flowPublish(currentDispatcher()) {
            for (step in 1..64) {
                send(step)
                expect(step)
            }
            assertFalse(trySend(-1).isSuccess)
        }

        source.asFlow().collect { value ->
            expect(64 + value)
        }

        finish(129)
    }

    /**
     * Same expected interleaving as [testBufferSize1], but with `flowOn(wrapperDispatcher())` applied
     * before `buffer(1)`; the explicit capacity of 1 is expected to still take effect.
     */
    @Test
    fun testDefaultCapacityIsProperlyOverwritten() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(3)
            expect(2)
            send(5)
            expect(4)
            send(7)
            expect(6)
        }

        source.asFlow().flowOn(wrapperDispatcher()).buffer(1).collect { step ->
            expect(step)
        }

        finish(8)
    }

    /**
     * With `buffer(10)` the publisher is expected to run through all of its steps (1..4) before the
     * collector handles the three emitted values as steps 5..7.
     */
    @Test
    fun testBufferSize10() = runTest {
        val source = flowPublish(currentDispatcher()) {
            expect(1)
            send(5)

            expect(2)
            send(6)

            expect(3)
            send(7)
            expect(4)
        }

        source.asFlow().buffer(10).collect { step ->
            expect(step)
        }

        finish(8)
    }

    /**
     * A conflated flow over a publisher that sends 1..5 is expected to deliver only the first and the
     * last value.
     */
    @Test
    fun testConflated() = runTest {
        val source = flowPublish(currentDispatcher()) {
            (1..5).forEach { send(it) }
        }
        assertEquals(listOf(1, 5), source.asFlow().conflate().toList())
    }

    /**
     * Converts the flow to a channel with `produceIn` under several buffering modes and checks the
     * received elements: all ten values for the default, `buffer(2)` and unlimited cases, and only the
     * first and last value when conflated.
     */
    @Test
    fun testProduce() = runTest {
        val flow = flowPublish(currentDispatcher()) {
            for (element in 0 until 10) send(element)
        }.asFlow()
        val allValues = (0..9).toList()

        assertChannelContents(allValues, flow.produceIn(this))
        assertChannelContents(allValues, flow.buffer(2).produceIn(this))
        assertChannelContents(allValues, flow.buffer(Channel.UNLIMITED).produceIn(this))
        assertChannelContents(listOf(0, 9), flow.conflate().produceIn(this))
    }

    /**
     * Consumes [channel] to completion and asserts that the received elements equal [expected].
     */
    private suspend fun assertChannelContents(expected: List<Int>, channel: ReceiveChannel<Int>) {
        val received = mutableListOf<Int>()
        channel.consumeEach { received += it }
        assertEquals(expected, received)
    }

    /**
     * The consumer throws [TestException] on value 5. The publisher is expected to have progressed to
     * sending value 7 by then, and to observe a [CancellationException] from that `send`.
     */
    @Test
    fun testProduceCancellation() = runTest {
        expect(1)
        // The publisher runs as its own coroutine, so it gets ahead of the consumer and puts extra
        // elements into the channel; it is nevertheless expected to be cancelled.
        val flow = flowPublish(currentDispatcher()) {
            expect(3)
            for (value in 0 until 10) {
                when {
                    value in 0..6 -> send(value)
                    value == 7 -> {
                        try {
                            send(value)
                        } catch (e: CancellationException) {
                            expect(5)
                            throw e
                        }
                    }
                    else -> expectUnreached()
                }
            }
        }.asFlow().buffer(1)

        assertFailsWith<TestException> {
            coroutineScope {
                expect(2)
                flow.produceIn(this).consumeEach { received ->
                    when (received) {
                        in 0..4 -> {}
                        5 -> {
                            expect(4)
                            throw TestException()
                        }
                        else -> expectUnreached()
                    }
                }
            }
        }
        finish(6)
    }
}