• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:Suppress("FunctionName", "DEPRECATION", "DEPRECATION_ERROR")
2 
3 package kotlinx.coroutines.channels
4 
5 import kotlinx.coroutines.*
6 import kotlinx.coroutines.channels.BufferOverflow.*
7 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
8 import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
9 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
10 import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11 import kotlinx.coroutines.internal.*
12 import kotlinx.coroutines.selects.*
13 
/**
 * Legacy broadcast abstraction: a [SendChannel] whose elements are consumed through
 * separate [ReceiveChannel] subscriptions obtained from [openSubscription].
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported", level = DeprecationLevel.ERROR)
public interface BroadcastChannel<E> : SendChannel<E> {
    /**
     * Returns a [ReceiveChannel] through which the caller observes this broadcast.
     *
     * @suppress
     */
    public fun openSubscription(): ReceiveChannel<E>

    /**
     * Cancellation entry point; [cause] is optional and defaults to `null`.
     *
     * @suppress
     */
    public fun cancel(cause: CancellationException? = null)

    /**
     * [Throwable]-accepting overload of `cancel` that reports a [Boolean] result.
     * It is hidden from source code and retained for binary compatibility only.
     *
     * @suppress
     */
    @Deprecated(message = "Binary compatibility only", level = DeprecationLevel.HIDDEN)
    public fun cancel(cause: Throwable? = null): Boolean
}
36 
/**
 * Factory function that picks the broadcast implementation matching [capacity]:
 *
 * - `0` and [UNLIMITED] are rejected with an [IllegalArgumentException];
 * - [CONFLATED] produces a [ConflatedBroadcastChannel];
 * - [BUFFERED] produces a buffered broadcast of [CHANNEL_DEFAULT_CAPACITY] elements;
 * - any other value is passed to the buffered implementation as is
 *   (which itself rejects non-positive capacities).
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported", level = DeprecationLevel.ERROR)
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> {
    // Guard clauses for the two capacities a broadcast cannot support.
    require(capacity != 0) { "Unsupported 0 capacity for BroadcastChannel" }
    require(capacity != UNLIMITED) { "Unsupported UNLIMITED capacity for BroadcastChannel" }
    // The conflated flavour has a dedicated public wrapper class.
    if (capacity == CONFLATED) return ConflatedBroadcastChannel<E>()
    // BUFFERED is a symbolic constant: translate it into the default buffer size.
    val bufferSize = if (capacity == BUFFERED) CHANNEL_DEFAULT_CAPACITY else capacity
    return BroadcastChannelImpl<E>(bufferSize)
}
50 
/**
 * Public wrapper around a conflated [BroadcastChannelImpl]: every [BroadcastChannel]
 * member is delegated to the wrapped instance, and the last sent element is
 * additionally exposed through [value] and [valueOrNull].
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported", level = DeprecationLevel.ERROR)
public class ConflatedBroadcastChannel<E> private constructor(
    // The implementation all calls are forwarded to.
    private val broadcast: BroadcastChannelImpl<E>
) : BroadcastChannel<E> by broadcast {
    /** Creates a conflated broadcast that holds no element yet. */
    public constructor() : this(BroadcastChannelImpl<E>(CONFLATED))

    /**
     * Creates a conflated broadcast and immediately offers [value] to it via `trySend`.
     *
     * @suppress
     */
    public constructor(value: E) : this() {
        trySend(value)
    }

    /**
     * The last sent element, as reported by the wrapped implementation.
     *
     * @suppress
     */
    public val value: E
        get() {
            return broadcast.value
        }

    /**
     * The last sent element or `null`, as reported by the wrapped implementation.
     *
     * @suppress
     */
    public val valueOrNull: E?
        get() {
            return broadcast.valueOrNull
        }
}
77 
/**
 * Shared implementation behind both the broadcast channel with a fixed-size buffer
 * of [capacity] elements and the conflated broadcast channel
 * (see [ConflatedBroadcastChannel]).
 *
 * **Note**: an element sent to this channel while there are no subscribers
 * registered through [openSubscription] is immediately lost.
 *
 * Instances are produced by invoking the `BroadcastChannel(capacity)` factory function.
 */
@Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING", "MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2
internal class BroadcastChannelImpl<E>(
    /**
     * Buffer capacity; [Channel.CONFLATED] when this broadcast is conflated.
     */
    val capacity: Int
) : BufferedChannel<E>(capacity = Channel.RENDEZVOUS, onUndeliveredElement = null), BroadcastChannel<E> {
    init {
        // Only strictly positive capacities and the CONFLATED marker are accepted.
        require(capacity == CONFLATED || capacity >= 1) {
            "BroadcastChannel capacity must be positive or Channel.CONFLATED, but $capacity was specified"
        }
    }

    // Synchronization here is deliberately coarse-grained: a single lock
    // guards the mutable state below, which keeps the scheme simple.
    private val lock = ReentrantLock()
    // Current subscribers; must only be touched while holding `lock`.
    // The list is never mutated in place: every change installs a fresh list,
    // so an iteration over a previously read reference cannot fail with
    // `ConcurrentModificationException`.
    private var subscribers: List<BufferedChannel<E>> = emptyList()
    // For a conflated broadcast: the most recently sent element.
    // Holds the `NO_ELEMENT` marker when nothing has been sent yet
    // or when this broadcast is not conflated.
    private var lastConflatedElement: Any? = NO_ELEMENT // NO_ELEMENT or E

    // ###########################
    // # Subscription Management #
    // ###########################

    /**
     * Creates a new subscription channel of the kind matching this broadcast
     * (conflated or buffered), registers it, and returns it.
     *
     * When this broadcast is already closed for send and holds no conflated element,
     * the subscription is closed with this broadcast's close cause and is not registered.
     */
    override fun openSubscription(): ReceiveChannel<E> = lock.withLock { // protected by lock
        // Pick the subscriber flavour that matches this broadcast.
        val subscription: BufferedChannel<E> =
            if (capacity == CONFLATED) SubscriberConflated() else SubscriberBuffered()
        if (isClosedForSend && lastConflatedElement === NO_ELEMENT) {
            // Closed (or cancelled) and nothing to replay:
            // hand out an already closed subscription.
            subscription.close(closeCause)
        } else {
            // Replay the last conflated element to the newcomer, if there is one.
            if (lastConflatedElement !== NO_ELEMENT) {
                subscription.trySend(value)
            }
            // Register the subscriber by installing a new list instance.
            subscribers = subscribers + subscription
        }
        subscription
    }

    /**
     * Unregisters [subscription] (matched by identity) from this broadcast.
     */
    private fun removeSubscriber(subscription: ReceiveChannel<E>) {
        lock.withLock { // protected by lock
            subscribers = subscribers.filterNot { it === subscription }
        }
    }

    // #############################
    // # The `send(..)` Operations #
    // #############################

    /**
     * Delivers [element] to every subscriber registered at the moment of the call.
     *
     * **!!! THIS IMPLEMENTATION IS NOT LINEARIZABLE !!!**
     *
     * The element has to reach several subscribers, and doing that atomically
     * is non-trivial: it would take a dedicated implementation that does not hand
     * the element over until every party is able to resume (this `send(..)` may be
     * cancelled, or the broadcast may become closed, in the meantime).
     * Since broadcasts are obsolete, the implementation is kept as simple as possible
     * and tolerates non-linearizable behaviour in corner cases.
     */
    override suspend fun send(element: E) {
        val snapshot = lock.withLock { // protected by lock
            // A closed broadcast rejects the element right away.
            if (isClosedForSend) throw sendException
            // Remember the element when this broadcast is conflated.
            if (capacity == CONFLATED) lastConflatedElement = element
            // Capture the subscriber list while still holding the lock.
            subscribers
        }
        // From here on the lock is no longer held. The element is handed to the
        // captured subscribers one at a time, stopping as soon as this broadcast
        // is found to be closed. See the method documentation regarding
        // the resulting non-linearizability.
        for (subscriber in snapshot) {
            // `sendBroadcast` reports `true` on success
            // and `false` when the subscriber is closed.
            val delivered = subscriber.sendBroadcast(element)
            // On a failed delivery, check whether the whole broadcast got closed.
            if (!delivered && isClosedForSend) throw sendException
        }
    }

    /**
     * Non-suspending counterpart of [send]: succeeds only when no registered
     * subscriber would make a plain `send(..)` suspend, and fails otherwise.
     */
    override fun trySend(element: E): ChannelResult<Unit> = lock.withLock { // protected by lock
        // A closed broadcast delegates to the parent implementation.
        if (isClosedForSend) return super.trySend(element)
        // If the plain `send(..)` would have to suspend on any subscriber,
        // this attempt fails.
        if (subscribers.any { it.shouldSendSuspend() }) return ChannelResult.failure()
        // Remember the element when this broadcast is conflated.
        if (capacity == CONFLATED) lastConflatedElement = element
        // Hand the element to every subscriber. These attempts cannot fail:
        // closing the broadcast and cancelling a subscription are both guarded
        // by the lock, which this operation is holding.
        for (subscriber in subscribers) {
            subscriber.trySend(element)
        }
        // Report success.
        ChannelResult.success(Unit)
    }

    // ###########################################
    // # The `select` Expression: onSend { ... } #
    // ###########################################

    /**
     * Registration of the `onSend` clause of a `select` expression.
     * See the comment at the top of the body for the work-around in use.
     */
    override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        // Supporting `select`-based sending for broadcasts properly is extremely
        // complicated, because the operation would have to wait on several subscribers
        // at once. Broadcasts are obsolete, so a simple implementation that works
        // somehow is good enough. The work-around is as follows. A new coroutine is
        // launched; it performs a plain `send(..)` and then tries to complete this
        // `select` via `trySelect`, regardless of whether the `select` is in the
        // registration or in the waiting phase. On success, the operation is done.
        // On failure, when another clause has already been selected or the `select`
        // has been cancelled, the behaviour is non-linearizable, since this `onSend`
        // clause has been completed as well; this is considered acceptable for
        // an obsolete API. The remaining case is a `select` that is still in the
        // registration phase, which means this `onSend` clause gets re-registered.
        // For that case the fact that this clause is already selected is remembered,
        // so the repeated registration finishes immediately.
        @Suppress("UNCHECKED_CAST")
        element as E
        // If this `onSend` clause has been selected already,
        // complete the registration at once.
        lock.withLock {
            val storedOutcome = onSendInternalResult.remove(select)
            if (storedOutcome != null) { // already selected!
                // `storedOutcome` is either `Unit` or `CHANNEL_CLOSED`.
                select.selectInRegistrationPhase(storedOutcome)
                return
            }
        }
        // Launch the coroutine that performs the plain `send(..)`
        // and afterwards tries to select this `onSend` clause.
        CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) {
            val sent: Boolean = try {
                send(element)
                // The element has been delivered.
                true
            } catch (t: Throwable) {
                // Normally this means the broadcast is closed. An unrelated failure
                // such as `OutOfMemoryError` is possible too, so the channel is
                // verified to be actually closed; anything else is re-thrown.
                val failedBecauseClosed =
                    isClosedForSend && (t is ClosedSendChannelException || sendException === t)
                if (!failedBecauseClosed) throw t
                false
            }
            // Record this `onSend` clause as selected and
            // try to finish the `select` operation.
            lock.withLock {
                // No outcome must have been recorded for this clause so far.
                assert { onSendInternalResult[select] == null }
                // Store the outcome that corresponds to success or failure.
                onSendInternalResult[select] = if (sent) Unit else CHANNEL_CLOSED
                // Attempt to select this `onSend` clause.
                select as SelectImplementation<*>
                val selectionOutcome = select.trySelectDetailed(this@BroadcastChannelImpl,  Unit)
                if (selectionOutcome !== TrySelectDetailedResult.REREGISTER) {
                    // On re-registration (the `select` was still in the registration
                    // phase) the algorithm invokes `registerSelectForSend` again and,
                    // thanks to the outcome stored in `onSendInternalResult`, completes
                    // immediately. In every other case the stored outcome has to be
                    // dropped from the hash map to avoid a memory leak.
                    onSendInternalResult.remove(select)
                }
            }

        }
    }
    // Outcomes of `onSend` clauses awaiting re-registration; accessed under `lock`.
    private val onSendInternalResult = HashMap<SelectInstance<*>, Any?>() // select -> Unit or CHANNEL_CLOSED

    // ############################
    // # Closing and Cancellation #
    // ############################

    /**
     * Closes every subscription with [cause], forgets the subscriptions that
     * hold no elements, and then closes this broadcast itself.
     */
    override fun close(cause: Throwable?): Boolean = lock.withLock { // protected by lock
        // The subscriptions are closed first.
        for (subscriber in subscribers) {
            subscriber.close(cause)
        }
        // Drop the subscriptions without buffered elements or waiting
        // senders so that they do not leak. The remaining ones must stay
        // registered in case `broadcast.cancel(..)` gets called later.
        subscribers = subscribers.filter { it.hasElements() }
        // Let the parent implementation do the rest.
        super.close(cause)
    }

    /**
     * Cancels every subscription with [cause], clears the conflated element,
     * and then cancels this broadcast itself.
     */
    override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock { // protected by lock
        // Cancel the subscriptions; each of them unregisters itself
        // from this broadcast as a part of its cancellation.
        for (subscriber in subscribers) {
            subscriber.cancelImpl(cause)
        }
        // The conflated implementation forgets the last sent element.
        lastConflatedElement = NO_ELEMENT
        // Let the parent implementation do the rest.
        super.cancelImpl(cause)
    }

    override val isClosedForSend: Boolean
        // Read under the lock to synchronize with `close(..)` / `cancel(..)`.
        get() = lock.withLock { super.isClosedForSend }

    // ##############################
    // # Subscriber Implementations #
    // ##############################

    /**
     * Subscription used by a buffered broadcast; its buffer size is the broadcast [capacity].
     */
    private inner class SubscriberBuffered : BufferedChannel<E>(capacity = capacity) {
        public override fun cancelImpl(cause: Throwable?): Boolean {
            return lock.withLock {
                // On cancellation this subscriber unregisters itself from the broadcast.
                removeSubscriber(this@SubscriberBuffered )
                super.cancelImpl(cause)
            }
        }
    }

    /**
     * Subscription used by a conflated broadcast: a single-slot channel that
     * drops the oldest element on overflow.
     */
    private inner class SubscriberConflated : ConflatedBufferedChannel<E>(capacity = 1, onBufferOverflow = DROP_OLDEST) {
        public override fun cancelImpl(cause: Throwable?): Boolean {
            // On cancellation this subscriber unregisters itself from the broadcast.
            // Unlike in `SubscriberBuffered`, only the removal itself runs under the lock.
            removeSubscriber(this@SubscriberConflated )
            val cancelled = super.cancelImpl(cause)
            return cancelled
        }
    }

    // ########################################
    // # ConflatedBroadcastChannel Operations #
    // ########################################

    /**
     * The last sent element.
     *
     * Throws the close cause (or an [IllegalStateException] when there is none)
     * if this broadcast is closed for send, and an [IllegalStateException]
     * if no element is stored.
     */
    @Suppress("UNCHECKED_CAST")
    val value: E get() = lock.withLock {
        // A broadcast that is closed for sending has no value to report.
        if (isClosedForSend) {
            throw closeCause ?: IllegalStateException("This broadcast channel is closed")
        }
        val last = lastConflatedElement
        // Nothing has been stored.
        if (last === NO_ELEMENT) error("No value")
        // Otherwise, this is the last sent element.
        last as E
    }

    /**
     * The last sent element, or `null` when this broadcast is closed for receive
     * or no element is stored.
     */
    @Suppress("UNCHECKED_CAST")
    val valueOrNull: E? get() = lock.withLock {
        when {
            // This channel is closed for receive.
            isClosedForReceive -> null
            // Nothing has been stored.
            lastConflatedElement === NO_ELEMENT -> null
            // Otherwise, this is the last sent element.
            else -> lastConflatedElement as E
        }
    }

    // #################
    // # For Debugging #
    // #################

    /**
     * Debug representation: the conflated element (if any), the parent channel state,
     * and the subscribers. The state is read without acquiring the lock.
     */
    override fun toString(): String {
        val conflatedPart =
            if (lastConflatedElement !== NO_ELEMENT) "CONFLATED_ELEMENT=$lastConflatedElement; " else ""
        val broadcastPart = "BROADCAST=<${super.toString()}>; "
        val subscribersPart = "SUBSCRIBERS=" + subscribers.joinToString(separator = ";", prefix = "<", postfix = ">")
        return conflatedPart + broadcastPart + subscribersPart
    }
}
361 
/**
 * Marker kept in `BroadcastChannelImpl.lastConflatedElement` while no conflated
 * element is stored; it is compared by identity (`===`).
 */
private val NO_ELEMENT: Symbol = Symbol("NO_ELEMENT")
363