/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*

/**
 * Tests for the `onUndeliveredElement` handler of channels.
 *
 * Most tests install a handler that cancels a [Resource] and then assert, step by step,
 * whether the resource has been cancelled (i.e. whether the handler was invoked for it).
 */
class ChannelUndeliveredElementTest : TestBase() {
    /** An element that is sent and received normally must never reach the handler. */
    @Test
    fun testSendSuccessfully() = runTest {
        runAllKindsTest { kind ->
            val channel = kind.create<Resource> { undelivered -> undelivered.cancel() }
            val resource = Resource("OK")
            launch {
                channel.send(resource)
            }
            val received = channel.receive()
            assertEquals("OK", received.value)
            // Delivered to the receiver, so the handler was not invoked.
            assertFalse(resource.isCancelled)
            channel.close()
            // Closing the channel afterwards does not invoke it either.
            assertFalse(resource.isCancelled)
        }
    }

    /** A suspended `send` on a default-capacity channel that gets cancelled hands its element to the handler. */
    @Test
    fun testRendezvousSendCancelled() = runTest {
        val channel = Channel<Resource>(onUndeliveredElement = { undelivered -> undelivered.cancel() })
        val resource = Resource("OK")
        val sender = launch(start = CoroutineStart.UNDISPATCHED) {
            assertFailsWith<CancellationException> {
                // Suspends here and is then cancelled by cancelAndJoin() below.
                channel.send(resource)
            }
        }
        sender.cancelAndJoin()
        assertTrue(resource.isCancelled)
    }

    /**
     * With capacity 1 the first element is buffered and the second `send` suspends.
     * Cancelling the sender cancels only the second element; the buffered one is
     * cancelled later, when the channel itself is cancelled.
     */
    @Test
    fun testBufferedSendCancelled() = runTest {
        val channel = Channel<Resource>(capacity = 1, onUndeliveredElement = { undelivered -> undelivered.cancel() })
        val buffered = Resource("A")
        val suspended = Resource("B")
        val sender = launch(start = CoroutineStart.UNDISPATCHED) {
            // Goes straight into the buffer.
            channel.send(buffered)
            assertFailsWith<CancellationException> {
                // Suspends here and is then cancelled.
                channel.send(suspended)
            }
        }
        sender.cancelAndJoin()
        // Still sitting in the buffer, not cancelled.
        assertFalse(buffered.isCancelled)
        // Its send was cancelled.
        assertTrue(suspended.isCancelled)
        // Now cancel the channel itself ...
        channel.cancel()
        // ... which cancels the element that was in the buffer.
        assertTrue(buffered.isCancelled)
    }

    /** Elements buffered in an unlimited channel are cancelled only when the channel is cancelled. */
    @Test
    fun testUnlimitedChannelCancelled() = runTest {
        val channel = Channel<Resource>(
            capacity = Channel.UNLIMITED,
            onUndeliveredElement = { undelivered -> undelivered.cancel() }
        )
        val first = Resource("A")
        val second = Resource("B")
        // Both sends go to the buffer.
        channel.send(first)
        channel.send(second)
        // While buffered, neither element is cancelled.
        assertFalse(first.isCancelled)
        assertFalse(second.isCancelled)
        // Cancelling the channel cancels everything in the buffer.
        channel.cancel()
        assertTrue(first.isCancelled)
        assertTrue(second.isCancelled)
    }

    /**
     * In a conflated channel the element replaced by a newer one is cancelled,
     * while the latest element survives `close()` and is cancelled only by `cancel()`.
     */
    @Test
    fun testConflatedResourceCancelled() = runTest {
        val channel = Channel<Resource>(
            capacity = Channel.CONFLATED,
            onUndeliveredElement = { undelivered -> undelivered.cancel() }
        )
        val older = Resource("A")
        val newer = Resource("B")
        channel.send(older)
        assertFalse(older.isCancelled)
        assertFalse(newer.isCancelled)
        channel.send(newer)
        // The older element was conflated (lost) and thus cancelled.
        assertTrue(older.isCancelled)
        assertFalse(newer.isCancelled)
        channel.close()
        // Not cancelled yet: it can still be read by a receiver.
        assertFalse(newer.isCancelled)
        channel.cancel()
        // Now it is cancelled.
        assertTrue(newer.isCancelled)
    }

