• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.test.*
10 
11 class ChannelUndeliveredElementTest : TestBase() {
12     @Test
13     fun testSendSuccessfully() = runTest {
14         runAllKindsTest { kind ->
15             val channel = kind.create<Resource> { it.cancel() }
16             val res = Resource("OK")
17             launch {
18                 channel.send(res)
19             }
20             val ok = channel.receive()
21             assertEquals("OK", ok.value)
22             assertFalse(res.isCancelled) // was not cancelled
23             channel.close()
24             assertFalse(res.isCancelled) // still was not cancelled
25         }
26     }
27 
28     @Test
29     fun testRendezvousSendCancelled() = runTest {
30         val channel = Channel<Resource> { it.cancel() }
31         val res = Resource("OK")
32         val sender = launch(start = CoroutineStart.UNDISPATCHED) {
33             assertFailsWith<CancellationException> {
34                 channel.send(res) // suspends & get cancelled
35             }
36         }
37         sender.cancelAndJoin()
38         assertTrue(res.isCancelled)
39     }
40 
41     @Test
42     fun testBufferedSendCancelled() = runTest {
43         val channel = Channel<Resource>(1) { it.cancel() }
44         val resA = Resource("A")
45         val resB = Resource("B")
46         val sender = launch(start = CoroutineStart.UNDISPATCHED) {
47             channel.send(resA) // goes to buffer
48             assertFailsWith<CancellationException> {
49                 channel.send(resB) // suspends & get cancelled
50             }
51         }
52         sender.cancelAndJoin()
53         assertFalse(resA.isCancelled) // it is in buffer, not cancelled
54         assertTrue(resB.isCancelled) // send was cancelled
55         channel.cancel() // now cancel the channel
56         assertTrue(resA.isCancelled) // now cancelled in buffer
57     }
58 
59     @Test
60     fun testUnlimitedChannelCancelled() = runTest {
61         val channel = Channel<Resource>(Channel.UNLIMITED) { it.cancel() }
62         val resA = Resource("A")
63         val resB = Resource("B")
64         channel.send(resA) // goes to buffer
65         channel.send(resB) // goes to buffer
66         assertFalse(resA.isCancelled) // it is in buffer, not cancelled
67         assertFalse(resB.isCancelled) //  it is in buffer, not cancelled
68         channel.cancel() // now cancel the channel
69         assertTrue(resA.isCancelled) // now cancelled in buffer
70         assertTrue(resB.isCancelled) // now cancelled in buffer
71     }
72 
73     @Test
74     fun testConflatedResourceCancelled() = runTest {
75         val channel = Channel<Resource>(Channel.CONFLATED) { it.cancel() }
76         val resA = Resource("A")
77         val resB = Resource("B")
78         channel.send(resA)
79         assertFalse(resA.isCancelled)
80         assertFalse(resB.isCancelled)
81         channel.send(resB)
82         assertTrue(resA.isCancelled) // it was conflated (lost) and thus cancelled
83         assertFalse(resB.isCancelled)
84         channel.close()
85         assertFalse(resB.isCancelled) // not cancelled yet, can be still read by receiver
86         channel.cancel()
87         assertTrue(resB.isCancelled) // now it is cancelled
88     }
89 
90     @Test
91     fun testSendToClosedChannel() = runTest {
92         runAllKindsTest { kind ->
93             val channel = kind.create<Resource> { it.cancel() }
94             channel.close() // immediately close channel
95             val res = Resource("OK")
96             assertFailsWith<ClosedSendChannelException> {
97                 channel.send(res) // send fails to closed channel, resource was not delivered
98             }
99             assertTrue(res.isCancelled)
100         }
101     }
102 
103     private suspend fun runAllKindsTest(test: suspend CoroutineScope.(TestChannelKind) -> Unit) {
104         for (kind in TestChannelKind.values()) {
105             if (kind.viaBroadcast) continue // does not support onUndeliveredElement
106             try {
107                 withContext(Job()) {
108                     test(kind)
109                 }
110             } catch(e: Throwable) {
111                 error("$kind: $e", e)
112             }
113         }
114     }
115 
116     private class Resource(val value: String) {
117         private val _cancelled = atomic(false)
118 
119         val isCancelled: Boolean
120             get() = _cancelled.value
121 
122         fun cancel() {
123             check(!_cancelled.getAndSet(true)) { "Already cancelled" }
124         }
125     }
126 
127     @Test
128     fun testHandlerIsNotInvoked() = runTest { // #2826
129         val channel = Channel<Unit> {
130             expectUnreached()
131         }
132 
133         expect(1)
134         launch {
135             expect(2)
136             channel.receive()
137         }
138         channel.send(Unit)
139         finish(3)
140     }
141 
142     @Test
143     fun testChannelBufferOverflow() = runTest {
144         testBufferOverflowStrategy(listOf(1, 2), BufferOverflow.DROP_OLDEST)
145         testBufferOverflowStrategy(listOf(3), BufferOverflow.DROP_LATEST)
146     }
147 
148     private suspend fun testBufferOverflowStrategy(expectedDroppedElements: List<Int>, strategy: BufferOverflow) {
149         val list = ArrayList<Int>()
150         val channel = Channel<Int>(
151             capacity = 2,
152             onBufferOverflow = strategy,
153             onUndeliveredElement = { value -> list.add(value) }
154         )
155 
156         channel.send(1)
157         channel.send(2)
158 
159         channel.send(3)
160         channel.trySend(4).onFailure { expectUnreached() }
161         assertEquals(expectedDroppedElements, list)
162     }
163 
164 
165     @Test
166     fun testTrySendDoesNotInvokeHandlerOnClosedConflatedChannel() = runTest {
167         val conflated = Channel<Int>(Channel.CONFLATED, onUndeliveredElement = {
168             expectUnreached()
169         })
170         conflated.close(IndexOutOfBoundsException())
171         conflated.trySend(3)
172     }
173 
174     @Test
175     fun testTrySendDoesNotInvokeHandlerOnClosedChannel() = runTest {
176         val conflated = Channel<Int>(3, onUndeliveredElement = {
177             expectUnreached()
178         })
179         conflated.close(IndexOutOfBoundsException())
180         repeat(10) {
181             conflated.trySend(3)
182         }
183     }
184 
185     @Test
186     fun testTrySendDoesNotInvokeHandler() {
187         for (capacity in 0..2) {
188             testTrySendDoesNotInvokeHandler(capacity)
189         }
190     }
191 
192     private fun testTrySendDoesNotInvokeHandler(capacity: Int) {
193         val channel = Channel<Int>(capacity, BufferOverflow.DROP_LATEST, onUndeliveredElement = {
194             expectUnreached()
195         })
196         repeat(10) {
197             channel.trySend(3)
198         }
199     }
200 }
201