@file:Suppress("FunctionName", "DEPRECATION", "DEPRECATION_ERROR")

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow.*
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*

/**
 * A [SendChannel] whose elements are delivered to the subscriptions
 * opened with [openSubscription].
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public interface BroadcastChannel<E> : SendChannel<E> {
    /**
     * Opens a new subscription and returns it as a [ReceiveChannel].
     *
     * @suppress
     */
    public fun openSubscription(): ReceiveChannel<E>

    /**
     * Cancels this broadcast with an optional [cause].
     *
     * @suppress
     */
    public fun cancel(cause: CancellationException? = null)

    /**
     * Hidden overload that is kept for binary compatibility only.
     *
     * @suppress
     */
    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
    public fun cancel(cause: Throwable? = null): Boolean
}

/**
 * Creates a broadcast channel for the specified [capacity]:
 * - `0` and [UNLIMITED] are rejected with [IllegalArgumentException];
 * - [CONFLATED] creates a [ConflatedBroadcastChannel];
 * - [BUFFERED] creates a buffered broadcast of [CHANNEL_DEFAULT_CAPACITY];
 * - any other value is passed to [BroadcastChannelImpl] as is
 *   (which, in turn, requires it to be positive).
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
    when (capacity) {
        0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
        UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
        CONFLATED -> ConflatedBroadcastChannel()
        BUFFERED -> BroadcastChannelImpl(CHANNEL_DEFAULT_CAPACITY)
        else -> BroadcastChannelImpl(capacity)
    }

/**
 * A conflated [BroadcastChannel]; all the [BroadcastChannel] operations
 * are delegated to a [BroadcastChannelImpl] created with the [CONFLATED] capacity.
 *
 * @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
 */
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public class ConflatedBroadcastChannel<E> private constructor(
    private val broadcast: BroadcastChannelImpl<E>
) : BroadcastChannel<E> by broadcast {
    public constructor(): this(BroadcastChannelImpl<E>(capacity = CONFLATED))
    /**
     * Creates the channel and immediately tries to send the initial [value] to it.
     *
     * @suppress
     */
    public constructor(value: E) : this() {
        trySend(value)
    }

    /**
     * The last sent element; see [BroadcastChannelImpl.value] for the failure cases.
     *
     * @suppress
     */
    public val value: E get() = broadcast.value

    /**
     * The last sent element or `null`; see [BroadcastChannelImpl.valueOrNull].
     *
     * @suppress
     */
    public val valueOrNull: E? get() = broadcast.valueOrNull
}

/**
 * A common implementation for both the broadcast channel with a buffer of fixed [capacity]
 * and the conflated broadcast channel (see [ConflatedBroadcastChannel]).
 *
 * **Note**, that elements that are sent to this channel while there are no
 * [openSubscription] subscribers are immediately lost.
 *
 * This channel is created by `BroadcastChannel(capacity)` factory function invocation.
 */
@Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_DEPRECATION_WARNING", "MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE_DEPRECATION_WARNING") // do not remove the MULTIPLE_DEFAULTS suppression: required in K2
internal class BroadcastChannelImpl<E>(
    /**
     * Buffer capacity; [Channel.CONFLATED] when this broadcast is conflated.
     */
    val capacity: Int
) : BufferedChannel<E>(capacity = Channel.RENDEZVOUS, onUndeliveredElement = null), BroadcastChannel<E> {
    init {
        require(capacity >= 1 || capacity == CONFLATED) {
            "BroadcastChannel capacity must be positive or Channel.CONFLATED, but $capacity was specified"
        }
    }

    // This implementation uses coarse-grained synchronization,
    // as, reputedly, it is the simplest synchronization scheme.
    // All operations are protected by this lock.
    private val lock = ReentrantLock()
    // The list of subscribers; all accesses should be protected by lock.
    // Each change must create a new list instance to avoid `ConcurrentModificationException`.
    private var subscribers: List<BufferedChannel<E>> = emptyList()
    // When this broadcast is conflated, this field stores the last sent element.
    // If this channel is empty or not conflated, it stores a special `NO_ELEMENT` marker.
    private var lastConflatedElement: Any? = NO_ELEMENT // NO_ELEMENT or E

    // ###########################
    // # Subscription Management #
    // ###########################

    /**
     * Creates a new subscription channel (conflated or buffered, depending on [capacity]).
     *
     * When the last conflated element is available, it is delivered to the new subscriber
     * even if this broadcast has already been closed; in the latter case the subscriber
     * is closed right after that, so the receiver observes the element followed by the close token.
     */
    override fun openSubscription(): ReceiveChannel<E> = lock.withLock { // protected by lock
        // Is this broadcast conflated or buffered?
        // Create the corresponding subscription channel.
        val s = if (capacity == CONFLATED) SubscriberConflated() else SubscriberBuffered()
        // Read the closing status once; it cannot change while the lock is held,
        // as both `close(..)` and `cancelImpl(..)` acquire the same lock.
        val closed = isClosedForSend
        // If this broadcast is already closed or cancelled,
        // and the last sent element is not available in case
        // this broadcast is conflated, close the created
        // subscriber immediately and return it.
        if (closed && lastConflatedElement === NO_ELEMENT) {
            s.close(closeCause)
            return s
        }
        // Is this broadcast conflated? If so, send
        // the last sent element to the subscriber.
        if (lastConflatedElement !== NO_ELEMENT) {
            // Read the field directly instead of going through the `value` getter:
            // the getter throws when this broadcast is closed for sending, which
            // would make `openSubscription()` fail instead of delivering the element.
            @Suppress("UNCHECKED_CAST")
            val last = lastConflatedElement as E
            s.trySend(last)
        }
        // This broadcast is already closed, so nobody else is going to close
        // this subscriber; close it now, after the last element is stored.
        if (closed) s.close(closeCause)
        // Add the subscriber to the list and return it. A closed subscriber
        // that still holds an element is kept in the same way as in `close(..)`,
        // so that `cancel(..)` on this broadcast is able to cancel it.
        subscribers += s
        s
    }

    // Removes the specified subscriber from the list, creating a new list instance.
    private fun removeSubscriber(s: ReceiveChannel<E>) = lock.withLock { // protected by lock
        subscribers = subscribers.filter { it !== s }
    }

    // #############################
    // # The `send(..)` Operations #
    // #############################

    /**
     * Sends the specified element to all subscribers.
     *
     * **!!! THIS IMPLEMENTATION IS NOT LINEARIZABLE !!!**
     *
     * As the operation should send the element to multiple
     * subscribers simultaneously, it is non-trivial to
     * implement it in an atomic way. Specifically, this
     * would require a special implementation that does
     * not transfer the element until all parties are able
     * to resume it (this `send(..)` can be cancelled
     * or the broadcast can become closed in the meantime).
     * As broadcasts are obsolete, we keep this implementation
     * as simple as possible, allowing non-linearizability
     * in corner cases.
     */
    override suspend fun send(element: E) {
        val subs = lock.withLock { // protected by lock
            // Is this channel closed for send?
            if (isClosedForSend) throw sendException
            // Update the last sent element if this broadcast is conflated.
            if (capacity == CONFLATED) lastConflatedElement = element
            // Get a reference to the list of subscribers under the lock.
            subscribers
        }
        // The lock has been released. Send the element to the
        // subscribers one-by-one, and finish immediately
        // when this broadcast is discovered in the closed state.
        // Note that this implementation is non-linearizable;
        // see this method documentation for details.
        subs.forEach {
            // We use special function to send the element,
            // which returns `true` on success and `false`
            // if the subscriber is closed.
            val success = it.sendBroadcast(element)
            // The sending attempt has failed.
            // Check whether the broadcast is closed.
            if (!success && isClosedForSend) throw sendException
        }
    }

    /**
     * Tries to send the element to all subscribers without suspension.
     * Fails when at least one subscriber would make the plain `send(..)` suspend;
     * delegates to the parent implementation when this broadcast is closed for sending.
     */
    override fun trySend(element: E): ChannelResult<Unit> = lock.withLock { // protected by lock
        // Is this channel closed for send?
        if (isClosedForSend) return super.trySend(element)
        // Check whether the plain `send(..)` operation
        // should suspend and fail in this case.
        val shouldSuspend = subscribers.any { it.shouldSendSuspend() }
        if (shouldSuspend) return ChannelResult.failure()
        // Update the last sent element if this broadcast is conflated.
        if (capacity == CONFLATED) lastConflatedElement = element
        // Send the element to all subscribers.
        // It is guaranteed that the attempt cannot fail,
        // as both the broadcast closing and subscription
        // cancellation are guarded by lock, which is held
        // by the current operation.
        subscribers.forEach { it.trySend(element) }
        // Finish with success.
        return ChannelResult.success(Unit)
    }

    // ###########################################
    // # The `select` Expression: onSend { ... } #
    // ###########################################

    override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) {
        // It is extremely complicated to support sending via `select` for broadcasts,
        // as the operation should wait on multiple subscribers simultaneously.
        // At the same time, broadcasts are obsolete, so we need a simple implementation
        // that works somehow. Here is a tricky work-around. First, we launch a new
        // coroutine that performs plain `send(..)` operation and tries to complete
        // this `select` via `trySelect`, independently on whether it is in the
        // registration or in the waiting phase. On success, the operation finishes.
        // On failure, if another clause is already selected or the `select` operation
        // has been cancelled, we observe non-linearizable behaviour, as this `onSend`
        // clause is completed as well. However, we believe that such a non-linearizability
        // is fine for obsolete API. The last case is when the `select` operation is still
        // in the registration phase, so this `onSend` clause should be re-registered.
        // The idea is that we keep information that this `onSend` clause is already selected
        // and finish immediately.
        @Suppress("UNCHECKED_CAST")
        element as E
        // First, check whether this `onSend` clause is already
        // selected, finishing immediately in this case.
        lock.withLock {
            val result = onSendInternalResult.remove(select)
            if (result != null) { // already selected!
                // `result` is either `Unit` or `CHANNEL_CLOSED`.
                select.selectInRegistrationPhase(result)
                return
            }
        }
        // Start a new coroutine that performs plain `send(..)`
        // and tries to select this `onSend` clause at the end.
        CoroutineScope(select.context).launch(start = CoroutineStart.UNDISPATCHED) {
            val success: Boolean = try {
                send(element)
                // The element has been successfully sent!
                true
            } catch (t: Throwable) {
                // This broadcast must be closed. However, it is possible that
                // an unrelated exception, such as `OutOfMemoryError` has been thrown.
                // This implementation checks that the channel is actually closed,
                // re-throwing the caught exception otherwise.
                if (isClosedForSend && (t is ClosedSendChannelException || sendException === t)) false
                else throw t
            }
            // Mark this `onSend` clause as selected and
            // try to complete the `select` operation.
            lock.withLock {
                // Status of this `onSend` clause should not be present yet.
                assert { onSendInternalResult[select] == null }
                // Success or fail? Put the corresponding result.
                onSendInternalResult[select] = if (success) Unit else CHANNEL_CLOSED
                // Try to select this `onSend` clause.
                select as SelectImplementation<*>
                val trySelectResult = select.trySelectDetailed(this@BroadcastChannelImpl, Unit)
                if (trySelectResult !== TrySelectDetailedResult.REREGISTER) {
                    // In case of re-registration (this `select` was still
                    // in the registration phase), the algorithm will invoke
                    // `registerSelectForSend`. As we stored an information that
                    // this `onSend` clause is already selected (in `onSendInternalResult`),
                    // the algorithm will complete immediately. Otherwise, to avoid memory
                    // leaks, we must remove this information from the hashmap.
                    onSendInternalResult.remove(select)
                }
            }

        }
    }
    private val onSendInternalResult = HashMap<SelectInstance<*>, Any?>() // select -> Unit or CHANNEL_CLOSED

    // ############################
    // # Closing and Cancellation #
    // ############################

    override fun close(cause: Throwable?): Boolean = lock.withLock { // protected by lock
        // Close all subscriptions first.
        subscribers.forEach { it.close(cause) }
        // Remove all subscriptions that do not contain
        // buffered elements or waiting send-s to avoid
        // memory leaks. We must keep other subscriptions
        // in case `broadcast.cancel(..)` is called.
        subscribers = subscribers.filter { it.hasElements() }
        // Delegate to the parent implementation.
        super.close(cause)
    }

    override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock { // protected by lock
        // Cancel all subscriptions. As part of cancellation procedure,
        // subscriptions automatically remove themselves from this broadcast.
        subscribers.forEach { it.cancelImpl(cause) }
        // For the conflated implementation, clear the last sent element.
        lastConflatedElement = NO_ELEMENT
        // Finally, delegate to the parent implementation.
        super.cancelImpl(cause)
    }

    override val isClosedForSend: Boolean
        // Protect by lock to synchronize with `close(..)` / `cancel(..)`.
        get() = lock.withLock { super.isClosedForSend }

    // ##############################
    // # Subscriber Implementations #
    // ##############################

    // Subscription channel for the buffered broadcast; uses the broadcast `capacity` as its buffer size.
    private inner class SubscriberBuffered : BufferedChannel<E>(capacity = capacity) {
        public override fun cancelImpl(cause: Throwable?): Boolean = lock.withLock {
            // Remove this subscriber from the broadcast on cancellation.
            removeSubscriber(this@SubscriberBuffered)
            super.cancelImpl(cause)
        }
    }

    // Subscription channel for the conflated broadcast; keeps a single element and drops the oldest on overflow.
    private inner class SubscriberConflated : ConflatedBufferedChannel<E>(capacity = 1, onBufferOverflow = DROP_OLDEST) {
        public override fun cancelImpl(cause: Throwable?): Boolean {
            // Remove this subscriber from the broadcast on cancellation.
            removeSubscriber(this@SubscriberConflated)
            return super.cancelImpl(cause)
        }
    }

    // ########################################
    // # ConflatedBroadcastChannel Operations #
    // ########################################

    /**
     * The last sent element.
     *
     * Throws the close cause (or [IllegalStateException] when there is none)
     * if this broadcast is closed for sending, and [IllegalStateException]
     * if no element has been stored yet.
     */
    @Suppress("UNCHECKED_CAST")
    val value: E get() = lock.withLock {
        // Is this channel closed for sending?
        if (isClosedForSend) {
            throw closeCause ?: IllegalStateException("This broadcast channel is closed")
        }
        // Is there sent element?
        if (lastConflatedElement === NO_ELEMENT) error("No value")
        // Return the last sent element.
        lastConflatedElement as E
    }

    /**
     * The last sent element, or `null` if this channel is closed for receive
     * or no element has been stored yet.
     */
    @Suppress("UNCHECKED_CAST")
    val valueOrNull: E? get() = lock.withLock {
        // Is this channel closed for receive?
        if (isClosedForReceive) null
        // Is there sent element?
        else if (lastConflatedElement === NO_ELEMENT) null
        // Return the last sent element.
        else lastConflatedElement as E
    }

    // #################
    // # For Debugging #
    // #################

    override fun toString() =
        (if (lastConflatedElement !== NO_ELEMENT) "CONFLATED_ELEMENT=$lastConflatedElement; " else "") +
            "BROADCAST=<${super.toString()}>; " +
            "SUBSCRIBERS=${subscribers.joinToString(separator = ";", prefix = "<", postfix = ">")}"
}

// Marker stored in `lastConflatedElement` when no element is available.
private val NO_ELEMENT = Symbol("NO_ELEMENT")