    /** A `send` that fails because the channel is already closed hands its element to the handler. */
    @Test
    fun testSendToClosedChannel() = runTest {
        runAllKindsTest { kind ->
            val channel = kind.create<Resource> { undelivered -> undelivered.cancel() }
            // Close the channel before anything is sent.
            channel.close()
            val resource = Resource("OK")
            assertFailsWith<ClosedSendChannelException> {
                // Send to a closed channel fails: the resource was not delivered.
                channel.send(resource)
            }
            assertTrue(resource.isCancelled)
        }
    }

    /**
     * Runs [test] once for every [TestChannelKind] that is not backed by a broadcast channel,
     * each time inside `withContext(Job())`. Any failure is reported through `error`,
     * prefixed with the kind that was being tested.
     */
    private suspend fun runAllKindsTest(test: suspend CoroutineScope.(TestChannelKind) -> Unit) {
        // Broadcast-based kinds do not support onUndeliveredElement, so they are left out.
        val supportedKinds = TestChannelKind.values().filterNot { it.viaBroadcast }
        for (kind in supportedKinds) {
            try {
                withContext(Job()) {
                    test(kind)
                }
            } catch (e: Throwable) {
                error("$kind: $e", e)
            }
        }
    }

    /**
     * Test element whose [cancel] may be called at most once;
     * a second call fails with "Already cancelled".
     */
    private class Resource(val value: String) {
        private val _cancelled = atomic(false)

        /** `true` once [cancel] has been called. */
        val isCancelled: Boolean
            get() = _cancelled.value

        /** Marks this resource as cancelled, failing if it already was. */
        fun cancel() {
            val wasCancelled = _cancelled.getAndSet(true)
            check(!wasCancelled) { "Already cancelled" }
        }
    }

    /** See #2826: the handler must not fire when a suspended `send` meets a receiver. */
    @Test
    fun testHandlerIsNotInvoked() = runTest {
        val channel = Channel<Unit>(onUndeliveredElement = { expectUnreached() })

        expect(1)
        launch {
            expect(2)
            channel.receive()
        }
        channel.send(Unit)
        finish(3)
    }

    /** Checks which elements reach the handler for the `DROP_OLDEST` and `DROP_LATEST` overflow strategies. */
    @Test
    fun testChannelBufferOverflow() = runTest {
        testBufferOverflowStrategy(listOf(1, 2), BufferOverflow.DROP_OLDEST)
        testBufferOverflowStrategy(listOf(3), BufferOverflow.DROP_LATEST)
    }

    /**
     * Sends 1, 2 and 3 via `send` and 4 via `trySend` into a capacity-2 channel that uses
     * [strategy], then asserts that the handler saw exactly [expectedDroppedElements], in order.
     * The `trySend` itself must not report a failure.
     */
    private suspend fun testBufferOverflowStrategy(expectedDroppedElements: List<Int>, strategy: BufferOverflow) {
        val dropped = mutableListOf<Int>()
        val channel = Channel<Int>(2, strategy) { value -> dropped.add(value) }

        for (element in 1..3) {
            channel.send(element)
        }
        channel.trySend(4).onFailure { expectUnreached() }
        assertEquals(expectedDroppedElements, dropped)
    }

    /** `trySend` on a conflated channel that was closed with a cause must not invoke the handler. */
    @Test
    fun testTrySendDoesNotInvokeHandlerOnClosedConflatedChannel() = runTest {
        val channel = Channel<Int>(Channel.CONFLATED) { expectUnreached() }
        channel.close(IndexOutOfBoundsException())
        channel.trySend(3)
    }

    /** Repeated `trySend` on a capacity-3 channel that was closed with a cause must not invoke the handler. */
    @Test
    fun testTrySendDoesNotInvokeHandlerOnClosedChannel() = runTest {
        val channel = Channel<Int>(3) { expectUnreached() }
        channel.close(IndexOutOfBoundsException())
        for (attempt in 1..10) {
            channel.trySend(3)
        }
    }

    /** Runs the `trySend` check below for capacities 0, 1 and 2. */
    @Test
    fun testTrySendDoesNotInvokeHandler() {
        (0..2).forEach { capacity -> testTrySendDoesNotInvokeHandler(capacity) }
    }

    /**
     * Calls `trySend` ten times on a `DROP_LATEST` channel of the given [capacity]
     * whose handler fails the test if it is ever invoked.
     */
    private fun testTrySendDoesNotInvokeHandler(capacity: Int) {
        val channel = Channel<Int>(
            capacity = capacity,
            onBufferOverflow = BufferOverflow.DROP_LATEST,
            onUndeliveredElement = { expectUnreached() }
        )
        for (attempt in 1..10) {
            channel.trySend(3)
        }
    }
}