• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 package kotlinx.coroutines.channels
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.test.*
6 
7 class ChannelUndeliveredElementTest : TestBase() {
8     @Test
9     fun testSendSuccessfully() = runAllKindsTest { kind ->
10         val channel = kind.create<Resource> { it.cancel() }
11         val res = Resource("OK")
12         launch {
13             channel.send(res)
14         }
15         val ok = channel.receive()
16         assertEquals("OK", ok.value)
17         assertFalse(res.isCancelled) // was not cancelled
18         channel.close()
19         assertFalse(res.isCancelled) // still was not cancelled
20     }
21 
22     @Test
23     fun testRendezvousSendCancelled() = runTest {
24         val channel = Channel<Resource> { it.cancel() }
25         val res = Resource("OK")
26         val sender = launch(start = CoroutineStart.UNDISPATCHED) {
27             assertFailsWith<CancellationException> {
28                 channel.send(res) // suspends & get cancelled
29             }
30         }
31         sender.cancelAndJoin()
32         assertTrue(res.isCancelled)
33     }
34 
35     @Test
36     fun testBufferedSendCancelled() = runTest {
37         val channel = Channel<Resource>(1) { it.cancel() }
38         val resA = Resource("A")
39         val resB = Resource("B")
40         val sender = launch(start = CoroutineStart.UNDISPATCHED) {
41             channel.send(resA) // goes to buffer
42             assertFailsWith<CancellationException> {
43                 channel.send(resB) // suspends & get cancelled
44             }
45         }
46         sender.cancelAndJoin()
47         assertFalse(resA.isCancelled) // it is in buffer, not cancelled
48         assertTrue(resB.isCancelled) // send was cancelled
49         channel.cancel() // now cancel the channel
50         assertTrue(resA.isCancelled) // now cancelled in buffer
51     }
52 
53     @Test
54     fun testConflatedResourceCancelled() = runTest {
55         val channel = Channel<Resource>(Channel.CONFLATED) { it.cancel() }
56         val resA = Resource("A")
57         val resB = Resource("B")
58         channel.send(resA)
59         assertFalse(resA.isCancelled)
60         assertFalse(resB.isCancelled)
61         channel.send(resB)
62         assertTrue(resA.isCancelled) // it was conflated (lost) and thus cancelled
63         assertFalse(resB.isCancelled)
64         channel.close()
65         assertFalse(resB.isCancelled) // not cancelled yet, can be still read by receiver
66         channel.cancel()
67         assertTrue(resB.isCancelled) // now it is cancelled
68     }
69 
70     @Test
71     fun testSendToClosedChannel() = runAllKindsTest { kind ->
72         val channel = kind.create<Resource> { it.cancel() }
73         channel.close() // immediately close channel
74         val res = Resource("OK")
75         assertFailsWith<ClosedSendChannelException> {
76             channel.send(res) // send fails to closed channel, resource was not delivered
77         }
78         assertTrue(res.isCancelled)
79     }
80 
81     private fun runAllKindsTest(test: suspend CoroutineScope.(TestChannelKind) -> Unit) {
82         for (kind in TestChannelKind.values()) {
83             if (kind.viaBroadcast) continue // does not support onUndeliveredElement
84             try {
85                 runTest {
86                     test(kind)
87                 }
88             } catch(e: Throwable) {
89                 error("$kind: $e", e)
90             }
91         }
92     }
93 
94     private class Resource(val value: String) {
95         private val _cancelled = atomic(false)
96 
97         val isCancelled: Boolean
98             get() = _cancelled.value
99 
100         fun cancel() {
101             check(!_cancelled.getAndSet(true)) { "Already cancelled" }
102         }
103     }
104 }