• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 import kotlin.native.concurrent.*
15 
16 /**
17  * Abstract send channel. It is a base class for all send channel implementations.
18  */
19 internal abstract class AbstractSendChannel<E>(
20     @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
21 ) : SendChannel<E> {
    /**
     * Head of the lock-free linked list that backs this channel. The code in this class adds
     * waiting senders, buffered elements and the close token to it.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected val queue = LockFreeLinkedListHead()

    // ------ extension points for buffered channels ------

    /**
     * Returns `true` if [isBufferFull] is always `true`.
     * When it is `true`, [enqueueSend] adds the sender without re-checking [isBufferFull].
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected abstract val isBufferAlwaysFull: Boolean

    /**
     * Returns `true` if this channel's buffer is full.
     * This operation should be atomic if it is invoked by [enqueueSend].
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected abstract val isBufferFull: Boolean

    // Holds the handler registered via invokeOnClose.
    // State transitions: null -> handler -> HANDLER_INVOKED
    private val onCloseHandler = atomic<Any?>(null)
42 
43     // ------ internal functions for override by buffered channels ------
44 
45     /**
46      * Tries to add element to buffer or to queued receiver.
47      * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
48      * @suppress **This is unstable API and it is subject to change.**
49      */
50     protected open fun offerInternal(element: E): Any {
51         while (true) {
52             val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
53             val token = receive.tryResumeReceive(element, null)
54             if (token != null) {
55                 assert { token === RESUME_TOKEN }
56                 receive.completeResumeReceive(element)
57                 return receive.offerResult
58             }
59         }
60     }
61 
62     /**
63      * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
64      * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
65      * @suppress **This is unstable API and it is subject to change.**
66      */
67     protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
68         // offer atomically with select
69         val offerOp = describeTryOffer(element)
70         val failure = select.performAtomicTrySelect(offerOp)
71         if (failure != null) return failure
72         val receive = offerOp.result
73         receive.completeResumeReceive(element)
74         return receive.offerResult
75     }
76 
77     // ------ state functions & helpers for concrete implementations ------
78 
79     /**
80      * Returns non-null closed token if it is last in the queue.
81      * @suppress **This is unstable API and it is subject to change.**
82      */
83     protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) }
84 
85     /**
86      * Returns non-null closed token if it is first in the queue.
87      * @suppress **This is unstable API and it is subject to change.**
88      */
89     protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) }
90 
91     /**
92      * Retrieves first sending waiter from the queue or returns closed token.
93      * @suppress **This is unstable API and it is subject to change.**
94      */
95     protected fun takeFirstSendOrPeekClosed(): Send? =
96         queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
97 
98     /**
99      * Queues buffered element, returns null on success or
100      * returns node reference if it was already closed or is waiting for receive.
101      * @suppress **This is unstable API and it is subject to change.**
102      */
103     protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
104         queue.addLastIfPrev(SendBuffered(element)) { prev ->
105             if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
106             true
107         }
108         return null
109     }
110 
111     /**
112      * @suppress **This is unstable API and it is subject to change.**
113      */
114     protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
115 
116     private open class SendBufferedDesc<E>(
117         queue: LockFreeLinkedListHead,
118         element: E
119     ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
120         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
121             is Closed<*> -> affected
122             is ReceiveOrClosed<*> -> OFFER_FAILED
123             else -> null
124         }
125     }
126 
127     // ------ SendChannel ------
128 
129     public final override val isClosedForSend: Boolean get() = closedForSend != null
130     public override val isFull: Boolean get() = isFullImpl
131     protected val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
132 
133     public final override suspend fun send(element: E) {
134         // fast path -- try offer non-blocking
135         if (offerInternal(element) === OFFER_SUCCESS) return
136         // slow-path does suspend or throws exception
137         return sendSuspend(element)
138     }
139 
140     public final override fun offer(element: E): Boolean {
141         val result = offerInternal(element)
142         return when {
143             result === OFFER_SUCCESS -> true
144             result === OFFER_FAILED -> {
145                 // We should check for closed token on offer as well, otherwise offer won't be linearizable
146                 // in the face of concurrent close()
147                 // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
148                 throw recoverStackTrace(helpCloseAndGetSendException(element, closedForSend ?: return false))
149             }
150             result is Closed<*> -> {
151                 throw recoverStackTrace(helpCloseAndGetSendException(element, result))
152             }
153             else -> error("offerInternal returned $result")
154         }
155     }
156 
157     private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
158         // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
159         // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
160         helpClose(closed)
161         // Element was not delivered -> cals onUndeliveredElement
162         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
163             // If it crashes, add send exception as suppressed for better diagnostics
164             it.addSuppressed(closed.sendException)
165             throw it
166         }
167         return closed.sendException
168     }
169 
    /**
     * Slow path of [send]. Loops until one of the following happens:
     * * the sender node is enqueued via [enqueueSend] (the continuation is then left for a later resume),
     * * [offerInternal] succeeds and the continuation is resumed with `Unit`,
     * * a close token is observed and the continuation is resumed with the send exception.
     */
    private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
        loop@ while (true) {
            if (isFullImpl) {
                // The node type depends on whether an undelivered-element handler is installed
                val send = if (onUndeliveredElement == null)
                    SendElement(element, cont) else
                    SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
                val enqueueResult = enqueueSend(send)
                when {
                    enqueueResult == null -> { // enqueued successfully
                        // NOTE(review): presumably removes the node from the queue when cont is cancelled -- confirm
                        cont.removeOnCancellation(send)
                        return@sc
                    }
                    enqueueResult is Closed<*> -> {
                        cont.helpCloseAndResumeWithSendException(element, enqueueResult)
                        return@sc
                    }
                    enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
                    enqueueResult is Receive<*> -> {} // try to offer instead
                    else -> error("enqueueSend returned $enqueueResult")
                }
            }
            // hm... receiver is waiting or buffer is not full. try to offer
            val offerResult = offerInternal(element)
            when {
                offerResult === OFFER_SUCCESS -> {
                    cont.resume(Unit)
                    return@sc
                }
                // the state changed again between the checks -> start over
                offerResult === OFFER_FAILED -> continue@loop
                offerResult is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(element, offerResult)
                    return@sc
                }
                else -> error("offerInternal returned $offerResult")
            }
        }
    }
