• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.channels.BufferOverflow.*
9 import kotlinx.coroutines.channels.ChannelResult.Companion.closed
10 import kotlinx.coroutines.channels.ChannelResult.Companion.success
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.internal.OnUndeliveredElement
13 import kotlinx.coroutines.selects.*
14 import kotlin.coroutines.*
15 
/**
 * A [BufferedChannel] specialization that implements the [DROP_OLDEST] and [DROP_LATEST]
 * buffer overflow strategies. With either strategy `send(e)` never suspends: once the buffer
 * is full, the channel either extracts the first (oldest) buffered element ([DROP_OLDEST])
 * or drops the element that is being sent ([DROP_LATEST]).
 */
internal open class ConflatedBufferedChannel<E>(
    private val capacity: Int,
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>? = null
) : BufferedChannel<E>(capacity = capacity, onUndeliveredElement = onUndeliveredElement) {
    init {
        // Senders never suspend in this implementation, so the SUSPEND strategy is rejected up front.
        val suspendingStrategyRequested = onBufferOverflow === SUSPEND
        require(!suspendingStrategyRequested) {
            "This implementation does not support suspension for senders, use ${BufferedChannel::class.simpleName} instead"
        }
        require(capacity > 0) {
            "Buffered channel capacity must be at least 1, but $capacity was specified"
        }
    }

    /** `true` exactly when this channel was configured with the [DROP_OLDEST] strategy. */
    override val isConflatedDropOldest: Boolean
        get() = onBufferOverflow === DROP_OLDEST

    /**
     * Sends [element] without ever suspending; the work is delegated to the `trySend(..)` machinery.
     *
     * The attempt fails only when this channel is closed. In that case `onUndeliveredElement`
     * (when set) is invoked for [element]: if that call yields an exception, it is thrown with
     * `sendException` attached as suppressed; otherwise `sendException` itself is thrown.
     */
    override suspend fun send(element: E) {
        val outcome = trySendImpl(element, isSendOp = true)
        // Anything other than "closed" means that there is nothing more to do.
        if (!outcome.isClosed) return
        val undeliveredFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
        if (undeliveredFailure != null) {
            undeliveredFailure.addSuppressed(sendException)
            throw undeliveredFailure
        }
        throw sendException
    }

    /**
     * Non-suspending broadcast send built on top of the `trySend(..)` machinery.
     *
     * @return `true` when the attempt succeeded, `false` otherwise (the attempt fails only
     * when this channel is closed).
     */
    override suspend fun sendBroadcast(element: E): Boolean =
        trySendImpl(element, isSendOp = true).isSuccess

    /** Tries to send [element]; the dropped element is NOT reported to `onUndeliveredElement` here. */
    override fun trySend(element: E): ChannelResult<Unit> = trySendImpl(element, isSendOp = false)

    /** Dispatches to the routine that matches the configured overflow strategy. */
    private fun trySendImpl(element: E, isSendOp: Boolean): ChannelResult<Unit> =
        when (onBufferOverflow) {
            DROP_LATEST -> trySendDropLatest(element, isSendOp)
            else -> trySendDropOldest(element)
        }

    /**
     * [DROP_LATEST] strategy: when the plain non-suspending attempt neither succeeds nor
     * reports a closed channel, the channel is full and [element] itself is dropped, which
     * is still reported to the caller as a success.
     *
     * @param isSendOp `true` for `send()`-originated calls; only then is `onUndeliveredElement`
     * invoked for the dropped element. For `trySend()` that is the responsibility of the caller.
     */
    private fun trySendDropLatest(element: E, isSendOp: Boolean): ChannelResult<Unit> {
        // First, try to send the element without suspension.
        val attempt = super.trySend(element)
        val channelIsFull = !attempt.isSuccess && !attempt.isClosed
        // Success and closing are both final outcomes, pass them through untouched.
        if (!channelIsFull) return attempt
        // The channel is full, so the element being sent is the one that gets dropped.
        if (isSendOp) {
            val undeliveredFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
            if (undeliveredFailure != null) throw undeliveredFailure
        }
        return success(Unit)
    }

    /**
     * [DROP_OLDEST] strategy: the element is always placed into the logical buffer, and when
     * that overflows the buffer, the first (oldest) elements are dropped to make room.
     */
    private fun trySendDropOldest(element: E): ChannelResult<Unit> =
        sendImpl( // `sendImpl` is inline, so each `return` below leaves `trySendDropOldest` itself.
            element = element,
            // The element goes into the logical buffer even when this channel
            // is already full; the `onSuspend` callback handles the overflow.
            waiter = BUFFERED,
            // Either a rendezvous has happened or the element has been buffered: done.
            onRendezvousOrBuffered = { return success(Unit) },
            // The algorithm decided to "suspend", which means that the element was added
            // to an already overflowed buffer. Extract the first (oldest) elements until
            // the cell of the new element is in the buffer.
            onSuspend = { segment, indexInSegment ->
                val globalIndex = segment.id * SEGMENT_SIZE + indexInSegment
                dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalIndex)
                return success(Unit)
            },
            // The channel is closed; report that to the caller.
            onClosed = { return closed(sendException) }
        )

    /**
     * Completes the `select` clause right in the registration phase: the plain `send(..)`
     * never suspends, so the attempt either succeeds (selected with `Unit`) or finds this
     * channel closed (selected with `CHANNEL_CLOSED`).
     */
    @Suppress("UNCHECKED_CAST")
    override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        val outcome = trySend(element as E)
        when {
            outcome.isSuccess -> select.selectInRegistrationPhase(Unit)
            outcome.isClosed -> select.selectInRegistrationPhase(CHANNEL_CLOSED)
            // A non-closed failure is not expected from `trySend` of this channel.
            else -> error("unreachable")
        }
    }

    /** Senders of this channel never suspend. */
    override fun shouldSendSuspend(): Boolean = false
}
119