/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
 * Channel backed by a linked-list buffer of unlimited capacity (bounded only by available memory).
 * A sender to this channel never suspends and [trySend] always succeeds.
 *
 * Instances are created by invoking the `Channel(Channel.UNLIMITED)` factory function.
 *
 * This implementation is fully lock-free.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 */
internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) :
    AbstractChannel<E>(onUndeliveredElement) {

    // Fixed buffer-state flags reported to the base class: always "empty" and never "full".
    protected final override val isBufferAlwaysEmpty: Boolean
        get() = true
    protected final override val isBufferEmpty: Boolean
        get() = true
    protected final override val isBufferAlwaysFull: Boolean
        get() = false
    protected final override val isBufferFull: Boolean
        get() = false

    /**
     * Offers [element], buffering it when the base implementation reports `OFFER_FAILED`.
     *
     * The result is always `OFFER_SUCCESS | Closed`.
     */
    protected override fun offerInternal(element: E): Any {
        while (true) {
            val outcome = super.offerInternal(element)
            if (outcome === OFFER_SUCCESS) return OFFER_SUCCESS
            if (outcome === OFFER_FAILED) {
                // The base implementation could not hand the element over: try to buffer it.
                val buffered = sendBuffered(element)
                if (buffered == null) return OFFER_SUCCESS
                if (buffered is Closed<*>) return buffered
                // otherwise there was a receiver in the queue, retry super.offerInternal
                continue
            }
            if (outcome is Closed<*>) return outcome
            error("Invalid offerInternal result $outcome")
        }
    }

    /**
     * Select-aware counterpart of [offerInternal].
     *
     * The result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        while (true) {
            val outcome: Any = if (hasReceiveOrClosed) {
                super.offerSelectInternal(element, select)
            } else {
                // A null result from the atomic try-select is mapped to OFFER_SUCCESS.
                select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS
            }
            if (outcome === ALREADY_SELECTED) return ALREADY_SELECTED
            if (outcome === OFFER_SUCCESS) return OFFER_SUCCESS
            if (outcome === OFFER_FAILED || outcome === RETRY_ATOMIC) continue // retry
            if (outcome is Closed<*>) return outcome
            error("Invalid result $outcome")
        }
    }

    /**
     * Processes [list] in reverse order: every buffered element is passed to the
     * `onUndeliveredElement` handler (when one is set), and every other send is resumed with [closed].
     *
     * An [UndeliveredElementException] accumulated from the handler calls is thrown
     * only after the whole list has been processed.
     */
    override fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
        var pendingFailure: UndeliveredElementException? = null
        list.forEachReversed { send ->
            if (send is SendBuffered<*>) {
                @Suppress("UNCHECKED_CAST")
                pendingFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(
                    send.element as E,
                    pendingFailure
                )
            } else {
                send.resumeSendClosed(closed)
            }
        }
        // throw UndeliveredElementException at the end if there was one
        pendingFailure?.let { failure -> throw failure }
    }
}