package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * A _hot_ [Flow] that shares emitted values among all its collectors in a broadcast fashion, so that all collectors
 * get all emitted values. A shared flow is called _hot_ because its active instance exists independently of the
 * presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
 * which is _cold_ and is started separately for each collector.
 *
 * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
 * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
 *
 * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
 * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
 * cancellation before each emission. Note that most terminal operators like [Flow.toList] would also not complete
 * when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
 * shared flow to turn it into a completing one.
 *
 * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
 * Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
 * See the [MutableSharedFlow] documentation for details.
 *
 * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
 * For example, the following class encapsulates an event bus that distributes events to all subscribers
 * in a _rendezvous_ manner, suspending until all subscribers receive the emitted event:
 *
 * ```
 * class EventBus {
 *     private val _events = MutableSharedFlow<Event>() // private mutable shared flow
 *     val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
 *
 *     suspend fun produceEvent(event: Event) {
 *         _events.emit(event) // suspends until all subscribers receive it
 *     }
 * }
 * ```
 *
 * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
 * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
 *
 * There is a specialized implementation of shared flow for the case where the most recent state value needs
 * to be shared. See [StateFlow] for details.
 *
 * ### Replay cache and buffer
 *
 * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
 * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
 * specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
 * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
 *
 * The replay cache also provides a buffer for emissions to the shared flow, allowing slow subscribers to
 * get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
 * can lag behind the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
 * using the `extraBufferCapacity` parameter.
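 *
 * For example, the interplay of `replay` and the [replayCache] can be seen in the following small sketch
 * (assuming no concurrent emitters or subscribers):
 *
 * ```
 * val flow = MutableSharedFlow<Int>(replay = 2, extraBufferCapacity = 2)
 * flow.tryEmit(1) // no subscribers yet, so the value goes straight to the replay cache
 * flow.tryEmit(2)
 * flow.tryEmit(3) // only the two most recent values are retained
 * println(flow.replayCache) // prints [2, 3]
 * ```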
 *
 * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
 * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
 * than [SUSPEND][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
 *
 * **Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept
 * the new value.** In the absence of subscribers only the most recent `replay` values are stored and the buffer
 * overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers the emitter never
 * suspends despite the [BufferOverflow.SUSPEND] option, and the [BufferOverflow.DROP_LATEST] option has no effect either.
 * Essentially, the behavior in the absence of subscribers is always similar to [BufferOverflow.DROP_OLDEST],
 * but the buffer is just of `replay` size (without any `extraBufferCapacity`).
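 *
 * For example, a shared flow that never suspends its emitters and instead drops the oldest buffered value
 * on overflow could be configured as follows (a sketch; the capacity value is illustrative):
 *
 * ```
 * val events = MutableSharedFlow<Event>(
 *     extraBufferCapacity = 64,
 *     onBufferOverflow = BufferOverflow.DROP_OLDEST
 * )
 * events.tryEmit(event) // never suspends and never returns false with this configuration
 * ```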
 *
 * ### Unbuffered shared flow
 *
 * A default implementation of a shared flow that is created with the `MutableSharedFlow()` constructor function
 * without parameters has neither a replay cache nor an additional buffer.
 * An [emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value
 * and returns immediately if there are no subscribers.
 * Thus, a [tryEmit][MutableSharedFlow.tryEmit] call succeeds and returns `true` only if
 * there are no subscribers (in which case the emitted value is immediately lost).
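 *
 * A sketch of this rendezvous behavior (assuming the code runs inside a coroutine scope and the collector
 * is started undispatched):
 *
 * ```
 * val flow = MutableSharedFlow<Int>() // no replay, no extra buffer
 * println(flow.tryEmit(1)) // true: there are no subscribers, the value is simply lost
 * launch(start = CoroutineStart.UNDISPATCHED) {
 *     flow.collect { println("received $it") }
 * }
 * println(flow.tryEmit(2)) // false: with a subscriber present, an unbuffered flow requires a suspending emit
 * flow.emit(3) // suspends until the subscriber receives the value
 * ```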
 *
 * ### SharedFlow vs BroadcastChannel
 *
 * Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel]
 * and is designed to completely replace it.
 * It has the following important differences:
 *
 * - `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
 *   for faster and simpler implementation.
 * - `SharedFlow` supports configurable replay and buffer overflow strategy.
 * - `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
 * - `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
 *   All errors and completion signals should be explicitly _materialized_ if needed.
 *
 * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
 * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
 * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
 * with [emit][MutableSharedFlow.emit] and [tryEmit][MutableSharedFlow.tryEmit], and convert subscribers' code to flow operators.
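 *
 * For example, a migration of `BroadcastChannel<Event>(1)` could look like this (a sketch of the mapping,
 * assuming a `scope: CoroutineScope` and a `handle(event)` function):
 *
 * ```
 * // was: val bus = BroadcastChannel<Event>(1)
 * val bus = MutableSharedFlow<Event>(extraBufferCapacity = 1)
 * bus.tryEmit(event) // was: bus.trySend(event)
 * bus.onEach { handle(it) } // was: subscribing via bus.openSubscription()
 *     .launchIn(scope)
 * ```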
 *
 * ### Concurrency
 *
 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
 * external synchronization.
 *
 * ### Operator fusion
 *
 * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
 * or [cancellable] operators to a shared flow has no effect.
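 *
 * For example, the following chain returns the original shared flow unchanged (a sketch illustrating the
 * fusion; `shared` is assumed to have been created with the `MutableSharedFlow(...)` constructor function):
 *
 * ```
 * val fused = shared
 *     .flowOn(Dispatchers.Default)
 *     .buffer(Channel.RENDEZVOUS)
 *     .cancellable()
 * check(fused === shared) // all three operators were fused away
 * ```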
 *
 * ### Implementation notes
 *
 * Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are
 * resumed outside of this lock to avoid deadlocks when using unconfined coroutines. Adding new subscribers
 * has `O(1)` amortized cost, but emitting has `O(N)` cost, where `N` is the number of subscribers.
 *
 * ### Not stable for inheritance
 *
 * **The `SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
 * might be added to this interface in the future, but is stable for use.
 * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
 */
@OptIn(ExperimentalSubclassOptIn::class)
@SubclassOptInRequired(ExperimentalForInheritanceCoroutinesApi::class)
public interface SharedFlow<out T> : Flow<T> {
    /**
     * A snapshot of the replay cache.
     */
    public val replayCache: List<T>

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     * To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }`
     * SAM-conversion can be used.
     *
     * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
     * on a shared flow never completes normally.
     *
     * It is guaranteed that, by the time the first suspension happens, [collect] has already subscribed to the
     * [SharedFlow] and is eligible for receiving emissions. In particular, the following code will always print `1`:
     * ```
     * val flow = MutableSharedFlow<Int>()
     * launch(start = CoroutineStart.UNDISPATCHED) {
     *     flow.collect { println(1) }
     * }
     * flow.emit(1)
     * ```
     *
     * @see [Flow.collect] for implementation and inheritance details.
     */
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

/**
 * A mutable [SharedFlow] that provides functions to [emit] values to the flow.
 * An instance of `MutableSharedFlow` with the given configuration parameters can be created using the `MutableSharedFlow(...)`
 * constructor function.
 *
 * See the [SharedFlow] documentation for details on shared flows.
 *
 * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
 * to [tryEmit] without suspension if possible, to track the [subscriptionCount],
 * and to [resetReplayCache].
 *
 * ### Concurrency
 *
 * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
 * external synchronization.
 *
 * ### Not stable for inheritance
 *
 * **The `MutableSharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
 * might be added to this interface in the future, but is stable for use.
 * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
 */
@OptIn(ExperimentalSubclassOptIn::class)
@SubclassOptInRequired(ExperimentalForInheritanceCoroutinesApi::class)
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /**
     * Emits a [value] to this shared flow, suspending on buffer overflow.
     *
     * This call can suspend only when the [BufferOverflow] strategy is
     * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
     *
     * If there are no subscribers, the buffer is not used.
     * Instead, the most recently emitted value is simply stored into
     * the replay cache if one was configured, displacing the older elements there,
     * or dropped if no replay cache was configured.
     *
     * See [tryEmit] for a non-suspending variant of this function.
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     */
    override suspend fun emit(value: T)

    /**
     * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
     * emitted successfully (see below). When this function returns `false`, it means that a call to a plain [emit]
     * function would suspend until there is buffer space available.
     *
     * This call can return `false` only when the [BufferOverflow] strategy is
     * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
     *
     * If there are no subscribers, the buffer is not used.
     * Instead, the most recently emitted value is simply stored into
     * the replay cache if one was configured, displacing the older elements there,
     * or dropped if no replay cache was configured. In any case, `tryEmit` returns `true`.
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     */
    public fun tryEmit(value: T): Boolean

    /**
     * The number of subscribers (active collectors) to this shared flow.
     *
     * The value of this [StateFlow] is never negative and starts with zero for a freshly created
     * shared flow.
     *
     * This state can be used to react to changes in the number of subscriptions to this shared flow.
     * For example, if you need to call `onActive` when the first subscriber appears and `onInactive`
     * when the last one disappears, you can set it up like this:
     *
     * ```
     * sharedFlow.subscriptionCount
     *     .map { count -> count > 0 } // map count into active/inactive flag
     *     .distinctUntilChanged() // only react to true<->false changes
     *     .onEach { isActive -> // configure an action
     *         if (isActive) onActive() else onInactive()
     *     }
     *     .launchIn(scope) // launch it
     * ```
     *
     * Usually, [StateFlow] conflates values, but [subscriptionCount] is not conflated.
     * This is done so that code that needs to be notified when subscribers appear
     * reliably observes it. With conflation, if a single subscriber appeared and immediately left,
     * collectors of [subscriptionCount] could fail to notice it, because the `0` would immediately
     * conflate the intermediate subscription count.
     */
    public val subscriptionCount: StateFlow<Int>

    /**
     * Resets the [replayCache] of this shared flow to an empty state.
     * New subscribers will be receiving only the values that were emitted after this call,
     * while old subscribers will still be receiving previously buffered values.
     * To reset a shared flow to an initial value, emit the value after this call.
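     *
     * A sketch of resetting a shared flow to an "initial" value (`initialValue` here is a placeholder,
     * and the call is assumed to happen in a suspending context):
     *
     * ```
     * sharedFlow.resetReplayCache()
     * sharedFlow.emit(initialValue) // with replay > 0, new subscribers will now see only initialValue
     * ```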
     *
     * On a [MutableStateFlow], which always contains a single value, this function is not
     * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
     * to an initial value, just update its [value][MutableStateFlow.value].
     *
     * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
     * external synchronization.
     *
     * **Note: This is an experimental api.** This function may be removed or renamed in the future.
     */
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

/**
 * Creates a [MutableSharedFlow] with the given configuration parameters.
 *
 * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
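 *
 * For example, the `EventBus` from the [SharedFlow] documentation could keep the last event for new
 * subscribers and reserve some buffer space (the parameter values below are illustrative, not recommendations):
 *
 * ```
 * val events = MutableSharedFlow<Event>(
 *     replay = 1, // new subscribers immediately get the most recent event
 *     extraBufferCapacity = 16 // emitters do not suspend while the buffer has space
 * )
 * ```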
 *
 * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
 * @param extraBufferCapacity the number of values buffered in addition to `replay`.
 *   [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
 * @param onBufferOverflow configures an [emit][MutableSharedFlow.emit] action on buffer overflow. Optional, defaults to
 *   [suspending][BufferOverflow.SUSPEND] attempts to emit a value.
 *   Values other than [BufferOverflow.SUSPEND] are supported only when `replay > 0` or `extraBufferCapacity > 0`.
 *   **Buffer overflow can happen only when there is at least one subscriber that is not ready to accept
 *   the new value.** In the absence of subscribers only the most recent [replay] values are stored and
 *   the buffer overflow behavior is never triggered and has no effect.
 */
@Suppress("FunctionName", "UNCHECKED_CAST")
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
    require(replay >= 0) { "replay cannot be negative, but was $replay" }
    require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
    require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
        "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
    }
    val bufferCapacity0 = replay + extraBufferCapacity
    val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
    return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}

// ------------------------------------ Implementation ------------------------------------

internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
    @JvmField
    var index = -1L // current "to-be-emitted" index, -1 means the slot is free now

    @JvmField
    var cont: Continuation<Unit>? = null // collector waiting for new value

    override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
        if (index >= 0) return false // not free
        index = flow.updateNewCollectorIndexLocked()
        return true
    }

    override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
        assert { index >= 0 }
        val oldIndex = index
        index = -1L
        cont = null // cleanup continuation reference
        return flow.updateCollectorIndexLocked(oldIndex)
    }
}

@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
internal open class SharedFlowImpl<T>(
    private val replay: Int,
    private val bufferCapacity: Int,
    private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
    /*
        Logical structure of the buffer

                  buffered values
             /-----------------------\
                          replayCache      queued emitters
                          /----------\/----------------------\
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
         |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
         +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
               ^           ^           ^                      ^
               |           |           |                      |
              head         |      head + bufferSize     head + totalSize
               |           |           |
     index of the slowest  |    index of the fastest
      possible collector   |     possible collector
               |           |
               |     replayIndex == new collector's index
               \---------------------- /
          range of possible minCollectorIndex

          head == minOf(minCollectorIndex, replayIndex) // by definition
          totalSize == bufferSize + queueSize // by definition

       INVARIANTS:
          minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
          replayIndex <= head + bufferSize
     */

    // Stored state
    private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
    private var replayIndex = 0L // minimal index from which new collector gets values
    private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
    private var bufferSize = 0 // number of buffered values
    private var queueSize = 0 // number of queued emitters

    // Computed state
    private val head: Long get() = minOf(minCollectorIndex, replayIndex)
    private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
    private val totalSize: Int get() = bufferSize + queueSize
    private val bufferEndIndex: Long get() = head + bufferSize
    private val queueEndIndex: Long get() = head + bufferSize + queueSize

    override val replayCache: List<T>
        get() = synchronized(this) {
            val replaySize = this.replaySize
            if (replaySize == 0) return emptyList()
            val result = ArrayList<T>(replaySize)
            val buffer = buffer!! // must be allocated, because replaySize > 0
            @Suppress("UNCHECKED_CAST")
            for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
            result
        }

    /*
     * A tweak for SubscriptionCountStateFlow to get the latest value.
     */
    @Suppress("UNCHECKED_CAST")
    protected val lastReplayedLocked: T
        get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T

    @Suppress("UNCHECKED_CAST")
    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        val slot = allocateSlot()
        try {
            if (collector is SubscribedFlowCollector) collector.onSubscription()
            val collectorJob = currentCoroutineContext()[Job]
            while (true) {
                var newValue: Any?
                while (true) {
                    newValue = tryTakeValue(slot) // attempt no-suspend fast path first
                    if (newValue !== NO_VALUE) break
                    awaitValue(slot) // await signal that the new value is available
                }
                collectorJob?.ensureActive()
                collector.emit(newValue as T)
            }
        } finally {
            freeSlot(slot)
        }
    }

    override fun tryEmit(value: T): Boolean {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitted = synchronized(this) {
            if (tryEmitLocked(value)) {
                resumes = findSlotsToResumeLocked(resumes)
                true
            } else {
                false
            }
        }
        for (cont in resumes) cont?.resume(Unit)
        return emitted
    }

    override suspend fun emit(value: T) {
        if (tryEmit(value)) return // fast-path
        emitSuspend(value)
    }

    @Suppress("UNCHECKED_CAST")
    private fun tryEmitLocked(value: T): Boolean {
        // Fast path without collectors -> no buffering
        if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
        // With collectors we'll have to buffer
        // cannot emit now if buffer is full & blocked by slow collectors
        if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
            when (onBufferOverflow) {
                BufferOverflow.SUSPEND -> return false // will suspend
                BufferOverflow.DROP_LATEST -> return true // just drop incoming
                BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
            }
        }
        enqueueLocked(value)
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than bufferCapacity
        if (bufferSize > bufferCapacity) dropOldestLocked()
        // keep replaySize not larger than needed
        if (replaySize > replay) { // increment replayIndex by one
            updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
        }
        return true
    }

    private fun tryEmitNoCollectorsLocked(value: T): Boolean {
        assert { nCollectors == 0 }
        if (replay == 0) return true // no need to replay, just forget it now
        enqueueLocked(value) // enqueue to replayCache
        bufferSize++ // value was added to buffer
        // drop oldest from the buffer if it became more than replay
        if (bufferSize > replay) dropOldestLocked()
        minCollectorIndex = head + bufferSize // a default value (max allowed)
        return true
    }

    private fun dropOldestLocked() {
        buffer!!.setBufferAt(head, null)
        bufferSize--
        val newHead = head + 1
        if (replayIndex < newHead) replayIndex = newHead
        if (minCollectorIndex < newHead) correctCollectorIndexesOnDropOldest(newHead)
        assert { head == newHead } // since head = minOf(minCollectorIndex, replayIndex) it should have updated
    }

    private fun correctCollectorIndexesOnDropOldest(newHead: Long) {
        forEachSlotLocked { slot ->
            @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
            if (slot.index >= 0 && slot.index < newHead) {
                slot.index = newHead // force move it up (this collector was too slow and missed the value at its index)
            }
        }
        minCollectorIndex = newHead
    }

    // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
    private fun enqueueLocked(item: Any?) {
        val curSize = totalSize
        val buffer = when (val curBuffer = buffer) {
            null -> growBuffer(null, 0, 2)
            else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize, curBuffer.size * 2) else curBuffer
        }
        buffer.setBufferAt(head + curSize, item)
    }

    private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
        check(newSize > 0) { "Buffer size overflow" }
        val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
        if (curBuffer == null) return newBuffer
        val head = head
        for (i in 0 until curSize) {
            newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
        }
        return newBuffer
    }

    private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val emitter = synchronized(this) lock@{
            // recheck buffer under lock again (make sure it is really full)
            if (tryEmitLocked(value)) {
                cont.resume(Unit)
                resumes = findSlotsToResumeLocked(resumes)
                return@lock null
            }
            // add suspended emitter to the buffer
            Emitter(this, head + totalSize, value, cont).also {
                enqueueLocked(it)
                queueSize++ // added to queue of waiting emitters
                // synchronous shared flow might rendezvous with waiting emitter
                if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
            }
        }
        // outside of the lock: register dispose on cancellation
        emitter?.let { cont.disposeOnCancellation(it) }
        // outside of the lock: resume slots if needed
        for (r in resumes) r?.resume(Unit)
    }

    private fun cancelEmitter(emitter: Emitter) = synchronized(this) {
        if (emitter.index < head) return // already skipped past this index
        val buffer = buffer!!
        if (buffer.getBufferAt(emitter.index) !== emitter) return // already resumed
        buffer.setBufferAt(emitter.index, NO_VALUE)
        cleanupTailLocked()
    }

    internal fun updateNewCollectorIndexLocked(): Long {
        val index = replayIndex
        if (index < minCollectorIndex) minCollectorIndex = index
        return index
    }

    // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
    internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
        assert { oldIndex >= minCollectorIndex }
        if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
        // start computing new minimal index of active collectors
        val head = head
        var newMinCollectorIndex = head + bufferSize
        // take into account a special case of sync shared flow that can go past 1st queued emitter
        if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
        forEachSlotLocked { slot ->
            @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
            if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
        }
        assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
        if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
        // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
        // We must keep all the items from newMinIndex to the end of buffer
        var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
        val maxResumeCount = if (nCollectors > 0) {
            // If we have collectors we can resume up to maxResumeCount waiting emitters
            // a) queueSize -> that's how many waiting emitters we have
            // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume and add w/o exceeding bufferCapacity
            val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
            minOf(queueSize, bufferCapacity - newBufferSize0)
        } else {
            // If we don't have collectors anymore we must resume all waiting emitters
            queueSize // that's how many waiting emitters we have (at most)
        }
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val newQueueEndIndex = newBufferEndIndex + queueSize
        if (maxResumeCount > 0) { // collect emitters to resume if we have them
            resumes = arrayOfNulls(maxResumeCount)
            var resumeCount = 0
            val buffer = buffer!!
            for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
                val emitter = buffer.getBufferAt(curEmitterIndex)
                if (emitter !== NO_VALUE) {
                    emitter as Emitter // must have Emitter class
                    resumes[resumeCount++] = emitter.cont
                    buffer.setBufferAt(curEmitterIndex, NO_VALUE) // mark the old slot as canceled, since its value was moved into the buffer
                    buffer.setBufferAt(newBufferEndIndex, emitter.value)
                    newBufferEndIndex++
                    if (resumeCount >= maxResumeCount) break // enough resumed, done
                }
            }
        }
        // Compute new buffer size -> how many values we now actually have after resume
        val newBufferSize1 = (newBufferEndIndex - head).toInt()
        // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
        // and newMinCollectorIndex might be pointing at the wrong place because of that. The easiest way to fix it is by
        // forcing newMinCollectorIndex = newBufferEndIndex. We do not need to update newBufferSize1 (which could be
        // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
        // expression, which coerces values that are too big anyway.
        if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
        // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
        var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
        // adjustment for synchronous case with cancelled emitter (NO_VALUE)
        if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
            newBufferEndIndex++
            newReplayIndex++
        }
        // Update buffer state
        updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
        // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
        cleanupTailLocked()
        // We need to wake up suspended collectors if any emitters were resumed here
        if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
        return resumes
    }

    private fun updateBufferLocked(
        newReplayIndex: Long,
        newMinCollectorIndex: Long,
        newBufferEndIndex: Long,
        newQueueEndIndex: Long
    ) {
        // Compute new head value
        val newHead = minOf(newMinCollectorIndex, newReplayIndex)
        assert { newHead >= head }
        // cleanup items we don't have to buffer anymore (because head is about to move)
        for (index in head until newHead) buffer!!.setBufferAt(index, null)
        // update all state variables to newly computed values
        replayIndex = newReplayIndex
        minCollectorIndex = newMinCollectorIndex
        bufferSize = (newBufferEndIndex - newHead).toInt()
        queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
        // check our key invariants (just in case)
        assert { bufferSize >= 0 }
        assert { queueSize >= 0 }
        assert { replayIndex <= this.head + bufferSize }
    }

    // Removes all the NO_VALUE items from the end of the queue and reduces its size
    private fun cleanupTailLocked() {
        // If we have synchronous case, then keep one emitter queued
        if (bufferCapacity == 0 && queueSize <= 1) return // return, don't clear it
        val buffer = buffer!!
        while (queueSize > 0 && buffer.getBufferAt(head + totalSize - 1) === NO_VALUE) {
            queueSize--
            buffer.setBufferAt(head + totalSize, null)
        }
    }

    // returns NO_VALUE if cannot take value without suspension
    private fun tryTakeValue(slot: SharedFlowSlot): Any? {
        var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
        val value = synchronized(this) {
            val index = tryPeekLocked(slot)
            if (index < 0) {
                NO_VALUE
            } else {
                val oldIndex = slot.index
                val newValue = getPeekedValueLockedAt(index)
                slot.index = index + 1 // points to the next index after peeked one
                resumes = updateCollectorIndexLocked(oldIndex)
                newValue
            }
        }
        for (resume in resumes) resume?.resume(Unit)
        return value
    }

    // returns -1 if cannot peek value without suspension
    private fun tryPeekLocked(slot: SharedFlowSlot): Long {
        // return buffered value if possible
        val index = slot.index
        if (index < bufferEndIndex) return index
        if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
        // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
        if (index > head) return -1L // ... but only with the first emitter (never look forward)
        if (queueSize == 0) return -1L // nothing there to rendezvous with
        return index // rendezvous with the first emitter
    }

    private fun getPeekedValueLockedAt(index: Long): Any? =
        when (val item = buffer!!.getBufferAt(index)) {
            is Emitter -> item.value
            else -> item
        }

    private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
        synchronized(this) lock@{
            val index = tryPeekLocked(slot) // recheck under this lock
            if (index < 0) {
                slot.cont = cont // Ok -- suspending
            } else {
                cont.resume(Unit) // has value, no need to suspend
                return@lock
            }
            slot.cont = cont // suspend, waiting
        }
    }

    private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
        var resumes: Array<Continuation<Unit>?> = resumesIn
        var resumeCount = resumesIn.size
        forEachSlotLocked loop@{ slot ->
            val cont = slot.cont ?: return@loop // only waiting slots
            if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
            if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
            resumes[resumeCount++] = cont
            slot.cont = null // not waiting anymore
        }
        return resumes
    }

    override fun createSlot() = SharedFlowSlot()
    override fun createSlotArray(size: Int): Array<SharedFlowSlot?> = arrayOfNulls(size)

    override fun resetReplayCache() = synchronized(this) {
        // Update buffer state
        updateBufferLocked(
            newReplayIndex = bufferEndIndex,
            newMinCollectorIndex = minCollectorIndex,
            newBufferEndIndex = bufferEndIndex,
            newQueueEndIndex = queueEndIndex
        )
    }

    override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
        fuseSharedFlow(context, capacity, onBufferOverflow)

    private class Emitter(
        @JvmField val flow: SharedFlowImpl<*>,
        @JvmField var index: Long,
        @JvmField val value: Any?,
        @JvmField val cont: Continuation<Unit>
    ) : DisposableHandle {
        override fun dispose() = flow.cancelEmitter(this)
    }
}

@JvmField
internal val NO_VALUE = Symbol("NO_VALUE")

private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)

internal fun <T> SharedFlow<T>.fuseSharedFlow(
    context: CoroutineContext,
    capacity: Int,
    onBufferOverflow: BufferOverflow
): Flow<T> {
    // context is irrelevant for shared flow and making additional rendezvous is meaningless
    // however, additional non-trivial buffering after shared flow could make sense for very slow subscribers
    if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
        return this
    }
    // Apply channel flow operator as usual
    return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
}