207 
208     private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
209         helpClose(closed)
210         val sendException = closed.sendException
211         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
212             it.addSuppressed(sendException)
213             resumeWithException(it)
214             return
215         }
216         resumeWithException(sendException)
217     }
218 
219     /**
220      * Result is:
221      * * null -- successfully enqueued
222      * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
223      * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
224      */
225     protected open fun enqueueSend(send: Send): Any? {
226         if (isBufferAlwaysFull) {
227             queue.addLastIfPrev(send) { prev ->
228                 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
229                 true
230             }
231         } else {
232             if (!queue.addLastIfPrevAndIf(send, { prev ->
233                 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
234                 true
235             }, { isBufferFull }))
236                 return ENQUEUE_FAILED
237         }
238         return null
239     }
240 
241     public override fun close(cause: Throwable?): Boolean {
242         val closed = Closed<E>(cause)
243         /*
244          * Try to commit close by adding a close token to the end of the queue.
245          * Successful -> we're now responsible for closing receivers
246          * Not successful -> help closing pending receivers to maintain invariant
247          * "if (!close()) next send will throw"
248          */
249         val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
250         val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
251         helpClose(actuallyClosed)
252         if (closeAdded) invokeOnCloseHandler(cause)
253         return closeAdded // true if we have closed
254     }
255 
256     private fun invokeOnCloseHandler(cause: Throwable?) {
257         val handler = onCloseHandler.value
258         if (handler !== null && handler !== HANDLER_INVOKED
259             && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
260             // CAS failed -> concurrent invokeOnClose() invoked handler
261             @Suppress("UNCHECKED_CAST")
262             (handler as Handler)(cause)
263         }
264     }
265 
266     override fun invokeOnClose(handler: Handler) {
267         // Intricate dance for concurrent invokeOnClose and close calls
268         if (!onCloseHandler.compareAndSet(null, handler)) {
269             val value = onCloseHandler.value
270             if (value === HANDLER_INVOKED) {
271                 throw IllegalStateException("Another handler was already registered and successfully invoked")
272             }
273 
274             throw IllegalStateException("Another handler was already registered: $value")
275         } else {
276             val closedToken = closedForSend
277             if (closedToken != null && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
278                 // CAS failed -> close() call invoked handler
279                 (handler)(closedToken.closeCause)
280             }
281         }
282     }
283 
    /**
     * Removes all receivers queued before the [closed] token, then notifies each of them
     * that the channel was closed, and finally calls [onClosedIdempotent].
     */
    private fun helpClose(closed: Closed<*>) {
        /*
         * It's important to traverse list from right to left to avoid races with sender.
         * Consider channel state: head -> [receive_1] -> [receive_2] -> head
         * - T1 calls receive()
         * - T2 calls close()
         * - T3 calls close() + send(value)
         *
         * If both will traverse list from left to right, following non-linearizable history is possible:
         * [close -> false], [send -> transferred 'value' to receiver]
         *
         * Another problem with linearizability of close is that we cannot resume closed receives until all
         * receivers are removed from the list.
         * Consider channel state: head -> [receive_1] -> [receive_2] -> head
         * - T1 called receive_2, and will call send() when its receive call resumes
         * - T2 calls close()
         *
         * Now if T2's close resumes T1's receive_2 then its receive gets "closed for receive" exception, but
         * its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
         */
        var closedList = InlineList<Receive<E>>()
        while (true) {
            // Break when channel is empty or has no receivers
            @Suppress("UNCHECKED_CAST")
            val previous = closed.prevNode as? Receive<E> ?: break
            if (!previous.remove()) {
                // failed to remove the node (due to race) -- retry finding non-removed prevNode
                // NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
                previous.helpRemove() // make sure remove is complete before continuing
                continue
            }
            // add removed nodes to a separate list
            closedList += previous
        }
        /*
         * Now notify all removed nodes that the channel was closed
         * in the order they were added to the channel
         */
        closedList.forEachReversed { it.resumeReceiveClosed(closed) }
        // and do other post-processing
        onClosedIdempotent(closed)
    }
