• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.internal.*
8 import kotlinx.coroutines.selects.*
9 
10 /**
11  * Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
12  * Sender to this channel never suspends and [trySend] always succeeds.
13  *
14  * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
15  *
16  * This implementation is fully lock-free.
17  *
18  * @suppress **This an internal API and should not be used from general code.**
19  */
20 internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
21     protected final override val isBufferAlwaysEmpty: Boolean get() = true
22     protected final override val isBufferEmpty: Boolean get() = true
23     protected final override val isBufferAlwaysFull: Boolean get() = false
24     protected final override val isBufferFull: Boolean get() = false
25 
26     // result is always `OFFER_SUCCESS | Closed`
offerInternalnull27     protected override fun offerInternal(element: E): Any {
28         while (true) {
29             val result = super.offerInternal(element)
30             when {
31                 result === OFFER_SUCCESS -> return OFFER_SUCCESS
32                 result === OFFER_FAILED -> { // try to buffer
33                     when (val sendResult = sendBuffered(element)) {
34                         null -> return OFFER_SUCCESS
35                         is Closed<*> -> return sendResult
36                     }
37                     // otherwise there was receiver in queue, retry super.offerInternal
38                 }
39                 result is Closed<*> -> return result
40                 else -> error("Invalid offerInternal result $result")
41             }
42         }
43     }
44 
45     // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
offerSelectInternalnull46     protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
47         while (true) {
48             val result = if (hasReceiveOrClosed)
49                 super.offerSelectInternal(element, select) else
50                 (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS)
51             when {
52                 result === ALREADY_SELECTED -> return ALREADY_SELECTED
53                 result === OFFER_SUCCESS -> return OFFER_SUCCESS
54                 result === OFFER_FAILED -> {} // retry
55                 result === RETRY_ATOMIC -> {} // retry
56                 result is Closed<*> -> return result
57                 else -> error("Invalid result $result")
58             }
59         }
60     }
61 
onCancelIdempotentListnull62     override fun onCancelIdempotentList(list: InlineList<Send>, closed: Closed<*>) {
63         var undeliveredElementException: UndeliveredElementException? = null
64         list.forEachReversed {
65             when (it) {
66                 is SendBuffered<*> -> {
67                     @Suppress("UNCHECKED_CAST")
68                     undeliveredElementException = onUndeliveredElement?.callUndeliveredElementCatchingException(it.element as E, undeliveredElementException)
69                 }
70                 else -> it.resumeSendClosed(closed)
71             }
72         }
73         undeliveredElementException?.let { throw it } // throw UndeliveredElementException at the end if there was one
74     }
75 }
76 
77