/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
 * A channel that holds at most one buffered element and conflates every subsequent `send` and `offer`
 * invocation, so that the receiver always gets the most recently sent element.
 * Elements sent back-to-back are _conflated_ -- only the latest one is received,
 * while the elements sent before it **are lost**.
 * A sender to this channel never suspends and [offer] always returns `true`.
 *
 * Instances are created by invoking the `Channel(Channel.CONFLATED)` factory function.
 */
internal open class ConflatedChannel<E>(
    onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractChannel<E>(onUndeliveredElement) {

    // Taken by the offer/poll/cancel paths and by receiver enqueueing in this class.
    private val lock = ReentrantLock()

    // The single buffered element, or the EMPTY marker when nothing is buffered.
    private var value: Any? = EMPTY

    protected final override val isBufferAlwaysEmpty: Boolean
        get() = false

    protected final override val isBufferEmpty: Boolean
        get() = value === EMPTY

    protected final override val isBufferAlwaysFull: Boolean
        get() = false

    protected final override val isBufferFull: Boolean
        get() = false

    override val isEmpty: Boolean
        get() = lock.withLock { isEmptyImpl }

    /**
     * Offers [element] to the channel while holding [lock].
     *
     * When the buffer is empty, queued receivers are tried first; the first one that accepts the
     * element is resumed with it after the lock is released. Otherwise the element replaces
     * whatever is currently buffered.
     *
     * The result is `OFFER_SUCCESS | Closed`.
     */
    protected override fun offerInternal(element: E): Any {
        val matched: ReceiveOrClosed<E> = lock.withLock {
            closedForSend?.let { return it }
            // Receivers are only consulted when no element is written in the buffer.
            if (value === EMPTY) {
                while (true) {
                    // Stop looking as soon as no receivers are queued.
                    val candidate = takeFirstReceiveOrPeekClosed() ?: break
                    if (candidate is Closed) return candidate
                    val token = candidate.tryResumeReceive(element, null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        // The offer met a receiver: leave the locked section and finish the hand-off below.
                        return@withLock candidate
                    }
                }
            }
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // Reached only when the offer met a receiver.
        matched.completeResumeReceive(element)
        return matched.offerResult
    }

    /**
     * Select-aware counterpart of [offerInternal].
     *
     * When the buffer is empty, it repeatedly attempts an atomic try-offer through [select];
     * if that cannot offer, it falls back to selecting [select] and storing [element] in the buffer.
     *
     * The result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        val matched: ReceiveOrClosed<E> = lock.withLock {
            closedForSend?.let { return it }
            if (value === EMPTY) {
                loop@ while (true) {
                    val offerOp = describeTryOffer(element)
                    val failure = select.performAtomicTrySelect(offerOp)
                    when {
                        // Offered successfully: finish the hand-off outside of the lock.
                        failure == null -> return@withLock offerOp.result
                        // Cannot offer -> OK to queue to the buffer.
                        failure === OFFER_FAILED -> break@loop
                        // Go around the loop again.
                        failure === RETRY_ATOMIC -> {}
                        failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                    }
                }
            }
            // Try to select sending this element to the buffer.
            if (!select.trySelect()) return ALREADY_SELECTED
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // Reached only when the offer met a receiver.
        matched.completeResumeReceive(element)
        return matched.offerResult
    }

    /**
     * Takes the buffered element, if any, and leaves the buffer empty.
     *
     * The result is `E | POLL_FAILED | Closed`: when the buffer is empty, `closedForSend` is
     * returned if it is non-null and `POLL_FAILED` otherwise.
     */
    protected override fun pollInternal(): Any? {
        return lock.withLock {
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            val taken = value
            value = EMPTY
            taken
        }
    }

    /**
     * Select-aware counterpart of [pollInternal]: the buffered element is taken only if
     * [select] can be selected, otherwise `ALREADY_SELECTED` is returned and the buffer is untouched.
     *
     * The result is `E | POLL_FAILED | ALREADY_SELECTED | Closed`.
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        return lock.withLock {
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            if (!select.trySelect()) return ALREADY_SELECTED
            val taken = value
            value = EMPTY
            taken
        }
    }

    /**
     * Empties the buffer under [lock], delegates to the superclass implementation and only then
     * rethrows the exception (if any) reported while handling the discarded buffered element.
     */
    protected override fun onCancelIdempotent(wasClosed: Boolean) {
        val pendingFailure: UndeliveredElementException? = lock.withLock { updateValueLocked(EMPTY) }
        super.onCancelIdempotent(wasClosed)
        // Thrown at the very end so that the superclass logic above always runs first.
        if (pendingFailure != null) throw pendingFailure
    }

    /**
     * Replaces the buffered value with [element]; every call site in this class invokes it while
     * holding [lock].
     *
     * If an element was buffered, it is passed to `onUndeliveredElement` (when that is non-null)
     * before the new value is stored.
     *
     * @return the exception obtained from `callUndeliveredElementCatchingException` for the replaced
     * element, or `null` when the buffer was empty or `onUndeliveredElement` is `null`.
     */
    private fun updateValueLocked(element: Any?): UndeliveredElementException? {
        val previous = value
        val failure = when {
            previous === EMPTY -> null
            else -> onUndeliveredElement?.callUndeliveredElementCatchingException(previous as E)
        }
        value = element
        return failure
    }

    /**
     * Enqueues [receive] through the superclass implementation while holding [lock].
     */
    // NOTE(review): presumably the lock keeps enqueueing from interleaving with the offer paths,
    // which inspect the receiver queue under the same lock -- confirm against AbstractChannel.
    override fun enqueueReceiveInternal(receive: Receive<E>): Boolean {
        return lock.withLock { super.enqueueReceiveInternal(receive) }
    }

    // ------ debug ------

    override val bufferDebugString: String
        get() = "(value=$value)"
}