326 
    /**
     * Invoked when channel is closed as the last action of [close] invocation.
     * It is called at the end of every `helpClose` call, so this method should be idempotent
     * and can be called multiple times. The default implementation does nothing.
     */
    protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
332 
333     /**
334      * Retrieves first receiving waiter from the queue or returns closed token.
335      * @suppress **This is unstable API and it is subject to change.**
336      */
337     protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
338         queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
339 
340     // ------ registerSelectSend ------
341 
    /**
     * Creates a descriptor of an atomic attempt to offer [element] to the first node of the queue.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)

    /**
     * Remove-first descriptor used by select to offer [element] to a receiver at the head of the queue.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected class TryOfferDesc<E>(
        @JvmField val element: E,
        queue: LockFreeLinkedListHead
    ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
        // Close token -> the token itself; anything that is not a receiver -> OFFER_FAILED; receiver -> proceed (null)
        override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
            is Closed<*> -> affected
            !is ReceiveOrClosed<*> -> OFFER_FAILED
            else -> null
        }

        @Suppress("UNCHECKED_CAST")
        override fun onPrepare(prepareOp: PrepareOp): Any? {
            val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
            // a null token means the receiver could not be resumed
            val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
            if (token === RETRY_ATOMIC) return RETRY_ATOMIC
            assert { token === RESUME_TOKEN }
            return null
        }
    }
369 
    // Each access creates a new clause object that delegates registration to registerSelectSend
    final override val onSend: SelectClause2<E, SendChannel<E>>
        get() = object : SelectClause2<E, SendChannel<E>> {
            override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
                registerSelectSend(select, param, block)
            }
        }
376 
    /**
     * Registers a send clause for [select]. Loops until the select is already selected, the sender node is
     * enqueued, the offer succeeds (then [block] is started), or a close token is observed (then it throws).
     */
    private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
        while (true) {
            if (select.isSelected) return
            if (isFullImpl) {
                val node = SendSelect(element, this, select, block)
                val enqueueResult = enqueueSend(node)
                when {
                    enqueueResult == null -> { // enqueued successfully
                        // SendSelect is a DisposableHandle: its dispose() removes the node from the queue
                        select.disposeOnSelect(node)
                        return
                    }
                    enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))
                    enqueueResult === ENQUEUE_FAILED -> {} // try to offer
                    enqueueResult is Receive<*> -> {} // try to offer
                    else -> error("enqueueSend returned $enqueueResult ")
                }
            }
            // hm... receiver is waiting or buffer is not full. try to offer
            val offerResult = offerSelectInternal(element, select)
            when {
                offerResult === ALREADY_SELECTED -> return
                offerResult === OFFER_FAILED -> {} // retry
                offerResult === RETRY_ATOMIC -> {} // retry
                offerResult === OFFER_SUCCESS -> {
                    block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
                    return
                }
                offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))
                else -> error("offerSelectInternal returned $offerResult")
            }
        }
    }
