/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.BufferOverflow.*
import kotlinx.coroutines.channels.ChannelResult.Companion.closed
import kotlinx.coroutines.channels.ChannelResult.Companion.success
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.internal.OnUndeliveredElement
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*

/**
 * A [BufferedChannel] extension that implements the [DROP_OLDEST] and [DROP_LATEST]
 * buffer overflow strategies. With this implementation `send(e)` never suspends:
 * when the channel capacity is exceeded, either the first (oldest) buffered element
 * is extracted ([DROP_OLDEST]) or the element being sent is dropped ([DROP_LATEST]).
 *
 * Construction fails with [IllegalArgumentException] if [onBufferOverflow] is [SUSPEND]
 * or if [capacity] is less than 1.
 */
internal open class ConflatedBufferedChannel<E>(
    private val capacity: Int,
    private val onBufferOverflow: BufferOverflow,
    onUndeliveredElement: OnUndeliveredElement<E>? = null
) : BufferedChannel<E>(capacity = capacity, onUndeliveredElement = onUndeliveredElement) {
    init {
        // The checks are performed in this order, so an unsupported overflow
        // strategy is reported before an invalid capacity.
        require(onBufferOverflow !== SUSPEND) {
            "This implementation does not support suspension for senders, use ${BufferedChannel::class.simpleName} instead"
        }
        require(capacity >= 1) {
            "Buffered channel capacity must be at least 1, but $capacity was specified"
        }
    }

    /** `true` when this channel was configured with the [DROP_OLDEST] strategy. */
    override val isConflatedDropOldest: Boolean
        get() = onBufferOverflow == DROP_OLDEST

    /**
     * Sends [element] without ever suspending; the work is delegated to the
     * non-suspending `trySendImpl(..)` with `isSendOp = true`.
     *
     * The attempt fails only when this channel is closed. In that case the
     * `onUndeliveredElement` handler (if any) is invoked for [element] and
     * `sendException` is thrown. If the handler invocation itself yields an exception,
     * that exception is thrown instead, with `sendException` attached as suppressed.
     */
    override suspend fun send(element: E) {
        val result = trySendImpl(element, isSendOp = true)
        result.onClosed {
            val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
            if (handlerFailure != null) {
                handlerFailure.addSuppressed(sendException)
                throw handlerFailure
            }
            throw sendException
        }
    }

    /**
     * Broadcast flavour of [send]: never suspends and reports the outcome as a flag.
     *
     * @return `true` if the non-suspending send attempt succeeded, `false` otherwise
     * (the attempt fails only when this channel is closed).
     */
    override suspend fun sendBroadcast(element: E): Boolean =
        trySendImpl(element, isSendOp = true).isSuccess

    /** Non-suspending send; delegates to `trySendImpl(..)` with `isSendOp = false`. */
    override fun trySend(element: E): ChannelResult<Unit> = trySendImpl(element, isSendOp = false)

    /**
     * Dispatches to the strategy-specific implementation. [SUSPEND] is rejected
     * in `init`, so anything other than [DROP_LATEST] is handled as [DROP_OLDEST].
     */
    private fun trySendImpl(element: E, isSendOp: Boolean): ChannelResult<Unit> =
        when (onBufferOverflow) {
            DROP_LATEST -> trySendDropLatest(element, isSendOp)
            else -> trySendDropOldest(element)
        }

    /**
     * [DROP_LATEST] strategy: tries the regular non-suspending send of the super class
     * and, if the channel is full, drops [element] while still reporting success.
     *
     * @param isSendOp `true` when invoked on behalf of `send()`; only then is the
     * `onUndeliveredElement` handler called for a dropped element. For `trySend()`
     * handling the dropped element is the responsibility of the caller.
     */
    private fun trySendDropLatest(element: E, isSendOp: Boolean): ChannelResult<Unit> {
        val attempt = super.trySend(element)
        // Anything that is neither a success nor a "closed" failure means the channel is full.
        val channelIsFull = !attempt.isSuccess && !attempt.isClosed
        if (!channelIsFull) return attempt
        // The channel is full, so the element being sent is dropped.
        if (isSendOp) {
            val handlerFailure = onUndeliveredElement?.callUndeliveredElementCatchingException(element)
            if (handlerFailure != null) throw handlerFailure
        }
        return success(Unit)
    }

    /**
     * [DROP_OLDEST] strategy, built on the inline `sendImpl(..)` of the super class.
     * Every callback below leaves this function through a non-local `return`.
     */
    private fun trySendDropOldest(element: E): ChannelResult<Unit> =
        sendImpl( // <-- this is an inline function
            element = element,
            // The element is placed into the logical buffer even when this channel
            // is already full; the `onSuspend` callback below then extracts
            // the first (oldest) element.
            waiter = BUFFERED,
            // A rendezvous has happened or the element has been buffered: success.
            onRendezvousOrBuffered = { return success(Unit) },
            // The algorithm decided to suspend, which means that the element was added
            // to the buffer and the buffer is now overflowed. The first (oldest)
            // elements have to be extracted until the new cell is in the buffer.
            onSuspend = { segment, indexInSegment ->
                val globalIndex = segment.id * SEGMENT_SIZE + indexInSegment
                dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(globalIndex)
                return success(Unit)
            },
            // The channel is closed: report the corresponding result.
            onClosed = { return closed(sendException) }
        )

    /**
     * Registers a `send` clause of a `select` expression.
     *
     * Sending never suspends here, so the attempt (made via [trySend]) either succeeds
     * or finds the channel closed. In both cases the `select` is completed
     * right in the registration phase.
     */
    @Suppress("UNCHECKED_CAST")
    override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        val attempt = trySend(element as E)
        val registrationResult: Any = when {
            attempt.isSuccess -> Unit
            attempt.isClosed -> CHANNEL_CLOSED
            else -> error("unreachable")
        }
        select.selectInRegistrationPhase(registrationResult)
    }

    /** Senders never suspend on this channel. */
    override fun shouldSendSuspend(): Boolean = false
}