package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*

/**
 * Checks when a channel hands an element to the handler lambda supplied at channel creation.
 *
 * Every channel in these tests is created with a handler that calls [Resource.cancel], so
 * [Resource.isCancelled] tells whether the handler ran for that particular element.
 */
class ChannelUndeliveredElementTest : TestBase() {
    /** An element that is received must not be cancelled, not even after the channel is closed. */
    @Test
    fun testSendSuccessfully() = runAllKindsTest { kind ->
        val ch = kind.create<Resource> { undelivered -> undelivered.cancel() }
        val resource = Resource("OK")
        launch {
            ch.send(resource)
        }
        val received = ch.receive()
        assertEquals("OK", received.value)
        assertFalse(resource.isCancelled) // delivered, so the handler did not run
        ch.close()
        assertFalse(resource.isCancelled) // closing afterwards changes nothing
    }

    /** Cancelling a sender that is suspended in `send` cancels the element it was sending. */
    @Test
    fun testRendezvousSendCancelled() = runTest {
        val rendezvous = Channel<Resource> { undelivered -> undelivered.cancel() }
        val resource = Resource("OK")
        val sendJob = launch(start = CoroutineStart.UNDISPATCHED) {
            assertFailsWith<CancellationException> {
                rendezvous.send(resource) // suspends, then gets cancelled
            }
        }
        sendJob.cancelAndJoin()
        assertTrue(resource.isCancelled)
    }

    /**
     * With a buffer of one, the first element is buffered and the second send suspends.
     * Cancelling the sender cancels only the second element; the buffered one is cancelled
     * once the channel itself is cancelled.
     */
    @Test
    fun testBufferedSendCancelled() = runTest {
        val buffered = Channel<Resource>(1) { undelivered -> undelivered.cancel() }
        val first = Resource("A")
        val second = Resource("B")
        val sendJob = launch(start = CoroutineStart.UNDISPATCHED) {
            buffered.send(first) // goes into the buffer
            assertFailsWith<CancellationException> {
                buffered.send(second) // suspends, then gets cancelled
            }
        }
        sendJob.cancelAndJoin()
        assertFalse(first.isCancelled) // still sitting in the buffer
        assertTrue(second.isCancelled) // its send was cancelled
        buffered.cancel() // cancel the channel itself
        assertTrue(first.isCancelled) // buffered element is cancelled as well
    }

    /**
     * In a conflated channel, sending a second element cancels the first one. The surviving
     * element stays untouched by `close()` and is cancelled only by `cancel()`.
     */
    @Test
    fun testConflatedResourceCancelled() = runTest {
        val conflated = Channel<Resource>(Channel.CONFLATED) { undelivered -> undelivered.cancel() }
        val first = Resource("A")
        val second = Resource("B")
        conflated.send(first)
        assertFalse(first.isCancelled)
        assertFalse(second.isCancelled)
        conflated.send(second)
        assertTrue(first.isCancelled) // conflated away (lost), hence cancelled
        assertFalse(second.isCancelled)
        conflated.close()
        assertFalse(second.isCancelled) // a receiver could still read it
        conflated.cancel()
        assertTrue(second.isCancelled) // cancelled together with the channel
    }

    /** Sending to an already closed channel fails and cancels the element. */
    @Test
    fun testSendToClosedChannel() = runAllKindsTest { kind ->
        val ch = kind.create<Resource> { undelivered -> undelivered.cancel() }
        ch.close() // close right away
        val resource = Resource("OK")
        assertFailsWith<ClosedSendChannelException> {
            ch.send(resource) // fails on the closed channel, so the element is never delivered
        }
        assertTrue(resource.isCancelled)
    }

    /**
     * Runs [test] once per [TestChannelKind], skipping kinds that go via broadcast.
     *
     * Any failure is rethrown through `error` with the channel kind prepended to the message.
     */
    private fun runAllKindsTest(test: suspend CoroutineScope.(TestChannelKind) -> Unit) {
        TestChannelKind.values()
            .filterNot { it.viaBroadcast } // does not support onUndeliveredElement
            .forEach { kind ->
                try {
                    runTest {
                        test(kind)
                    }
                } catch (e: Throwable) {
                    error("$kind: $e", e)
                }
            }
    }

    /**
     * Test element carrying a [value] and a one-shot cancellation flag.
     */
    private class Resource(val value: String) {
        private val _wasCancelled = atomic(false)

        /** `true` once [cancel] has been called. */
        val isCancelled: Boolean
            get() = _wasCancelled.value

        /** Marks this resource as cancelled; fails with "Already cancelled" on a repeated call. */
        fun cancel() {
            val alreadyCancelled = _wasCancelled.getAndSet(true)
            check(!alreadyCancelled) { "Already cancelled" }
        }
    }
}