409 
410     // ------ debug ------
411 
412     public override fun toString() =
413         "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
414 
415     private val queueDebugStateString: String
416         get() {
417             val head = queue.nextNode
418             if (head === queue) return "EmptyQueue"
419             var result = when (head) {
420                 is Closed<*> -> head.toString()
421                 is Receive<*> -> "ReceiveQueued"
422                 is Send -> "SendQueued"
423                 else -> "UNEXPECTED:$head" // should not happen
424             }
425             val tail = queue.prevNode
426             if (tail !== head) {
427                 result += ",queueSize=${countQueueSize()}"
428                 if (tail is Closed<*>) result += ",closedForSend=$tail"
429             }
430             return result
431         }
432 
433     private fun countQueueSize(): Int {
434         var size = 0
435         queue.forEach<LockFreeLinkedListNode> { size++ }
436         return size
437     }
438 
439     protected open val bufferDebugString: String get() = ""
440 
441     // ------ private ------
442 
443     private class SendSelect<E, R>(
444         override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
445         @JvmField val channel: AbstractSendChannel<E>,
446         @JvmField val select: SelectInstance<R>,
447         @JvmField val block: suspend (SendChannel<E>) -> R
448     ) : Send(), DisposableHandle {
449         override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
450             select.trySelectOther(otherOp) as Symbol? // must return symbol
451 
452         override fun completeResumeSend() {
453             block.startCoroutineCancellable(receiver = channel, completion = select.completion)
454         }
455 
456         override fun dispose() { // invoked on select completion
457             if (!remove()) return
458             // if the node was successfully removed (meaning it was added but was not received) then element not delivered
459             undeliveredElement()
460         }
461 
462         override fun resumeSendClosed(closed: Closed<*>) {
463             if (select.trySelect())
464                 select.resumeSelectWithException(closed.sendException)
465         }
466 
467         override fun undeliveredElement() {
468             channel.onUndeliveredElement?.callUndeliveredElement(pollResult, select.completion.context)
469         }
470 
471         override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
472     }
473 
474     internal class SendBuffered<out E>(
475         @JvmField val element: E
476     ) : Send() {
477         override val pollResult: Any? get() = element
478         override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
479         override fun completeResumeSend() {}
480         override fun resumeSendClosed(closed: Closed<*>) {}
481         override fun toString(): String = "SendBuffered@$hexAddress($element)"
482     }
483 }
484 
485 /**
486  * Abstract send/receive channel. It is a base class for all channel implementations.
487  */
488 internal abstract class AbstractChannel<E>(
489     onUndeliveredElement: OnUndeliveredElement<E>?
490 ) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
491     // ------ extension points for buffered channels ------
492 
    /**
     * Returns `true` if [isBufferEmpty] is always `true`.
     * When it is `true`, [enqueueReceiveInternal] adds the receiver without re-checking [isBufferEmpty].
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected abstract val isBufferAlwaysEmpty: Boolean

    /**
     * Returns `true` if this channel's buffer is empty.
     * This operation should be atomic if it is invoked by [enqueueReceive].
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected abstract val isBufferEmpty: Boolean
505 
506     // ------ internal functions for override by buffered channels ------
507 
508     /**
509      * Tries to remove element from buffer or from queued sender.
510      * Return type is `E | POLL_FAILED | Closed`
511      * @suppress **This is unstable API and it is subject to change.**
512      */
pollInternalnull513     protected open fun pollInternal(): Any? {
514         while (true) {
515             val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
516             val token = send.tryResumeSend(null)
517             if (token != null) {
518                 assert { token === RESUME_TOKEN }
519                 send.completeResumeSend()
520                 return send.pollResult
521             }
522             // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
523             send.undeliveredElement()
524         }
525     }
526 
527     /**
528      * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
529      * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
530      * @suppress **This is unstable API and it is subject to change.**
531      */
pollSelectInternalnull532     protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
533         // poll atomically with select
534         val pollOp = describeTryPoll()
535         val failure = select.performAtomicTrySelect(pollOp)
536         if (failure != null) return failure
537         val send = pollOp.result
538         send.completeResumeSend()
539         return pollOp.result.pollResult
540     }
541 
542     // ------ state functions & helpers for concrete implementations ------
543 
544     /**
545      * @suppress **This is unstable API and it is subject to change.**
546      */
547     protected val hasReceiveOrClosed: Boolean get() = queue.nextNode is ReceiveOrClosed<*>
548 
549     // ------ ReceiveChannel ------
550 
551     public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
552     public override val isEmpty: Boolean get() = isEmptyImpl
553     protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
554 
receivenull555     public final override suspend fun receive(): E {
556         // fast path -- try poll non-blocking
557         val result = pollInternal()
558         /*
559          * If result is Closed -- go to tail-call slow-path that will allow us to
560          * properly recover stacktrace without paying a performance cost on fast path.
561          * We prefer to recover stacktrace using suspending path to have a more precise stacktrace.
562          */
563         @Suppress("UNCHECKED_CAST")
564         if (result !== POLL_FAILED && result !is Closed<*>) return result as E
565         // slow-path does suspend
566         return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
567     }
568 
    /**
     * Slow path shared by the suspending receive functions; [receiveMode] is passed to the receiver node.
     * Loops until the receiver node is enqueued, or [pollInternal] returns an element or a close token.
     */
    @Suppress("UNCHECKED_CAST")
    private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
        // The node type depends on whether an undelivered-element handler is installed
        val receive = if (onUndeliveredElement == null)
            ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
            ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
        while (true) {
            if (enqueueReceive(receive)) {
                removeReceiveOnCancel(cont, receive)
                return@sc
            }
            // hm... something is not right. try to poll
            val result = pollInternal()
            if (result is Closed<*>) {
                // the node was never enqueued, so it is notified about the close directly
                receive.resumeReceiveClosed(result)
                return@sc
            }
            if (result !== POLL_FAILED) {
                cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
                return@sc
            }
            // POLL_FAILED -> the state changed again, retry from enqueueing
        }
    }
