• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.internal.*
10 import kotlinx.coroutines.intrinsics.*
11 import kotlinx.coroutines.selects.*
12 import kotlin.coroutines.*
13 import kotlin.jvm.*
14 import kotlin.native.concurrent.*
15 
16 /**
17  * Abstract send channel. It is a base class for all send channel implementations.
18  */
19 internal abstract class AbstractSendChannel<E>(
20     @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
21 ) : SendChannel<E> {
22     /** @suppress **This is unstable API and it is subject to change.** */
23     protected val queue = LockFreeLinkedListHead()
24 
25     // ------ extension points for buffered channels ------
26 
27     /**
28      * Returns `true` if [isBufferFull] is always `true`.
29      * @suppress **This is unstable API and it is subject to change.**
30      */
31     protected abstract val isBufferAlwaysFull: Boolean
32 
33     /**
34      * Returns `true` if this channel's buffer is full.
35      * This operation should be atomic if it is invoked by [enqueueSend].
36      * @suppress **This is unstable API and it is subject to change.**
37      */
38     protected abstract val isBufferFull: Boolean
39 
40     // State transitions: null -> handler -> HANDLER_INVOKED
41     private val onCloseHandler = atomic<Any?>(null)
42 
43     // ------ internal functions for override by buffered channels ------
44 
45     /**
46      * Tries to add element to buffer or to queued receiver.
47      * Return type is `OFFER_SUCCESS | OFFER_FAILED | Closed`.
48      * @suppress **This is unstable API and it is subject to change.**
49      */
50     protected open fun offerInternal(element: E): Any {
51         while (true) {
52             val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
53             val token = receive.tryResumeReceive(element, null)
54             if (token != null) {
55                 assert { token === RESUME_TOKEN }
56                 receive.completeResumeReceive(element)
57                 return receive.offerResult
58             }
59         }
60     }
61 
62     /**
63      * Tries to add element to buffer or to queued receiver if select statement clause was not selected yet.
64      * Return type is `ALREADY_SELECTED | OFFER_SUCCESS | OFFER_FAILED | RETRY_ATOMIC | Closed`.
65      * @suppress **This is unstable API and it is subject to change.**
66      */
67     protected open fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
68         // offer atomically with select
69         val offerOp = describeTryOffer(element)
70         val failure = select.performAtomicTrySelect(offerOp)
71         if (failure != null) return failure
72         val receive = offerOp.result
73         receive.completeResumeReceive(element)
74         return receive.offerResult
75     }
76 
77     // ------ state functions & helpers for concrete implementations ------
78 
79     /**
80      * Returns non-null closed token if it is last in the queue.
81      * @suppress **This is unstable API and it is subject to change.**
82      */
83     protected val closedForSend: Closed<*>? get() = (queue.prevNode as? Closed<*>)?.also { helpClose(it) }
84 
85     /**
86      * Returns non-null closed token if it is first in the queue.
87      * @suppress **This is unstable API and it is subject to change.**
88      */
89     protected val closedForReceive: Closed<*>? get() = (queue.nextNode as? Closed<*>)?.also { helpClose(it) }
90 
91     /**
92      * Retrieves first sending waiter from the queue or returns closed token.
93      * @suppress **This is unstable API and it is subject to change.**
94      */
95     protected fun takeFirstSendOrPeekClosed(): Send? =
96         queue.removeFirstIfIsInstanceOfOrPeekIf<Send> { it is Closed<*> }
97 
98     /**
99      * Queues buffered element, returns null on success or
100      * returns node reference if it was already closed or is waiting for receive.
101      * @suppress **This is unstable API and it is subject to change.**
102      */
103     protected fun sendBuffered(element: E): ReceiveOrClosed<*>? {
104         queue.addLastIfPrev(SendBuffered(element)) { prev ->
105             if (prev is ReceiveOrClosed<*>) return@sendBuffered prev
106             true
107         }
108         return null
109     }
110 
111     /**
112      * @suppress **This is unstable API and it is subject to change.**
113      */
114     protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
115 
116     private open class SendBufferedDesc<E>(
117         queue: LockFreeLinkedListHead,
118         element: E
119     ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
120         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
121             is Closed<*> -> affected
122             is ReceiveOrClosed<*> -> OFFER_FAILED
123             else -> null
124         }
125     }
126 
127     // ------ SendChannel ------
128 
129     public final override val isClosedForSend: Boolean get() = closedForSend != null
130     private val isFullImpl: Boolean get() = queue.nextNode !is ReceiveOrClosed<*> && isBufferFull
131 
132     public final override suspend fun send(element: E) {
133         // fast path -- try offer non-blocking
134         if (offerInternal(element) === OFFER_SUCCESS) return
135         // slow-path does suspend or throws exception
136         return sendSuspend(element)
137     }
138 
139     @Suppress("DEPRECATION", "DEPRECATION_ERROR")
140     override fun offer(element: E): Boolean {
141         // Temporary migration for offer users who rely on onUndeliveredElement
142         try {
143             return super.offer(element)
144         } catch (e: Throwable) {
145             onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
146                 // If it crashes, add send exception as suppressed for better diagnostics
147                 it.addSuppressed(e)
148                 throw it
149             }
150             throw e
151         }
152     }
153 
154     public final override fun trySend(element: E): ChannelResult<Unit> {
155         val result = offerInternal(element)
156         return when {
157             result === OFFER_SUCCESS -> ChannelResult.success(Unit)
158             result === OFFER_FAILED -> {
159                 // We should check for closed token on trySend as well, otherwise trySend won't be linearizable
160                 // in the face of concurrent close()
161                 // See https://github.com/Kotlin/kotlinx.coroutines/issues/359
162                 val closedForSend = closedForSend ?: return ChannelResult.failure()
163                 ChannelResult.closed(helpCloseAndGetSendException(closedForSend))
164             }
165             result is Closed<*> -> {
166                 ChannelResult.closed(helpCloseAndGetSendException(result))
167             }
168             else -> error("trySend returned $result")
169         }
170     }
171 
172     private fun helpCloseAndGetSendException(closed: Closed<*>): Throwable {
173         helpClose(closed)
174         return closed.sendException
175     }
176 
177     private fun helpCloseAndGetSendException(element: E, closed: Closed<*>): Throwable {
178         // To ensure linearizablity we must ALWAYS help close the channel when we observe that it was closed
179         // See https://github.com/Kotlin/kotlinx.coroutines/issues/1419
180         helpClose(closed)
181         // Element was not delivered -> cals onUndeliveredElement
182         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
183             // If it crashes, add send exception as suppressed for better diagnostics
184             it.addSuppressed(closed.sendException)
185             throw it
186         }
187         return closed.sendException
188     }
189 
190     private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
191         loop@ while (true) {
192             if (isFullImpl) {
193                 val send = if (onUndeliveredElement == null)
194                     SendElement(element, cont) else
195                     SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
196                 val enqueueResult = enqueueSend(send)
197                 when {
198                     enqueueResult == null -> { // enqueued successfully
199                         cont.removeOnCancellation(send)
200                         return@sc
201                     }
202                     enqueueResult is Closed<*> -> {
203                         cont.helpCloseAndResumeWithSendException(element, enqueueResult)
204                         return@sc
205                     }
206                     enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
207                     enqueueResult is Receive<*> -> {} // try to offer instead
208                     else -> error("enqueueSend returned $enqueueResult")
209                 }
210             }
211             // hm... receiver is waiting or buffer is not full. try to offer
212             val offerResult = offerInternal(element)
213             when {
214                 offerResult === OFFER_SUCCESS -> {
215                     cont.resume(Unit)
216                     return@sc
217                 }
218                 offerResult === OFFER_FAILED -> continue@loop
219                 offerResult is Closed<*> -> {
220                     cont.helpCloseAndResumeWithSendException(element, offerResult)
221                     return@sc
222                 }
223                 else -> error("offerInternal returned $offerResult")
224             }
225         }
226     }
227 
228     private fun Continuation<*>.helpCloseAndResumeWithSendException(element: E, closed: Closed<*>) {
229         helpClose(closed)
230         val sendException = closed.sendException
231         onUndeliveredElement?.callUndeliveredElementCatchingException(element)?.let {
232             it.addSuppressed(sendException)
233             resumeWithException(it)
234             return
235         }
236         resumeWithException(sendException)
237     }
238 
239     /**
240      * Result is:
241      * * null -- successfully enqueued
242      * * ENQUEUE_FAILED -- buffer is not full (should not enqueue)
243      * * ReceiveOrClosed<*> -- receiver is waiting or it is closed (should not enqueue)
244      */
245     protected open fun enqueueSend(send: Send): Any? {
246         if (isBufferAlwaysFull) {
247             queue.addLastIfPrev(send) { prev ->
248                 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
249                 true
250             }
251         } else {
252             if (!queue.addLastIfPrevAndIf(send, { prev ->
253                 if (prev is ReceiveOrClosed<*>) return@enqueueSend prev
254                 true
255             }, { isBufferFull }))
256                 return ENQUEUE_FAILED
257         }
258         return null
259     }
260 
261     public override fun close(cause: Throwable?): Boolean {
262         val closed = Closed<E>(cause)
263         /*
264          * Try to commit close by adding a close token to the end of the queue.
265          * Successful -> we're now responsible for closing receivers
266          * Not successful -> help closing pending receivers to maintain invariant
267          * "if (!close()) next send will throw"
268          */
269         val closeAdded = queue.addLastIfPrev(closed) { it !is Closed<*> }
270         val actuallyClosed = if (closeAdded) closed else queue.prevNode as Closed<*>
271         helpClose(actuallyClosed)
272         if (closeAdded) invokeOnCloseHandler(cause)
273         return closeAdded // true if we have closed
274     }
275 
276     private fun invokeOnCloseHandler(cause: Throwable?) {
277         val handler = onCloseHandler.value
278         if (handler !== null && handler !== HANDLER_INVOKED
279             && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
280             // CAS failed -> concurrent invokeOnClose() invoked handler
281             @Suppress("UNCHECKED_CAST")
282             (handler as Handler)(cause)
283         }
284     }
285 
286     override fun invokeOnClose(handler: Handler) {
287         // Intricate dance for concurrent invokeOnClose and close calls
288         if (!onCloseHandler.compareAndSet(null, handler)) {
289             val value = onCloseHandler.value
290             if (value === HANDLER_INVOKED) {
291                 throw IllegalStateException("Another handler was already registered and successfully invoked")
292             }
293 
294             throw IllegalStateException("Another handler was already registered: $value")
295         } else {
296             val closedToken = closedForSend
297             if (closedToken != null && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
298                 // CAS failed -> close() call invoked handler
299                 (handler)(closedToken.closeCause)
300             }
301         }
302     }
303 
304     private fun helpClose(closed: Closed<*>) {
305         /*
306          * It's important to traverse list from right to left to avoid races with sender.
307          * Consider channel state: head -> [receive_1] -> [receive_2] -> head
308          * - T1 calls receive()
309          * - T2 calls close()
310          * - T3 calls close() + send(value)
311          *
312          * If both will traverse list from left to right, following non-linearizable history is possible:
313          * [close -> false], [send -> transferred 'value' to receiver]
314          *
315          * Another problem with linearizability of close is that we cannot resume closed receives until all
316          * receivers are removed from the list.
317          * Consider channel state: head -> [receive_1] -> [receive_2] -> head
318          * - T1 called receive_2, and will call send() when it's receive call resumes
319          * - T2 calls close()
320          *
321          * Now if T2's close resumes T1's receive_2 then it's receive gets "closed for receive" exception, but
322          * its subsequent attempt to send successfully rendezvous with receive_1, producing non-linearizable execution.
323          */
324         var closedList = InlineList<Receive<E>>()
325         while (true) {
326             // Break when channel is empty or has no receivers
327             @Suppress("UNCHECKED_CAST")
328             val previous = closed.prevNode as? Receive<E> ?: break
329             if (!previous.remove()) {
330                 // failed to remove the node (due to race) -- retry finding non-removed prevNode
331                 // NOTE: remove() DOES NOT help pending remove operation (that marked next pointer)
332                 previous.helpRemove() // make sure remove is complete before continuing
333                 continue
334             }
335             // add removed nodes to a separate list
336             closedList += previous
337         }
338         /*
339          * Now notify all removed nodes that the channel was closed
340          * in the order they were added to the channel
341          */
342         closedList.forEachReversed { it.resumeReceiveClosed(closed) }
343         // and do other post-processing
344         onClosedIdempotent(closed)
345     }
346 
347     /**
348      * Invoked when channel is closed as the last action of [close] invocation.
349      * This method should be idempotent and can be called multiple times.
350      */
351     protected open fun onClosedIdempotent(closed: LockFreeLinkedListNode) {}
352 
353     /**
354      * Retrieves first receiving waiter from the queue or returns closed token.
355      * @suppress **This is unstable API and it is subject to change.**
356      */
357     protected open fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
358         queue.removeFirstIfIsInstanceOfOrPeekIf<ReceiveOrClosed<E>>({ it is Closed<*> })
359 
360     // ------ registerSelectSend ------
361 
362     /**
363      * @suppress **This is unstable API and it is subject to change.**
364      */
365     protected fun describeTryOffer(element: E): TryOfferDesc<E> = TryOfferDesc(element, queue)
366 
367     /**
368      * @suppress **This is unstable API and it is subject to change.**
369      */
370     protected class TryOfferDesc<E>(
371         @JvmField val element: E,
372         queue: LockFreeLinkedListHead
373     ) : RemoveFirstDesc<ReceiveOrClosed<E>>(queue) {
374         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
375             is Closed<*> -> affected
376             !is ReceiveOrClosed<*> -> OFFER_FAILED
377             else -> null
378         }
379 
380         @Suppress("UNCHECKED_CAST")
381         override fun onPrepare(prepareOp: PrepareOp): Any? {
382             val affected = prepareOp.affected as ReceiveOrClosed<E> // see "failure" impl
383             val token = affected.tryResumeReceive(element, prepareOp) ?: return REMOVE_PREPARED
384             if (token === RETRY_ATOMIC) return RETRY_ATOMIC
385             assert { token === RESUME_TOKEN }
386             return null
387         }
388     }
389 
390     final override val onSend: SelectClause2<E, SendChannel<E>>
391         get() = object : SelectClause2<E, SendChannel<E>> {
392             override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
393                 registerSelectSend(select, param, block)
394             }
395         }
396 
397     private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
398         while (true) {
399             if (select.isSelected) return
400             if (isFullImpl) {
401                 val node = SendSelect(element, this, select, block)
402                 val enqueueResult = enqueueSend(node)
403                 when {
404                     enqueueResult == null -> { // enqueued successfully
405                         select.disposeOnSelect(node)
406                         return
407                     }
408                     enqueueResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, enqueueResult))
409                     enqueueResult === ENQUEUE_FAILED -> {} // try to offer
410                     enqueueResult is Receive<*> -> {} // try to offer
411                     else -> error("enqueueSend returned $enqueueResult ")
412                 }
413             }
414             // hm... receiver is waiting or buffer is not full. try to offer
415             val offerResult = offerSelectInternal(element, select)
416             when {
417                 offerResult === ALREADY_SELECTED -> return
418                 offerResult === OFFER_FAILED -> {} // retry
419                 offerResult === RETRY_ATOMIC -> {} // retry
420                 offerResult === OFFER_SUCCESS -> {
421                     block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
422                     return
423                 }
424                 offerResult is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(element, offerResult))
425                 else -> error("offerSelectInternal returned $offerResult")
426             }
427         }
428     }
429 
430     // ------ debug ------
431 
432     public override fun toString() =
433         "$classSimpleName@$hexAddress{$queueDebugStateString}$bufferDebugString"
434 
435     private val queueDebugStateString: String
436         get() {
437             val head = queue.nextNode
438             if (head === queue) return "EmptyQueue"
439             var result = when (head) {
440                 is Closed<*> -> head.toString()
441                 is Receive<*> -> "ReceiveQueued"
442                 is Send -> "SendQueued"
443                 else -> "UNEXPECTED:$head" // should not happen
444             }
445             val tail = queue.prevNode
446             if (tail !== head) {
447                 result += ",queueSize=${countQueueSize()}"
448                 if (tail is Closed<*>) result += ",closedForSend=$tail"
449             }
450             return result
451         }
452 
453     private fun countQueueSize(): Int {
454         var size = 0
455         queue.forEach<LockFreeLinkedListNode> { size++ }
456         return size
457     }
458 
459     protected open val bufferDebugString: String get() = ""
460 
461     // ------ private ------
462 
463     private class SendSelect<E, R>(
464         override val pollResult: E, // E | Closed - the result pollInternal returns when it rendezvous with this node
465         @JvmField val channel: AbstractSendChannel<E>,
466         @JvmField val select: SelectInstance<R>,
467         @JvmField val block: suspend (SendChannel<E>) -> R
468     ) : Send(), DisposableHandle {
469         override fun tryResumeSend(otherOp: PrepareOp?): Symbol? =
470             select.trySelectOther(otherOp) as Symbol? // must return symbol
471 
472         override fun completeResumeSend() {
473             block.startCoroutineCancellable(receiver = channel, completion = select.completion)
474         }
475 
476         override fun dispose() { // invoked on select completion
477             if (!remove()) return
478             // if the node was successfully removed (meaning it was added but was not received) then element not delivered
479             undeliveredElement()
480         }
481 
482         override fun resumeSendClosed(closed: Closed<*>) {
483             if (select.trySelect())
484                 select.resumeSelectWithException(closed.sendException)
485         }
486 
487         override fun undeliveredElement() {
488             channel.onUndeliveredElement?.callUndeliveredElement(pollResult, select.completion.context)
489         }
490 
491         override fun toString(): String = "SendSelect@$hexAddress($pollResult)[$channel, $select]"
492     }
493 
494     internal class SendBuffered<out E>(
495         @JvmField val element: E
496     ) : Send() {
497         override val pollResult: Any? get() = element
498         override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
499         override fun completeResumeSend() {}
500 
501         /**
502          * This method should be never called, see special logic in [LinkedListChannel.onCancelIdempotentList].
503          */
504         override fun resumeSendClosed(closed: Closed<*>) {
505             assert { false }
506         }
507 
508         override fun toString(): String = "SendBuffered@$hexAddress($element)"
509     }
510 }
511 
512 /**
513  * Abstract send/receive channel. It is a base class for all channel implementations.
514  */
515 internal abstract class AbstractChannel<E>(
516     onUndeliveredElement: OnUndeliveredElement<E>?
517 ) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {
518     // ------ extension points for buffered channels ------
519 
520     /**
521      * Returns `true` if [isBufferEmpty] is always `true`.
522      * @suppress **This is unstable API and it is subject to change.**
523      */
524     protected abstract val isBufferAlwaysEmpty: Boolean
525 
526     /**
527      * Returns `true` if this channel's buffer is empty.
528      * This operation should be atomic if it is invoked by [enqueueReceive].
529      * @suppress **This is unstable API and it is subject to change.**
530      */
531     protected abstract val isBufferEmpty: Boolean
532 
533     // ------ internal functions for override by buffered channels ------
534 
535     /**
536      * Tries to remove element from buffer or from queued sender.
537      * Return type is `E | POLL_FAILED | Closed`
538      * @suppress **This is unstable API and it is subject to change.**
539      */
pollInternalnull540     protected open fun pollInternal(): Any? {
541         while (true) {
542             val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
543             val token = send.tryResumeSend(null)
544             if (token != null) {
545                 assert { token === RESUME_TOKEN }
546                 send.completeResumeSend()
547                 return send.pollResult
548             }
549             // too late, already cancelled, but we removed it from the queue and need to notify on undelivered element
550             send.undeliveredElement()
551         }
552     }
553 
554     /**
555      * Tries to remove element from buffer or from queued sender if select statement clause was not selected yet.
556      * Return type is `ALREADY_SELECTED | E | POLL_FAILED | RETRY_ATOMIC | Closed`
557      * @suppress **This is unstable API and it is subject to change.**
558      */
pollSelectInternalnull559     protected open fun pollSelectInternal(select: SelectInstance<*>): Any? {
560         // poll atomically with select
561         val pollOp = describeTryPoll()
562         val failure = select.performAtomicTrySelect(pollOp)
563         if (failure != null) return failure
564         val send = pollOp.result
565         send.completeResumeSend()
566         return pollOp.result.pollResult
567     }
568 
569     // ------ state functions & helpers for concrete implementations ------
570 
571     /**
572      * @suppress **This is unstable API and it is subject to change.**
573      */
574     protected val hasReceiveOrClosed: Boolean get() = queue.nextNode is ReceiveOrClosed<*>
575 
576     // ------ ReceiveChannel ------
577 
578     public override val isClosedForReceive: Boolean get() = closedForReceive != null && isBufferEmpty
579     public override val isEmpty: Boolean get() = isEmptyImpl
580     protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
581 
receivenull582     public final override suspend fun receive(): E {
583         // fast path -- try poll non-blocking
584         val result = pollInternal()
585         /*
586          * If result is Closed -- go to tail-call slow-path that will allow us to
587          * properly recover stacktrace without paying a performance cost on fast path.
588          * We prefer to recover stacktrace using suspending path to have a more precise stacktrace.
589          */
590         @Suppress("UNCHECKED_CAST")
591         if (result !== POLL_FAILED && result !is Closed<*>) return result as E
592         // slow-path does suspend
593         return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
594     }
595 
596     @Suppress("UNCHECKED_CAST")
receiveSuspendnull597     private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
598         val receive = if (onUndeliveredElement == null)
599             ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
600             ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
601         while (true) {
602             if (enqueueReceive(receive)) {
603                 removeReceiveOnCancel(cont, receive)
604                 return@sc
605             }
606             // hm... something is not right. try to poll
607             val result = pollInternal()
608             if (result is Closed<*>) {
609                 receive.resumeReceiveClosed(result)
610                 return@sc
611             }
612             if (result !== POLL_FAILED) {
613                 cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
614                 return@sc
615             }
616         }
617     }
618 
enqueueReceiveInternalnull619     protected open fun enqueueReceiveInternal(receive: Receive<E>): Boolean = if (isBufferAlwaysEmpty)
620         queue.addLastIfPrev(receive) { it !is Send } else
<lambda>null621         queue.addLastIfPrevAndIf(receive, { it !is Send }, { isBufferEmpty })
622 
resultnull623     private fun enqueueReceive(receive: Receive<E>) = enqueueReceiveInternal(receive).also { result ->
624         if (result) onReceiveEnqueued()
625     }
626 
627     @Suppress("UNCHECKED_CAST")
receiveCatchingnull628     public final override suspend fun receiveCatching(): ChannelResult<E> {
629         // fast path -- try poll non-blocking
630         val result = pollInternal()
631         if (result !== POLL_FAILED) return result.toResult()
632         // slow-path does suspend
633         return receiveSuspend(RECEIVE_RESULT)
634     }
635 
636     @Suppress("UNCHECKED_CAST")
tryReceivenull637     public final override fun tryReceive(): ChannelResult<E> {
638         val result = pollInternal()
639         if (result === POLL_FAILED) return ChannelResult.failure()
640         if (result is Closed<*>) return ChannelResult.closed(result.closeCause)
641         return ChannelResult.success(result as E)
642     }
643 
644     @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
cancelnull645     final override fun cancel(cause: Throwable?): Boolean =
646         cancelInternal(cause)
647 
648     final override fun cancel(cause: CancellationException?) {
649         /*
650          * Do not create an exception if channel is already cancelled.
651          * Channel is closed for receive when either it is cancelled (then we are free to bail out)
652          * or was closed and elements were received.
653          * Then `onCancelIdempotent` does nothing for all implementations.
654          */
655         if (isClosedForReceive) return
656         cancelInternal(cause ?: CancellationException("$classSimpleName was cancelled"))
657     }
658 
659     // It needs to be internal to support deprecated cancel(Throwable?) API
cancelInternalnull660     internal fun cancelInternal(cause: Throwable?): Boolean =
661         close(cause).also {
662             onCancelIdempotent(it)
663         }
664 
665     /**
666      * Method that is invoked right after [close] in [cancel] sequence.
667      * [wasClosed] is directly mapped to the value returned by [close].
668      */
onCancelIdempotentnull669     protected open fun onCancelIdempotent(wasClosed: Boolean) {
670         /*
671          * See the comment to helpClose, all these machinery (reversed order of iteration, postponed resume)
672          * has the same rationale.
673          */
674         val closed = closedForSend ?: error("Cannot happen")
675         var list = InlineList<Send>()
676         while (true) {
677             val previous = closed.prevNode
678             if (previous is LockFreeLinkedListHead) {
679                 break
680             }
681             assert { previous is Send }
682             if (!previous.remove()) {
683                 previous.helpRemove() // make sure remove is complete before continuing
684                 continue
685             }
686             // Add to the list only **after** successful removal
687             list += previous as Send
688         }
689         onCancelIdempotentList(list, closed)
690     }
691 
692     /**
693      * This method is overridden by [LinkedListChannel] to handle cancellation of [SendBuffered] elements from the list.
694      */
onCancelIdempotentListnull695     protected open fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
696         list.forEachReversed { it.resumeSendClosed(closed) }
697     }
698 
iteratornull699     public final override fun iterator(): ChannelIterator<E> = Itr(this)
700 
701     // ------ registerSelectReceive ------
702 
703     /**
704      * @suppress **This is unstable API and it is subject to change.**
705      */
706     protected fun describeTryPoll(): TryPollDesc<E> = TryPollDesc(queue)
707 
708     /**
709      * @suppress **This is unstable API and it is subject to change.**
710      */
711     protected class TryPollDesc<E>(queue: LockFreeLinkedListHead) : RemoveFirstDesc<Send>(queue) {
712         override fun failure(affected: LockFreeLinkedListNode): Any? = when (affected) {
713             is Closed<*> -> affected
714             !is Send -> POLL_FAILED
715             else -> null
716         }
717 
718         @Suppress("UNCHECKED_CAST")
719         override fun onPrepare(prepareOp: PrepareOp): Any? {
720             val affected = prepareOp.affected as Send // see "failure" impl
721             val token = affected.tryResumeSend(prepareOp) ?: return REMOVE_PREPARED
722             if (token === RETRY_ATOMIC) return RETRY_ATOMIC
723             assert { token === RESUME_TOKEN }
724             return null
725         }
726 
727         override fun onRemoved(affected: LockFreeLinkedListNode) {
728             // Called when we removed it from the queue but were too late to resume, so we have undelivered element
729             (affected as Send).undeliveredElement()
730         }
731     }
732 
733     final override val onReceive: SelectClause1<E>
734         get() = object : SelectClause1<E> {
735             @Suppress("UNCHECKED_CAST")
registerSelectClause1null736             override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
737                 registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
738             }
739         }
740 
741     final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
742         get() = object : SelectClause1<ChannelResult<E>> {
743             @Suppress("UNCHECKED_CAST")
registerSelectClause1null744             override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
745                 registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
746             }
747         }
748 
registerSelectReceiveModenull749     private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
750         while (true) {
751             if (select.isSelected) return
752             if (isEmptyImpl) {
753                 if (enqueueReceiveSelect(select, block, receiveMode)) return
754             } else {
755                 val pollResult = pollSelectInternal(select)
756                 when {
757                     pollResult === ALREADY_SELECTED -> return
758                     pollResult === POLL_FAILED -> {} // retry
759                     pollResult === RETRY_ATOMIC -> {} // retry
760                     else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
761                 }
762             }
763         }
764     }
765 
tryStartBlockUninterceptednull766     private fun <R> (suspend (Any?) -> R).tryStartBlockUnintercepted(select: SelectInstance<R>, receiveMode: Int, value: Any?) {
767         when (value) {
768             is Closed<*> -> {
769                 when (receiveMode) {
770                     RECEIVE_THROWS_ON_CLOSE -> {
771                         throw recoverStackTrace(value.receiveException)
772                     }
773                     RECEIVE_RESULT -> {
774                         if (!select.trySelect()) return
775                         startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
776                     }
777                 }
778             }
779             else -> {
780                 if (receiveMode == RECEIVE_RESULT) {
781                     startCoroutineUnintercepted(value.toResult<Any>(), select.completion)
782                 } else {
783                     startCoroutineUnintercepted(value, select.completion)
784                 }
785             }
786         }
787     }
788 
enqueueReceiveSelectnull789     private fun <R> enqueueReceiveSelect(
790         select: SelectInstance<R>,
791         block: suspend (Any?) -> R,
792         receiveMode: Int
793     ): Boolean {
794         val node = ReceiveSelect(this, select, block, receiveMode)
795         val result = enqueueReceive(node)
796         if (result) select.disposeOnSelect(node)
797         return result
798     }
799 
800     // ------ protected ------
801 
takeFirstReceiveOrPeekClosednull802     override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
803         super.takeFirstReceiveOrPeekClosed().also {
804             if (it != null && it !is Closed<*>) onReceiveDequeued()
805         }
806 
807     /**
808      * Invoked when receiver is successfully enqueued to the queue of waiting receivers.
809      * @suppress **This is unstable API and it is subject to change.**
810      */
onReceiveEnqueuednull811     protected open fun onReceiveEnqueued() {}
812 
813     /**
814      * Invoked when enqueued receiver was successfully removed from the queue of waiting receivers.
815      * @suppress **This is unstable API and it is subject to change.**
816      */
onReceiveDequeuednull817     protected open fun onReceiveDequeued() {}
818 
819     // ------ private ------
820 
removeReceiveOnCancelnull821     private fun removeReceiveOnCancel(cont: CancellableContinuation<*>, receive: Receive<*>) =
822         cont.invokeOnCancellation(handler = RemoveReceiveOnCancel(receive).asHandler)
823 
824     private inner class RemoveReceiveOnCancel(private val receive: Receive<*>) : BeforeResumeCancelHandler() {
825         override fun invoke(cause: Throwable?) {
826             if (receive.remove())
827                 onReceiveDequeued()
828         }
829         override fun toString(): String = "RemoveReceiveOnCancel[$receive]"
830     }
831 
832     private class Itr<E>(@JvmField val channel: AbstractChannel<E>) : ChannelIterator<E> {
833         var result: Any? = POLL_FAILED // E | POLL_FAILED | Closed
834 
hasNextnull835         override suspend fun hasNext(): Boolean {
836             // check for repeated hasNext
837             if (result !== POLL_FAILED) return hasNextResult(result)
838             // fast path -- try poll non-blocking
839             result = channel.pollInternal()
840             if (result !== POLL_FAILED) return hasNextResult(result)
841             // slow-path does suspend
842             return hasNextSuspend()
843         }
844 
hasNextResultnull845         private fun hasNextResult(result: Any?): Boolean {
846             if (result is Closed<*>) {
847                 if (result.closeCause != null) throw recoverStackTrace(result.receiveException)
848                 return false
849             }
850             return true
851         }
852 
hasNextSuspendnull853         private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
854             val receive = ReceiveHasNext(this, cont)
855             while (true) {
856                 if (channel.enqueueReceive(receive)) {
857                     channel.removeReceiveOnCancel(cont, receive)
858                     return@sc
859                 }
860                 // hm... something is not right. try to poll
861                 val result = channel.pollInternal()
862                 this.result = result
863                 if (result is Closed<*>) {
864                     if (result.closeCause == null)
865                         cont.resume(false)
866                     else
867                         cont.resumeWithException(result.receiveException)
868                     return@sc
869                 }
870                 if (result !== POLL_FAILED) {
871                     @Suppress("UNCHECKED_CAST")
872                     cont.resume(true, channel.onUndeliveredElement?.bindCancellationFun(result as E, cont.context))
873                     return@sc
874                 }
875             }
876         }
877 
878         @Suppress("UNCHECKED_CAST")
nextnull879         override fun next(): E {
880             val result = this.result
881             if (result is Closed<*>) throw recoverStackTrace(result.receiveException)
882             if (result !== POLL_FAILED) {
883                 this.result = POLL_FAILED
884                 return result as E
885             }
886 
887             throw IllegalStateException("'hasNext' should be called prior to 'next' invocation")
888         }
889     }
890 
891     private open class ReceiveElement<in E>(
892         @JvmField val cont: CancellableContinuation<Any?>,
893         @JvmField val receiveMode: Int
894     ) : Receive<E>() {
resumeValuenull895         fun resumeValue(value: E): Any? = when (receiveMode) {
896             RECEIVE_RESULT -> ChannelResult.success(value)
897             else -> value
898         }
899 
tryResumeReceivenull900         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
901             val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return null
902             assert { token === RESUME_TOKEN } // the only other possible result
903             // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
904             otherOp?.finishPrepare()
905             return RESUME_TOKEN
906         }
907 
completeResumeReceivenull908         override fun completeResumeReceive(value: E) = cont.completeResume(RESUME_TOKEN)
909 
910         override fun resumeReceiveClosed(closed: Closed<*>) {
911             when {
912                 receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
913                 else -> cont.resumeWithException(closed.receiveException)
914             }
915         }
toStringnull916         override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
917     }
918 
919     private class ReceiveElementWithUndeliveredHandler<in E>(
920         cont: CancellableContinuation<Any?>,
921         receiveMode: Int,
922         @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
923     ) : ReceiveElement<E>(cont, receiveMode) {
924         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
925             onUndeliveredElement.bindCancellationFun(value, cont.context)
926     }
927 
928     private open class ReceiveHasNext<E>(
929         @JvmField val iterator: Itr<E>,
930         @JvmField val cont: CancellableContinuation<Boolean>
931     ) : Receive<E>() {
tryResumeReceivenull932         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
933             val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
934                 ?: return null
935             assert { token === RESUME_TOKEN } // the only other possible result
936             // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
937             otherOp?.finishPrepare()
938             return RESUME_TOKEN
939         }
940 
completeResumeReceivenull941         override fun completeResumeReceive(value: E) {
942             /*
943                When otherOp != null invocation of tryResumeReceive can happen multiple times and much later,
944                but completeResumeReceive is called once so we set iterator result here.
945              */
946             iterator.result = value
947             cont.completeResume(RESUME_TOKEN)
948         }
949 
resumeReceiveClosednull950         override fun resumeReceiveClosed(closed: Closed<*>) {
951             val token = if (closed.closeCause == null) {
952                 cont.tryResume(false)
953             } else {
954                 cont.tryResumeWithException(closed.receiveException)
955             }
956             if (token != null) {
957                 iterator.result = closed
958                 cont.completeResume(token)
959             }
960         }
961 
resumeOnCancellationFunnull962         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
963             iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)
964 
965         override fun toString(): String = "ReceiveHasNext@$hexAddress"
966     }
967 
968     private class ReceiveSelect<R, E>(
969         @JvmField val channel: AbstractChannel<E>,
970         @JvmField val select: SelectInstance<R>,
971         @JvmField val block: suspend (Any?) -> R,
972         @JvmField val receiveMode: Int
973     ) : Receive<E>(), DisposableHandle {
974         override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? =
975             select.trySelectOther(otherOp) as Symbol?
976 
977         @Suppress("UNCHECKED_CAST")
978         override fun completeResumeReceive(value: E) {
979             block.startCoroutineCancellable(
980                 if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
981                 select.completion,
982                 resumeOnCancellationFun(value)
983             )
984         }
985 
986         override fun resumeReceiveClosed(closed: Closed<*>) {
987             if (!select.trySelect()) return
988             when (receiveMode) {
989                 RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
990                 RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
991             }
992         }
993 
994         override fun dispose() { // invoked on select completion
995             if (remove())
996                 channel.onReceiveDequeued() // notify cancellation of receive
997         }
998 
999         override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
1000             channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
1001 
1002         override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
1003     }
1004 }
1005 
1006 // receiveMode values
1007 internal const val RECEIVE_THROWS_ON_CLOSE = 0
1008 internal const val RECEIVE_RESULT = 1
1009 
1010 @JvmField
1011 @SharedImmutable
1012 internal val EMPTY = Symbol("EMPTY") // marker for Conflated & Buffered channels
1013 
1014 @JvmField
1015 @SharedImmutable
1016 internal val OFFER_SUCCESS = Symbol("OFFER_SUCCESS")
1017 
1018 @JvmField
1019 @SharedImmutable
1020 internal val OFFER_FAILED = Symbol("OFFER_FAILED")
1021 
1022 @JvmField
1023 @SharedImmutable
1024 internal val POLL_FAILED = Symbol("POLL_FAILED")
1025 
1026 @JvmField
1027 @SharedImmutable
1028 internal val ENQUEUE_FAILED = Symbol("ENQUEUE_FAILED")
1029 
1030 @JvmField
1031 @SharedImmutable
1032 internal val HANDLER_INVOKED = Symbol("ON_CLOSE_HANDLER_INVOKED")
1033 
1034 internal typealias Handler = (Throwable?) -> Unit
1035 
1036 /**
1037  * Represents sending waiter in the queue.
1038  */
1039 internal abstract class Send : LockFreeLinkedListNode() {
1040     abstract val pollResult: Any? // E | Closed - the result pollInternal returns when it rendezvous with this node
1041     // Returns: null - failure,
1042     //          RETRY_ATOMIC for retry (only when otherOp != null),
1043     //          RESUME_TOKEN on success (call completeResumeSend)
1044     // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
tryResumeSendnull1045     abstract fun tryResumeSend(otherOp: PrepareOp?): Symbol?
1046     abstract fun completeResumeSend()
1047     abstract fun resumeSendClosed(closed: Closed<*>)
1048     open fun undeliveredElement() {}
1049 }
1050 
1051 /**
1052  * Represents receiver waiter in the queue or closed token.
1053  */
1054 internal interface ReceiveOrClosed<in E> {
1055     val offerResult: Any // OFFER_SUCCESS | Closed
1056     // Returns: null - failure,
1057     //          RETRY_ATOMIC for retry (only when otherOp != null),
1058     //          RESUME_TOKEN on success (call completeResumeReceive)
1059     // Must call otherOp?.finishPrepare() after deciding on result other than RETRY_ATOMIC
tryResumeReceivenull1060     fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol?
1061     fun completeResumeReceive(value: E)
1062 }
1063 
1064 /**
1065  * Represents sender for a specific element.
1066  */
1067 internal open class SendElement<E>(
1068     override val pollResult: E,
1069     @JvmField val cont: CancellableContinuation<Unit>
1070 ) : Send() {
1071     override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
1072         val token = cont.tryResume(Unit, otherOp?.desc) ?: return null
1073         assert { token === RESUME_TOKEN } // the only other possible result
1074         // We can call finishPrepare only after successful tryResume, so that only good affected node is saved
1075         otherOp?.finishPrepare() // finish preparations
1076         return RESUME_TOKEN
1077     }
1078 
1079     override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
1080     override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
1081     override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
1082 }
1083 
1084 internal class SendElementWithUndeliveredHandler<E>(
1085     pollResult: E,
1086     cont: CancellableContinuation<Unit>,
1087     @JvmField val onUndeliveredElement: OnUndeliveredElement<E>
1088 ) : SendElement<E>(pollResult, cont) {
removenull1089     override fun remove(): Boolean {
1090         if (!super.remove()) return false
1091         // if the node was successfully removed (meaning it was added but was not received) then we have undelivered element
1092         undeliveredElement()
1093         return true
1094     }
1095 
undeliveredElementnull1096     override fun undeliveredElement() {
1097         onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
1098     }
1099 }
1100 
1101 /**
1102  * Represents closed channel.
1103  */
1104 internal class Closed<in E>(
1105     @JvmField val closeCause: Throwable?
1106 ) : Send(), ReceiveOrClosed<E> {
1107     val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
1108     val receiveException: Throwable get() = closeCause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
1109 
1110     override val offerResult get() = this
1111     override val pollResult get() = this
tryResumeSendnull1112     override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
completeResumeSendnull1113     override fun completeResumeSend() {}
<lambda>null1114     override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
completeResumeReceivenull1115     override fun completeResumeReceive(value: E) {}
resumeSendClosednull1116     override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
toStringnull1117     override fun toString(): String = "Closed@$hexAddress[$closeCause]"
1118 }
1119 
1120 internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
1121     override val offerResult get() = OFFER_SUCCESS
1122     abstract fun resumeReceiveClosed(closed: Closed<*>)
1123     open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
1124 }
1125 
1126 @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
toResultnull1127 private inline fun <E> Any?.toResult(): ChannelResult<E> =
1128     if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.success(this as E)
1129 
1130 @Suppress("NOTHING_TO_INLINE")
1131 private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
1132