• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.selects.*
8 
9 /**
10  * Channel with linked-list buffer of a unlimited capacity (limited only by available memory).
11  * Sender to this channel never suspends and [offer] always returns `true`.
12  *
13  * This channel is created by `Channel(Channel.UNLIMITED)` factory function invocation.
14  *
15  * This implementation is fully lock-free.
16  *
17  * @suppress **This an internal API and should not be used from general code.**
18  */
19 internal open class LinkedListChannel<E> : AbstractChannel<E>() {
20     protected final override val isBufferAlwaysEmpty: Boolean get() = true
21     protected final override val isBufferEmpty: Boolean get() = true
22     protected final override val isBufferAlwaysFull: Boolean get() = false
23     protected final override val isBufferFull: Boolean get() = false
24 
25     // result is always `OFFER_SUCCESS | Closed`
offerInternalnull26     protected override fun offerInternal(element: E): Any {
27         while (true) {
28             val result = super.offerInternal(element)
29             when {
30                 result === OFFER_SUCCESS -> return OFFER_SUCCESS
31                 result === OFFER_FAILED -> { // try to buffer
32                     val sendResult = sendBuffered(element)
33                     when (sendResult) {
34                         null -> return OFFER_SUCCESS
35                         is Closed<*> -> return sendResult
36                     }
37                     // otherwise there was receiver in queue, retry super.offerInternal
38                 }
39                 result is Closed<*> -> return result
40                 else -> error("Invalid offerInternal result $result")
41             }
42         }
43     }
44 
45     // result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
offerSelectInternalnull46     protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
47         while (true) {
48             val result = if (hasReceiveOrClosed)
49                 super.offerSelectInternal(element, select) else
50                 (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS)
51             when {
52                 result === ALREADY_SELECTED -> return ALREADY_SELECTED
53                 result === OFFER_SUCCESS -> return OFFER_SUCCESS
54                 result === OFFER_FAILED -> {} // retry
55                 result is Closed<*> -> return result
56                 else -> error("Invalid result $result")
57             }
58         }
59     }
60 }
61 
62