591 
enqueueReceiveInternalnull592     protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = if (isBufferAlwaysEmpty)
593         queue.addLastIfPrev(receive) { it !is Send } else
<lambda>null594         queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
595 
resultnull596     private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->
597         if (result) onReceiveEnqueued()
598     }
599 
receiveOrNullnull600     public final override suspend fun receiveOrNull(): E? {
601         // fast path -- try poll non-blocking
602         val result = pollInternal()
603         @Suppress("UNCHECKED_CAST")
604         if (result !== POLL_FAILED && result !is Closed<*>) return result as E
605         // slow-path does suspend
606         return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
607     }
608 
609     @Suppress("UNCHECKED_CAST")
receiveOrNullResultnull610     private fun receiveOrNullResult(result: Any?): E? {
611         if (result is Closed<*>) {
612             if (result.closeCause != null) throw recoverStackTrace(result.closeCause)
613             return null
614         }
615         return result as E
616     }
617 
618     @Suppress("UNCHECKED_CAST")
receiveOrClosednull619     public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
620         // fast path -- try poll non-blocking
621         val result = pollInternal()
622         if (result !== POLL_FAILED) return result.toResult()
623         // slow-path does suspend
624         return receiveSuspend(RECEIVE_RESULT)
625     }
626 
627     @Suppress("UNCHECKED_CAST")
pollnull628     public final override fun poll(): E? {
629         val result = pollInternal()
630         return if (result === POLL_FAILED) null else receiveOrNullResult(result)
631     }
632 
633     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
cancelnull634     final override fun cancel(cause: Throwable?): Boolean =
635         cancelInternal(cause)
636 
637     final override fun cancel(cause: CancellationException?) {
638         cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
639     }
640 
641     // It needs to be internal to support deprecated cancel(Throwable?) API
cancelInternalnull642     internal fun cancelInternal(cause: Throwable?): Boolean =
643         close(cause).also {
644             onCancelIdempotent(it)
645         }
646 
    /**
     * Method that is invoked right after [close] in [cancel] sequence.
     * [wasClosed] is directly mapped to the value returned by [close].
     *
     * This implementation removes every node queued before the close token and then
     * notifies each removed sender that the channel was closed.
     */
    protected open fun onCancelIdempotent(wasClosed: Boolean) {
        /*
         * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
         * has the same rationale.
         */
        val closed = closedForSend ?: error("Cannot happen")
        var list = InlineList<Send>()
        while (true) {
            val previous = closed.prevNode
            // reaching the list head means nothing is left before the close token
            if (previous is LockFreeLinkedListHead) {
                break
            }
            assert { previous is Send }
            if (!previous.remove()) {
                previous.helpRemove() // make sure remove is complete before continuing
                continue
            }
            // Add to the list only **after** successful removal
            list += previous as Send
        }
        // notify removed senders in the order they were added to the channel
        list.forEachReversed { it.resumeSendClosed(closed) }
    }
673 
iteratornull674     public final override fun iterator(): ChannelIterator<E> = Itr(this)
675 
676     // ------ registerSelectReceive ------
677 
    /**
     * Creates a descriptor of an atomic attempt to poll the first node of the queue.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)

    /**
     * Remove-first descriptor used by select to take an element from a sender at the head of the queue.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
        // Close token -> the token itself; anything that is not a sender -> POLL_FAILED; sender -> proceed (null)
        override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
            is Closed<*> -> affected
            !is Send -> POLL_FAILED
            else -> null
        }

        @Suppress("UNCHECKED_CAST")
        override fun onPrepare(prepareOp: PrepareOp): Any? {
            val affected = prepareOp.affected as Send // see "failure" impl
            // a null token means the sender could not be resumed
            val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
            if (token === RETRY_ATOMIC) return RETRY_ATOMIC
            assert { token === RESUME_TOKEN }
            return null
        }

        override fun onRemoved(affected: LockFreeLinkedListNode) {
            // Called when we removed it from the queue but were too late to resume, so we have undelivered element
            (affected as Send).undeliveredElement()
        }
    }
707 
708     final override val onReceive: SelectClause1<E>
709         get() = object : SelectClause1<E> {
710             @Suppress("UNCHECKED_CAST")
registerSelectClause1null711             override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
712                 registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
713             }
714         }
715 
716     final override val onReceiveOrNull: SelectClause1<E?>
717         get() = object : SelectClause1<E?> {
718             @Suppress("UNCHECKED_CAST")
registerSelectClause1null719             override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E?) -> R) {
720                 registerSelectReceiveMode(select, RECEIVE_NULL_ON_CLOSE, block as suspend (Any?) -> R)
721             }
722         }
723 
724     final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
725         get() = object : SelectClause1<ValueOrClosed<E>> {
726             @Suppress("UNCHECKED_CAST")
registerSelectClause1null727             override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
728                 registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
729             }
730         }
731 
registerSelectReceiveModenull732     private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
733         while (true) {
734             if (select.isSelected) return
735             if (isEmptyImpl) {
736                 if (enqueueReceiveSelect(select, block, receiveMode)) return
737             } else {
738                 val pollResult = pollSelectInternal(select)
739                 when {
740                     pollResult === ALREADY_SELECTED -> return
741                     pollResult === POLL_FAILED -> {} // retry
742                     pollResult === RETRY_ATOMIC -> {} // retry
743                     else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
744                 }
745             }
746         }
747     }
748 
tryStartBlockUninterceptednull749     private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {
750         when (value) {
751             is Closed<*> -> {
752                 when (receiveMode) {
753                     RECEIVE_THROWS_ON_CLOSE -> {
754                         throw recoverStackTrace(value.receiveException)
755                     }
756                     RECEIVE_RESULT -> {
757                         if (!select.trySelect()) return
758                         startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
759                     }
760                     RECEIVE_NULL_ON_CLOSE -> {
761                         if (value.closeCause == null) {
762                             if (!select.trySelect()) return
763                             startCoroutineUnintercepted(null, select.completion)
764                         } else {
765                             throw recoverStackTrace(value.receiveException)
766                         }
767                     }
768                 }
769             }
770             else -> {
771                 if (receiveMode == RECEIVE_RESULT) {
772                     startCoroutineUnintercepted(value.toResult<Any>(), select.completion)
773                 } else {
774                     startCoroutineUnintercepted(value, select.completion)
775                 }
776             }
777         }
778     }
779 
enqueueReceiveSelectnull780     private fun <R> enqueueReceiveSelect(
781         select: SelectInstance<R>,
782         block: suspend (Any?) -> R,
783         receiveMode: Int
784     ): Boolean {
785         val node = ReceiveSelect(this, select, block, receiveMode)
786         val result = enqueueReceive(node)
787         if (result) select.disposeOnSelect(node)
788         return result
789     }
790 
791     // ------ protected ------
792 
takeFirstReceiveOrPeekClosednull793     override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
794         super.takeFirstReceiveOrPeekClosed().also {
795             if (it != null && it !is Closed<*>) onReceiveDequeued()
796         }
797 
    /**
     * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
     *
     * This is an extension point: the default implementation does nothing, and subclasses
     * may override it to react to the event.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected open fun onReceiveEnqueued() {}
803 
    /**
     * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
     *
     * Within this class it is called after a receiver is taken by `takeFirstReceiveOrPeekClosed`,
     * after a waiting receive is removed on cancellation, and after a select receiver is disposed.
     * The default implementation does nothing; subclasses may override it.
     * @suppress **This is unstable API and it is subject to change.**
     */
    protected open fun onReceiveDequeued() {}
