• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.selects.*
11 import kotlin.math.*
12 
/**
 * Channel with array buffer of a fixed [capacity].
 * Sender suspends only when buffer is full and receiver suspends only when buffer is empty.
 *
 * This channel is created by `Channel(capacity)` factory function invocation.
 *
 * This implementation uses lock to protect the buffer, which is held only during very short buffer-update operations.
 * The lists of suspended senders or receivers are lock-free.
 *
 * The buffer is used as a circular array: `head` is the index of the oldest element and the element at logical
 * position `i` is stored at `(head + i) % buffer.size`. The array starts small and is doubled on demand,
 * never exceeding [capacity].
 *
 * When the buffer already holds [capacity] elements, [onBufferOverflow] decides what an offer does:
 * `SUSPEND` reports `OFFER_FAILED`, `DROP_LATEST` reports `OFFER_SUCCESS` without storing the element,
 * and `DROP_OLDEST` overwrites the oldest buffered element with the new one.
 **/
internal open class ArrayChannel<E>(
    /**
     * Buffer capacity.
     */
    private val capacity: Int,
    /**
     * Policy applied by offers that find the buffer full.
     */
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractChannel<E>(onUndeliveredElement) {
    init {
        // This check is actually used by the Channel(...) constructor function which checks only for known
        // capacities and calls ArrayChannel constructor for everything else.
        require(capacity >= 1) { "ArrayChannel capacity must be at least 1, but $capacity was specified" }
    }

    private val lock = ReentrantLock()

    /*
     * Guarded by lock.
     * Allocate minimum of capacity and 8 to avoid excess memory pressure for large channels when it's not necessary.
     * Unused slots are initially filled with the EMPTY marker.
     */
    private var buffer: Array<Any?> = arrayOfNulls<Any?>(min(capacity, 8)).apply { fill(EMPTY) }

    // Index of the oldest buffered element in `buffer`. Within this class it is only touched while holding `lock`.
    private var head: Int = 0

    // Number of buffered elements. Written under `lock`, but read without it by the isBuffer* properties below.
    private val size = atomic(0) // Invariant: size <= capacity

    protected final override val isBufferAlwaysEmpty: Boolean get() = false
    protected final override val isBufferEmpty: Boolean get() = size.value == 0
    protected final override val isBufferAlwaysFull: Boolean get() = false

    // The buffer only counts as "full" for senders when the overflow policy is SUSPEND;
    // with the DROP_* policies an offer is always accepted.
    protected final override val isBufferFull: Boolean get() = size.value == capacity && onBufferOverflow == BufferOverflow.SUSPEND

    // These three properties evaluate the inherited implementation while holding `lock`.
    override val isFull: Boolean get() = lock.withLock { isFullImpl }
    override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }
    override val isClosedForReceive: Boolean get() = lock.withLock { super.isClosedForReceive }

    /**
     * Offers [element] to this channel without suspending.
     *
     * Under the lock the element is either handed to a queued receiver (only attempted when the buffer was empty)
     * or stored in the buffer. The receiver's resumption is completed after the lock is released.
     *
     * Result is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            val size = this.size.value
            closedForSend?.let { return it }
            // update size before checking queue (!!!)
            updateBufferSize(size)?.let { return it }
            // check for receivers that were waiting on empty queue
            if (size == 0) {
                loop@ while (true) {
                    receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                    if (receive is Closed) {
                        this.size.value = size // restore size (undo the tentative increment)
                        return receive!!
                    }
                    val token = receive!!.tryResumeReceive(element, null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        this.size.value = size // restore size: the element bypasses the buffer
                        return@withLock
                    }
                    // token == null: this receiver could not be resumed, try the next one
                }
            }
            enqueueElement(size, element)
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        // `receive` is non-null here: the only way to leave the lock block without returning is the
        // `return@withLock` above, which runs right after `receive` was assigned a resumable receiver.
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    /**
     * Offers [element] to this channel on behalf of the [select] instance.
     *
     * Mirrors [offerInternal], except that handing the element to a receiver is performed through
     * `select.performAtomicTrySelect` and storing it in the buffer requires `select.trySelect()` to succeed.
     * Whenever the offer is abandoned, the tentative size increment is rolled back.
     *
     * Result is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | Closed`.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            val size = this.size.value
            closedForSend?.let { return it }
            // update size before checking queue (!!!)
            updateBufferSize(size)?.let { return it }
            // check for receivers that were waiting on empty queue
            if (size == 0) {
                loop@ while (true) {
                    val offerOp = describeTryOffer(element)
                    val failure = select.performAtomicTrySelect(offerOp)
                    when {
                        failure == null -> { // offered successfully
                            this.size.value = size // restore size
                            receive = offerOp.result
                            return@withLock
                        }
                        failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                        failure === RETRY_ATOMIC -> {} // retry
                        failure === ALREADY_SELECTED || failure is Closed<*> -> {
                            this.size.value = size // restore size
                            return failure
                        }
                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                    }
                }
            }
            // let's try to select sending this element to buffer
            if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
                this.size.value = size // restore size
                return ALREADY_SELECTED
            }
            enqueueElement(size, element)
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        // `receive` was assigned from `offerOp.result` right before the `return@withLock` that leads here.
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    // Delegates to the inherited implementation while holding `lock`.
    override fun enqueueSend(send: Send): Any? = lock.withLock {
        super.enqueueSend(send)
    }

    /**
     * Reserves room for one more element, given that the buffer currently holds [currentSize] elements.
     *
     * Guarded by lock.
     *
     * Returns `null` when the caller should proceed with the offer: either the size was tentatively incremented,
     * or the buffer is full and the policy is `DROP_OLDEST` (size stays unchanged).
     * Otherwise returns the final result of the offer: `OFFER_FAILED` for `SUSPEND`,
     * `OFFER_SUCCESS` for `DROP_LATEST` (the new element is simply not stored).
     */
    private fun updateBufferSize(currentSize: Int): Symbol? {
        if (currentSize < capacity) {
            size.value = currentSize + 1 // tentatively put it into the buffer
            return null // proceed
        }
        // buffer is full
        return when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> OFFER_FAILED
            BufferOverflow.DROP_LATEST -> OFFER_SUCCESS
            BufferOverflow.DROP_OLDEST -> null // proceed, will drop oldest in enqueueElement
        }
    }

    /**
     * Stores [element] behind the [currentSize] elements already buffered.
     *
     * Guarded by lock.
     *
     * If the buffer is already at [capacity], the oldest element is overwritten and `head` is advanced,
     * which is expected to happen only with the `DROP_OLDEST` policy.
     */
    private fun enqueueElement(currentSize: Int, element: E) {
        if (currentSize < capacity) {
            ensureCapacity(currentSize)
            buffer[(head + currentSize) % buffer.size] = element // actually queue element
        } else {
            // buffer is full
            assert { onBufferOverflow == BufferOverflow.DROP_OLDEST } // the only way we can get here
            buffer[head % buffer.size] = null // drop oldest element
            buffer[(head + currentSize) % buffer.size] = element // actually queue element
            head = (head + 1) % buffer.size
        }
    }

    /**
     * Grows the backing array when all of its slots are taken by the [currentSize] buffered elements.
     *
     * Guarded by lock.
     *
     * The new array is twice as large, but never larger than [capacity]. Elements are copied so that the oldest
     * one ends up at index 0, the remaining slots are filled with `EMPTY`, and `head` is reset to 0.
     */
    private fun ensureCapacity(currentSize: Int) {
        if (currentSize >= buffer.size) {
            // Double in Long arithmetic: `buffer.size * 2` as an Int wraps to a negative value once buffer.size
            // reaches 2^30, which would make min(...) pick that negative value and break the allocation below.
            // The result is bounded by capacity, so converting back to Int is lossless.
            val newSize = min(buffer.size * 2L, capacity.toLong()).toInt()
            val newBuffer = arrayOfNulls<Any?>(newSize)
            for (i in 0 until currentSize) {
                newBuffer[i] = buffer[(head + i) % buffer.size]
            }
            newBuffer.fill(EMPTY, currentSize, newSize)
            buffer = newBuffer
            head = 0
        }
    }

    /**
     * Retrieves the oldest buffered element without suspending.
     *
     * If the buffer was full, the first queued sender that can still be resumed supplies a replacement element
     * that is appended to the buffer, keeping the size unchanged. Senders that can no longer be resumed are
     * notified through `undeliveredElement()`. The resumed sender is completed after the lock is released.
     *
     * Result is `E | POLL_FAILED | Closed`.
     */
    protected override fun pollInternal(): Any? {
        var send: Send? = null
        var resumed = false
        var result: Any? = null
        lock.withLock {
            val size = this.size.value
            if (size == 0) return closedForSend ?: POLL_FAILED // when nothing can be read from buffer
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size.value = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    send = takeFirstSendOrPeekClosed() ?: break
                    val token = send!!.tryResumeSend(null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        resumed = true
                        replacement = send!!.pollResult
                        break@loop
                    }
                    // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
                    send!!.undeliveredElement()
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size.value = size // restore size
                // `head` has not been advanced yet, so this addresses the slot right after the last element
                buffer[(head + size) % buffer.size] = replacement
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that we have taken the replacement from
        if (resumed)
            send!!.completeResumeSend()
        return result
    }

    /**
     * Retrieves the oldest buffered element on behalf of the [select] instance.
     *
     * Mirrors [pollInternal], except that a waiting sender is taken through `select.performAtomicTrySelect`.
     * When no replacement element is obtained from a sender, `select.trySelect()` must succeed for the element
     * to be taken. If the select instance turns out to be already selected, the size and the head slot are
     * restored and `ALREADY_SELECTED` is returned.
     *
     * Result is `ALREADY_SELECTED | E | POLL_FAILED | Closed`.
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var send: Send? = null
        var success = false
        var result: Any? = null
        lock.withLock {
            val size = this.size.value
            if (size == 0) return closedForSend ?: POLL_FAILED
            // size > 0: not empty -- retrieve element
            result = buffer[head]
            buffer[head] = null
            this.size.value = size - 1 // update size before checking queue (!!!)
            // check for senders that were waiting on full queue
            var replacement: Any? = POLL_FAILED
            if (size == capacity) {
                loop@ while (true) {
                    val pollOp = describeTryPoll()
                    val failure = select.performAtomicTrySelect(pollOp)
                    when {
                        failure == null -> { // polled successfully
                            send = pollOp.result
                            success = true
                            replacement = send!!.pollResult
                            break@loop
                        }
                        failure === POLL_FAILED -> break@loop // cannot poll -> Ok to take from buffer
                        failure === RETRY_ATOMIC -> {} // retry
                        failure === ALREADY_SELECTED -> {
                            this.size.value = size // restore size
                            buffer[head] = result // restore head
                            return failure
                        }
                        failure is Closed<*> -> {
                            send = failure
                            success = true
                            replacement = failure
                            break@loop
                        }
                        // The operation attempted above is describeTryPoll, so name it correctly in the message.
                        else -> error("performAtomicTrySelect(describeTryPoll) returned $failure")
                    }
                }
            }
            if (replacement !== POLL_FAILED && replacement !is Closed<*>) {
                this.size.value = size // restore size
                buffer[(head + size) % buffer.size] = replacement
            } else {
                // failed to poll or is already closed --> let's try to select receiving this element from buffer
                if (!select.trySelect()) { // :todo: move trySelect completion outside of lock
                    this.size.value = size // restore size
                    buffer[head] = result // restore head
                    return ALREADY_SELECTED
                }
            }
            head = (head + 1) % buffer.size
        }
        // complete the send that we have taken the replacement from
        if (success)
            send!!.completeResumeSend()
        return result
    }

    // Delegates to the inherited implementation while holding `lock`.
    override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
        super.enqueueReceiveInternal(receive)
    }

    /**
     * Discards all buffered elements and then lets the superclass handle the cancellation.
     *
     * Each discarded element that is not the `EMPTY` marker is reported to `onUndeliveredElement`, if one is set.
     * An exception collected from those callbacks is rethrown only after the superclass call has run.
     *
     * Note: this function is invoked when channel is already closed.
     */
    override fun onCancelIdempotent(wasClosed: Boolean) {
        // clear buffer first, but do not wait for it in helpers
        val onUndeliveredElement = onUndeliveredElement
        var undeliveredElementException: UndeliveredElementException? = null // first cancel exception, others suppressed
        lock.withLock {
            repeat(size.value) {
                val value = buffer[head]
                if (onUndeliveredElement != null && value !== EMPTY) {
                    @Suppress("UNCHECKED_CAST")
                    undeliveredElementException = onUndeliveredElement.callUndeliveredElementCatchingException(value as E, undeliveredElementException)
                }
                buffer[head] = EMPTY
                head = (head + 1) % buffer.size
            }
            size.value = 0
        }
        // then clean all queued senders
        super.onCancelIdempotent(wasClosed)
        undeliveredElementException?.let { throw it } // throw cancel exception at the end if there was one
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(buffer:capacity=$capacity,size=${size.value})"
}
309