• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.channels
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.internal.*
9 import kotlinx.coroutines.selects.*
10 
11 /**
12  * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
13  * so that the receiver always gets the most recently sent element.
14  * Back-to-back sent elements are _conflated_ -- only the most recently sent element is received,
15  * while previously sent elements **are lost**.
16  * Sender to this channel never suspends and [offer] always returns `true`.
17  *
18  * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
19  */
internal open class ConflatedChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    protected final override val isBufferAlwaysEmpty: Boolean get() = false

    // `value` is only ever written while holding `lock`, so it is read under `lock` too;
    // an unlocked read could observe a stale value from another thread.
    // NOTE(review): this getter may be invoked while `lock` is already held (e.g. via `isEmpty` or
    // `enqueueReceiveInternal`, whose callees are defined outside this file) -- this relies on
    // `ReentrantLock` being reentrant on every target; confirm.
    protected final override val isBufferEmpty: Boolean get() = lock.withLock { value === EMPTY }
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false

    override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }

    // Guards `value`: every read and write of `value` in this class happens while holding this lock.
    private val lock = ReentrantLock()

    // The single buffered element, or the `EMPTY` marker when nothing is buffered.
    private var value: Any? = EMPTY

    /**
     * Offers [element] to this channel.
     *
     * Under the lock: returns the close token if the channel is closed for send; otherwise, when the
     * buffer is empty, tries to resume the first queued receiver with [element]; if no receiver takes it
     * (or the buffer already holds an element), [element] replaces the buffered value.
     *
     * Result is `OFFER_SUCCESS | Closed`, or the resumed receiver's `offerResult`.
     * Throws the exception returned by [updateValueLocked] if there was one.
     */
    protected override fun offerInternal(element: E): Any {
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            closedForSend?.let { return it }
            // if there is no element written in buffer
            if (value === EMPTY) {
                // check for receivers that were waiting on the empty buffer
                loop@ while(true) {
                    receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
                    if (receive is Closed) {
                        return receive!!
                    }
                    val token = receive!!.tryResumeReceive(element, null)
                    if (token != null) {
                        assert { token === RESUME_TOKEN }
                        // leave the locked section only; the resume is completed below, outside the lock
                        return@withLock
                    }
                }
            }
            // no receiver took the element: store it, replacing whatever was buffered before
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        // `receive` is non-null here: the only path that reaches this point is `return@withLock` above,
        // which runs right after `receive` was assigned a non-null receiver.
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    /**
     * Select-aware variant of [offerInternal].
     *
     * Under the lock: returns the close token if the channel is closed for send; otherwise, when the
     * buffer is empty, repeatedly tries to atomically offer [element] to a receiver through [select];
     * if that cannot be done, tries to select this clause and then stores [element] as the buffered value.
     *
     * Result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`, or the resumed receiver's `offerResult`.
     * Throws the exception returned by [updateValueLocked] if there was one.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        var receive: ReceiveOrClosed<E>? = null
        lock.withLock {
            closedForSend?.let { return it }
            if (value === EMPTY) {
                loop@ while(true) {
                    val offerOp = describeTryOffer(element)
                    val failure = select.performAtomicTrySelect(offerOp)
                    when {
                        failure == null -> { // offered successfully
                            receive = offerOp.result
                            // leave the locked section only; the resume is completed below, outside the lock
                            return@withLock
                        }
                        failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
                        failure === RETRY_ATOMIC -> {} // retry
                        failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
                        else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
                    }
                }
            }
            // try to select sending this element to buffer
            if (!select.trySelect()) {
                return ALREADY_SELECTED
            }
            updateValueLocked(element)?.let { throw it }
            return OFFER_SUCCESS
        }
        // breaks here if offer meets receiver
        // `receive` was assigned from `offerOp.result` right before `return@withLock`, the only path here.
        receive!!.completeResumeReceive(element)
        return receive!!.offerResult
    }

    /**
     * Takes the buffered element, if any, leaving the buffer empty.
     *
     * Result is `E | POLL_FAILED | Closed`: the buffered element when there is one; otherwise the close
     * token if the channel is closed for send, or `POLL_FAILED`.
     */
    protected override fun pollInternal(): Any? {
        var result: Any? = null
        lock.withLock {
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            result = value
            value = EMPTY
        }
        return result
    }

    /**
     * Select-aware variant of [pollInternal].
     *
     * Result is `E | POLL_FAILED | ALREADY_SELECTED | Closed`: behaves like [pollInternal], except that a
     * buffered element is only taken if `select.trySelect()` succeeds; otherwise `ALREADY_SELECTED` is
     * returned and the buffered element is left in place.
     */
    protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
        var result: Any? = null
        lock.withLock {
            if (value === EMPTY) return closedForSend ?: POLL_FAILED
            if (!select.trySelect())
                return ALREADY_SELECTED
            result = value
            value = EMPTY
        }
        return result
    }

    /**
     * Clears the buffered element (reporting it through `onUndeliveredElement`, see [updateValueLocked]),
     * then delegates to the superclass. An exception produced while reporting the undelivered element is
     * rethrown only after the superclass call has returned.
     */
    protected override fun onCancelIdempotent(wasClosed: Boolean) {
        var undeliveredElementException: UndeliveredElementException? = null // resource cancel exception
        lock.withLock {
            undeliveredElementException = updateValueLocked(EMPTY)
        }
        super.onCancelIdempotent(wasClosed)
        undeliveredElementException?.let { throw it } // throw exception at the end if there was one
    }

    /**
     * Replaces the buffered value with [element]; must be called while holding [lock].
     *
     * If an element was buffered (the old value is not `EMPTY`) and an `onUndeliveredElement` handler is
     * set, the old element is passed to the handler via `callUndeliveredElementCatchingException`.
     *
     * @return whatever `callUndeliveredElementCatchingException` returned for the replaced element, or
     * `null` when nothing was buffered or no handler is set. Callers decide when to throw it.
     */
    @Suppress("UNCHECKED_CAST") // anything stored in `value` other than EMPTY was put there as an `E`
    private fun updateValueLocked(element: Any?): UndeliveredElementException? {
        val old = value
        val undeliveredElementException = if (old === EMPTY) null else
            onUndeliveredElement?.callUndeliveredElementCatchingException(old as E)
        value = element
        return undeliveredElementException
    }

    // Runs the superclass enqueue logic while holding `lock`, i.e. mutually exclusive with the
    // offer/poll functions above that also take `lock`.
    override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
        super.enqueueReceiveInternal(receive)
    }

    // ------ debug ------

    // Reads `value` under `lock` for the same reason as `isBufferEmpty`.
    override val bufferDebugString: String
        get() = lock.withLock { "(value=$value)" }
}
143