809 
810     // ------ private ------
811 
removeReceiveOnCancelnull812     private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
813         cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
814 
815     private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
816         override fun invoke(cause: Throwable?) {
817             if (receive.remove())
818                 onReceiveDequeued()
819         }
820         override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
821     }
822 
823     private class Itr<E>(@JvmField val channel: AbstractChannel<E>) : ChannelIterator<E> {
824         var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
825 
hasNextnull826         override suspend fun hasNext(): Boolean {
827             // check for repeated hasNext
828             if (result !== POLL_FAILED) return hasNextResult(result)
829             // fast path -- try poll non-blocking
830             result = channel.pollInternal()
831             if (result !== POLL_FAILED) return hasNextResult(result)
832             // slow-path does suspend
833             return hasNextSuspend()
834         }
835 
hasNextResultnull836         private fun hasNextResult(result: Any?): Boolean {
837             if (result is Closed<*>) {
838                 if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
839                 return false
840             }
841             return true
842         }
843 
hasNextSuspendnull844         private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
845             val receive = ReceiveHasNext(this, cont)
846             while (true) {
847                 if (channel.enqueueReceive(receive)) {
848                     channel.removeReceiveOnCancel(cont, receive)
849                     return@sc
850                 }
851                 // hm... something is not right. try to poll
852                 val result = channel.pollInternal()
853                 this.result = result
854                 if (result is Closed<*>) {
855                     if (result.closeCause == null)
856                         cont.resume(false)
857                     else
858                         cont.resumeWithException(result.receiveException)
859                     return@sc
860                 }
861                 if (result !== POLL_FAILED) {
862                     @Suppress("UNCHECKED_CAST")
863                     cont.resume(true, channel.onUndeliveredElement?.bindCancellationFun(result as E, cont.context))
864                     return@sc
865                 }
866             }
867         }
868 
869         @Suppress("UNCHECKED_CAST")
nextnull870         override fun next(): E {
871             val result = this.result
872             if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
873             if (result !== POLL_FAILED) {
874                 this.result = POLL_FAILED
875                 return result as E
876             }
877 
878             throw IllegalStateException("'hasNext' should be called prior to 'next' invocation")
879         }
880     }
881 
882     private open class ReceiveElement<in E>(
883         @JvmField val cont: CancellableContinuation<Any?>,
884         @JvmField val receiveMode: Int
885     ) : Receive<E>() {
resumeValuenull886         fun resumeValue(value: E): Any? = when (receiveMode) {
887             RECEIVE_RESULT -> ValueOrClosed.value(value)
888             else -> value
889         }
890 
tryResumeReceivenull891         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
892             val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return null
893             assert { token === RESUME_TOKEN } // the only other possible result
894             // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
895             otherOp?.finishPrepare()
896             return RESUME_TOKEN
897         }
898 
completeResumeReceivenull899         override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
900 
901         override fun resumeReceiveClosed(closed: Closed<*>) {
902             when {
903                 receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null)
904                 receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
905                 else -> cont.resumeWithException(closed.receiveException)
906             }
907         }
toStringnull908         override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
909     }
910 
911     private class ReceiveElementWithUndeliveredHandler<in E>(
912         cont: CancellableContinuation<Any?>,
913         receiveMode: Int,
914         @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
915     ) : ReceiveElement<E>(cont, receiveMode) {
916         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
917             onUndeliveredElement.bindCancellationFun(value, cont.context)
918     }
919 
920     private open class ReceiveHasNext<E>(
921         @JvmField val iterator: Itr<E>,
922         @JvmField val cont: CancellableContinuation<Boolean>
923     ) : Receive<E>() {
tryResumeReceivenull924         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
925             val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
926                 ?: return null
927             assert { token === RESUME_TOKEN } // the only other possible result
928             // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
929             otherOp?.finishPrepare()
930             return RESUME_TOKEN
931         }
932 
completeResumeReceivenull933         override fun completeResumeReceive(value: E) {
934             /*
935                When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
936                but completeResumeReceive is called once so we set iterator result here.
937              */
938             iterator.result = value
939             cont.completeResume(RESUME_TOKEN)
940         }
941 
resumeReceiveClosednull942         override fun resumeReceiveClosed(closed: Closed<*>) {
943             val token = if (closed.closeCause == null) {
944                 cont.tryResume(false)
945             } else {
946                 cont.tryResumeWithException(closed.receiveException)
947             }
948             if (token != null) {
949                 iterator.result = closed
950                 cont.completeResume(token)
951             }
952         }
953 
resumeOnCancellationFunnull954         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
955             iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)
956 
957         override fun toString(): String = "ReceiveHasNext@$hexAddress"
958     }
959 
960     private class ReceiveSelect<R, E>(
961         @JvmField val channel: AbstractChannel<E>,
962         @JvmField val select: SelectInstance<R>,
963         @JvmField val block: suspend (Any?) -> R,
964         @JvmField val receiveMode: Int
965     ) : Receive<E>(), DisposableHandle {
966         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
967             select.trySelectOther(otherOp) as Symbol?
968 
969         @Suppress("UNCHECKED_CAST")
970         override fun completeResumeReceive(value: E) {
971             block.startCoroutineCancellable(
972                 if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
973                 select.completion,
974                 resumeOnCancellationFun(value)
975             )
976         }
977 
978         override fun resumeReceiveClosed(closed: Closed<*>) {
979             if (!select.trySelect()) return
980             when (receiveMode) {
981                 RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
982                 RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
983                 RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
984                     block.startCoroutineCancellable(null, select.completion)
985                 } else {
986                     select.resumeSelectWithException(closed.receiveException)
987                 }
988             }
989         }
990 
991         override fun dispose() { // invoked on select completion
992             if (remove())
993                 channel.onReceiveDequeued() // notify cancellation of receive
994         }
995 
996         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
997             channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
998 
999         override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
1000     }
1001 }
1002 
// receiveMode values
// They select how a receive operation reports a closed channel (see the resumeReceiveClosed implementations).

// A closed channel is reported by resuming with / throwing the receive exception.
internal const val RECEIVE_THROWS_ON_CLOSE = 0
// A channel closed without a cause yields null; a close with a cause is still reported as an exception.
internal const val RECEIVE_NULL_ON_CLOSE = 1
// Both received values and channel closing are delivered wrapped into ValueOrClosed.
internal const val RECEIVE_RESULT = 2
1007 
// Marker symbols; they are compared by identity (===) throughout this file.

@JvmField
@SharedImmutable
internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels

// Result of a successful offer; it is the offerResult of every Receive node.
@JvmField
@SharedImmutable
internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")

// Result of offerInternal when the element could not be handed over (e.g. no receiver is waiting).
@JvmField
@SharedImmutable
internal val OFFER_FAILED = Symbol("OFFER_FAILED")

// Result of a poll that found nothing to receive; also the "nothing cached" value of the iterator's result.
@JvmField
@SharedImmutable
internal val POLL_FAILED = Symbol("POLL_FAILED")

// NOTE(review): presumably returned when enqueuing a waiter fails; its use is outside this part of the file, so confirm.
@JvmField
@SharedImmutable
internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")

// Final state of the on-close handler slot (state transitions: null -> handler -> HANDLER_INVOKED).
@JvmField
@SharedImmutable
internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
1031 
/**
 * Callback that takes a nullable [Throwable] and returns nothing.
 * NOTE(review): presumably the type of the channel's on-close handler; confirm against its usages.
 */
internal typealias Handler = (Throwable?) -> Unit
1033 
/**
 * Represents sending waiter in the queue.
 *
 * Resuming a sender is a two-step protocol: [tryResumeSend] reserves the resumption and,
 * when it returns `RESUME_TOKEN`, [completeResumeSend] must be called to finish it.
 */
internal abstract class Send : LockFreeLinkedListNode() {
    abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeSend)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
    // Finishes a resumption for which tryResumeSend returned RESUME_TOKEN.
    abstract fun completeResumeSend()
    // Notifies this waiting sender that the channel was closed; `closed` is the closed token.
    abstract fun resumeSendClosed(closed: Closed<*>)
    // Hook invoked when the element carried by this sender ends up undelivered; does nothing by default.
    open fun undeliveredElement() {}
}
1048 
/**
 * Represents receiver waiter in the queue or closed token.
 *
 * Resuming a receiver is a two-step protocol: [tryResumeReceive] reserves the resumption and,
 * when it returns `RESUME_TOKEN`, [completeResumeReceive] must be called with the same value.
 */
internal interface ReceiveOrClosed<in E> {
    val offerResult: Any // OFFER_SUCCESS | Closed
    // Returns: null - failure,
    //          RETRY_ATOMIC for retry (only when otherOp != null),
    //          RESUME_TOKEN on success (call completeResumeReceive)
    // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
    fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
    // Finishes a resumption for which tryResumeReceive returned RESUME_TOKEN.
    fun completeResumeReceive(value: E)
}
1061 
/**
 * Waiting sender that carries one element ([pollResult]) and the continuation to resume
 * once the element has been taken or the channel has been closed.
 */
internal open class SendElement<E>(
    override val pollResult: E,
    @JvmField val cont: CancellableContinuation<Unit>
) : Send() {
    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        val token = cont.tryResume(Unit, otherOp?.desc)
        if (token == null) return null
        assert { token === RESUME_TOKEN } // the only other possible result
        // finishPrepare may run only after a successful tryResume, so that only good affected node is saved
        otherOp?.finishPrepare() // finish preparations
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {
        cont.completeResume(RESUME_TOKEN)
    }

    override fun resumeSendClosed(closed: Closed<*>) {
        cont.resumeWithException(closed.sendException)
    }

    override fun toString(): String {
        return "$classSimpleName@$hexAddress($pollResult)"
    }
}
1081 
/**
 * [SendElement] that reports its element through [onUndeliveredElement] when the element
 * turns out to be undelivered.
 */
internal class SendElementWithUndeliveredHandler<E>(
    pollResult: E,
    cont: CancellableContinuation<Unit>,
    @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : SendElement<E>(pollResult, cont) {
    /**
     * Removes this node; a successful removal means the element was enqueued but never
     * received, so it is reported as undelivered.
     */
    override fun remove(): Boolean {
        val removed = super.remove()
        if (removed) undeliveredElement()
        return removed
    }

    override fun undeliveredElement() {
        onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
    }
}
1098 
/**
 * Closed-channel token. It acts both as a [Send] and as a [ReceiveOrClosed] node and is
 * its own offer/poll result; any attempt to resume it succeeds immediately.
 */
internal class Closed<in E>(
    @JvmField val closeCause: Throwable?
) : Send(), ReceiveOrClosed<E> {
    /** The close cause, or a new [ClosedSendChannelException] when the channel was closed without one. */
    val sendException: Throwable
        get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)

    /** The close cause, or a new [ClosedReceiveChannelException] when the channel was closed without one. */
    val receiveException: Throwable
        get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)

    override val offerResult get() = this
    override val pollResult get() = this

    override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeSend() {}

    override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
        otherOp?.finishPrepare()
        return RESUME_TOKEN
    }

    override fun completeResumeReceive(value: E) {}

    override fun resumeSendClosed(closed: Closed<*>) {
        assert { false } // "Should be never invoked"
    }

    override fun toString(): String {
        return "Closed@$hexAddress[$closeCause]"
    }
}
1117 
/**
 * Base class for receiver waiters stored in the queue.
 */
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
    // Offering an element to a waiting receiver always reports OFFER_SUCCESS.
    override val offerResult get() = OFFER_SUCCESS
    // Notifies this waiting receiver that the channel was closed; `closed` is the closed token.
    abstract fun resumeReceiveClosed(closed: Closed<*>)
    // Optional callback passed along when resuming with `value`; null by default (no callback).
    open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}
1123 
/**
 * Wraps a poll outcome into [ValueOrClosed]: a [Closed] token becomes a closed result
 * carrying its close cause, anything else becomes a value result.
 */
@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ValueOrClosed<E> = when (this) {
    is Closed<*> -> ValueOrClosed.closed(closeCause)
    else -> ValueOrClosed.value(this as E)
}
1127 
/**
 * Converts this closed token into a closed [ValueOrClosed] carrying its close cause.
 */
@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> {
    return ValueOrClosed.closed(closeCause)
}
1130