• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.internal.*
10 import kotlinx.coroutines.internal.*
11 import kotlin.coroutines.*
12 import kotlin.jvm.*
13 
14 /**
15  * A _hot_ [Flow] that shares emitted values among all its collectors in a broadcast fashion, so that all collectors
16  * get all emitted values. A shared flow is called _hot_ because its active instance exists independently of the
17  * presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
18  * which is _cold_ and is started separately for each collector.
19  *
20  * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
21  * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
22  *
23  * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
24  * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
25  * cancellation before each emission. Note that most terminal operators like [Flow.toList] would also not complete,
26  * when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
27  * shared flow to turn it into a completing one.
28  *
29  * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
30  * Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
31  * See the [MutableSharedFlow] documentation for details.
32  *
33  * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
34  * For example, the following class encapsulates an event bus that distributes events to all subscribers
35  * in a _rendezvous_ manner, suspending until all subscribers receive emitted event:
36  *
37  * ```
38  * class EventBus {
39  *     private val _events = MutableSharedFlow<Event>() // private mutable shared flow
40  *     val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
41  *
42  *     suspend fun produceEvent(event: Event) {
43  *         _events.emit(event) // suspends until all subscribers receive it
44  *     }
45  * }
46  * ```
47  *
48  * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
49  * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
50  *
51  * There is a specialized implementation of shared flow for the case where the most recent state value needs
52  * to be shared. See [StateFlow] for details.
53  *
54  * ### Replay cache and buffer
55  *
56  * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
57  * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
58  * specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
59  * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
60  *
61  * A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to
62  * get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
63  * can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
64  * using the `extraBufferCapacity` parameter.
65  *
66  * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
67  * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
68  * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
69  *
70  * **Buffer overflow condition can happen only when there is at least one subscriber that is not ready to accept
71  * the new value.**  In the absence of subscribers only the most recent `replay` values are stored and the buffer
72  * overflow behavior is never triggered and has no effect. In particular, in the absence of subscribers emitter never
73  * suspends despite [BufferOverflow.SUSPEND] option and [BufferOverflow.DROP_LATEST] option does not have effect either.
74  * Essentially, the behavior in the absence of subscribers is always similar to [BufferOverflow.DROP_OLDEST],
75  * but the buffer is just of `replay` size (without any `extraBufferCapacity`).
76  *
77  * ### Unbuffered shared flow
78  *
79  * A default implementation of a shared flow that is created with `MutableSharedFlow()` constructor function
80  * without parameters has no replay cache nor additional buffer.
81  * [emit][MutableSharedFlow.emit] call to such a shared flow suspends until all subscribers receive the emitted value
82  * and returns immediately if there are no subscribers.
83  * Thus, [tryEmit][MutableSharedFlow.tryEmit] call succeeds and returns `true` only if
84  * there are no subscribers (in which case the emitted value is immediately lost).
85  *
86  * ### SharedFlow vs BroadcastChannel
87  *
88  * Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel]
89  * and is designed to completely replace it.
90  * It has the following important differences:
91  *
92  * * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
93  *    for faster and simpler implementation.
94  * * `SharedFlow` supports configurable replay and buffer overflow strategy.
95  * * `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
96  * * `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
97  *    All errors and completion signals should be explicitly _materialized_ if needed.
98  *
99  * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
100  * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
101  * values to new subscribers). Replace [send][BroadcastChannel.send] and [trySend][BroadcastChannel.trySend] calls
102  * with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
103  *
104  * ### Concurrency
105  *
106  * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
107  * external synchronization.
108  *
109  * ### Operator fusion
110  *
111  * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
112  * or [cancellable] operators to a shared flow has no effect.
113  *
114  * ### Implementation notes
115  *
116  * Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are
117  * resumed outside of this lock to avoid deadlocks when using unconfined coroutines. Adding new subscribers
118  * has `O(1)` amortized cost, but emitting has `O(N)` cost, where `N` is the number of subscribers.
119  *
120  * ### Not stable for inheritance
121  *
122  * **The `SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
123  * might be added to this interface in the future, but is stable for use.
124  * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
125  */
126 public interface SharedFlow<out T> : Flow<T> {
127     /**
128      * A snapshot of the replay cache.
129      */
130     public val replayCache: List<T>
131 
132     /**
133      * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
134      * To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }`
135      * SAM-conversion can be used.
136      *
137      * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
138      * on a shared flow never completes normally.
139      *
140      * @see [Flow.collect] for implementation and inheritance details.
141      */
142     override suspend fun collect(collector: FlowCollector<T>): Nothing
143 }
144 
145 /**
146  * A mutable [SharedFlow] that provides functions to [emit] values to the flow.
147  * An instance of `MutableSharedFlow` with the given configuration parameters can be created using `MutableSharedFlow(...)`
148  * constructor function.
149  *
150  * See the [SharedFlow] documentation for details on shared flows.
151  *
152  * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
153  * to [tryEmit] without suspension if possible, to track the [subscriptionCount],
154  * and to [resetReplayCache].
155  *
156  * ### Concurrency
157  *
158  * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
159  * external synchronization.
160  *
161  * ### Not stable for inheritance
162  *
163  * **The `MutableSharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
164  * might be added to this interface in the future, but is stable for use.
165  * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
166  */
167 public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
168     /**
169      * Emits a [value] to this shared flow, suspending on buffer overflow.
170      *
171      * This call can suspend only when the [BufferOverflow] strategy is
172      * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
173      *
174      * If there are no subscribers, the buffer is not used.
175      * Instead, the most recently emitted value is simply stored into
176      * the replay cache if one was configured, displacing the older elements there,
177      * or dropped if no replay cache was configured.
178      *
179      * See [tryEmit] for a non-suspending variant of this function.
180      *
181      * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
182      * external synchronization.
183      */
emitnull184     override suspend fun emit(value: T)
185 
186     /**
187      * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
188      * emitted successfully (see below). When this function returns `false`, it means that a call to a plain [emit]
189      * function would suspend until there is buffer space available.
190      *
191      * This call can return `false` only when the [BufferOverflow] strategy is
192      * [SUSPEND][BufferOverflow.SUSPEND] **and** there are subscribers collecting this shared flow.
193      *
194      * If there are no subscribers, the buffer is not used.
195      * Instead, the most recently emitted value is simply stored into
196      * the replay cache if one was configured, displacing the older elements there,
197      * or dropped if no replay cache was configured. In any case, `tryEmit` returns `true`.
198      *
199      * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
200      * external synchronization.
201      */
202     public fun tryEmit(value: T): Boolean
203 
204     /**
205      * The number of subscribers (active collectors) to this shared flow.
206      *
207      * The integer in the resulting [StateFlow] is not negative and starts with zero for a freshly created
208      * shared flow.
209      *
210      * This state can be used to react to changes in the number of subscriptions to this shared flow.
211      * For example, if you need to call `onActive` when the first subscriber appears and `onInactive`
212      * when the last one disappears, you can set it up like this:
213      *
214      * ```
215      * sharedFlow.subscriptionCount
216      *     .map { count -> count > 0 } // map count into active/inactive flag
217      *     .distinctUntilChanged() // only react to true<->false changes
218      *     .onEach { isActive -> // configure an action
219      *         if (isActive) onActive() else onInactive()
220      *     }
221      *     .launchIn(scope) // launch it
222      * ```
223      *
224      * Implementation note: the resulting flow **does not** conflate subscription count.
225      */
226     public val subscriptionCount: StateFlow<Int>
227 
228     /**
229      * Resets the [replayCache] of this shared flow to an empty state.
230      * New subscribers will be receiving only the values that were emitted after this call,
231      * while old subscribers will still be receiving previously buffered values.
232      * To reset a shared flow to an initial value, emit the value after this call.
233      *
234      * On a [MutableStateFlow], which always contains a single value, this function is not
235      * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
236      * to an initial value, just update its [value][MutableStateFlow.value].
237      *
238      * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
239      * external synchronization.
240      *
241      * **Note: This is an experimental api.** This function may be removed or renamed in the future.
242      */
243     @ExperimentalCoroutinesApi
244     public fun resetReplayCache()
245 }
246 
247 /**
248  * Creates a [MutableSharedFlow] with the given configuration parameters.
249  *
250  * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
251  *
252  * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
253  * @param extraBufferCapacity the number of values buffered in addition to `replay`.
254  *   [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
255  * @param onBufferOverflow configures an [emit][MutableSharedFlow.emit] action on buffer overflow. Optional, defaults to
256  *   [suspending][BufferOverflow.SUSPEND] attempts to emit a value.
257  *   Values other than [BufferOverflow.SUSPEND] are supported only when `replay > 0` or `extraBufferCapacity > 0`.
258  *   **Buffer overflow can happen only when there is at least one subscriber that is not ready to accept
259  *   the new value.** In the absence of subscribers only the most recent [replay] values are stored and
260  *   the buffer overflow behavior is never triggered and has no effect.
261  */
262 @Suppress("FunctionName", "UNCHECKED_CAST")
263 public fun <T> MutableSharedFlow(
264     replay: Int = 0,
265     extraBufferCapacity: Int = 0,
266     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
267 ): MutableSharedFlow<T> {
268     require(replay >= 0) { "replay cannot be negative, but was $replay" }
269     require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
270     require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
271         "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
272     }
273     val bufferCapacity0 = replay + extraBufferCapacity
274     val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
275     return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
276 }
277 
278 // ------------------------------------ Implementation ------------------------------------
279 
280 internal class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
281     @JvmField
282     var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
283 
284     @JvmField
285     var cont: Continuation<Unit>? = null // collector waiting for new value
286 
allocateLockednull287     override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
288         if (index >= 0) return false // not free
289         index = flow.updateNewCollectorIndexLocked()
290         return true
291     }
292 
freeLockednull293     override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
294         assert { index >= 0 }
295         val oldIndex = index
296         index = -1L
297         cont = null // cleanup continuation reference
298         return flow.updateCollectorIndexLocked(oldIndex)
299     }
300 }
301 
302 internal open class SharedFlowImpl<T>(
303     private val replay: Int,
304     private val bufferCapacity: Int,
305     private val onBufferOverflow: BufferOverflow
306 ) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
307     /*
308         Logical structure of the buffer
309 
310                   buffered values
311              /-----------------------\
312                           replayCache      queued emitters
313                           /----------\/----------------------\
314          +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
315          |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
316          +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
317                ^           ^           ^                      ^
318                |           |           |                      |
319               head         |      head + bufferSize     head + totalSize
320                |           |           |
321      index of the slowest  |    index of the fastest
322       possible collector   |     possible collector
323                |           |
324                |     replayIndex == new collector's index
325                \---------------------- /
326           range of possible minCollectorIndex
327 
328           head == minOf(minCollectorIndex, replayIndex) // by definition
329           totalSize == bufferSize + queueSize // by definition
330 
331        INVARIANTS:
332           minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
333           replayIndex <= head + bufferSize
334      */
335 
336     // Stored state
337     private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
338     private var replayIndex = 0L // minimal index from which new collector gets values
339     private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
340     private var bufferSize = 0 // number of buffered values
341     private var queueSize = 0 // number of queued emitters
342 
343     // Computed state
344     private val head: Long get() = minOf(minCollectorIndex, replayIndex)
345     private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
346     private val totalSize: Int get() = bufferSize + queueSize
347     private val bufferEndIndex: Long get() = head + bufferSize
348     private val queueEndIndex: Long get() = head + bufferSize + queueSize
349 
350     override val replayCache: List<T>
<lambda>null351         get() = synchronized(this) {
352             val replaySize = this.replaySize
353             if (replaySize == 0) return emptyList()
354             val result = ArrayList<T>(replaySize)
355             val buffer = buffer!! // must be allocated, because replaySize > 0
356             @Suppress("UNCHECKED_CAST")
357             for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
358             result
359         }
360 
361     /*
362      * A tweak for SubscriptionCountStateFlow to get the latest value.
363      */
364     @Suppress("UNCHECKED_CAST")
365     protected val lastReplayedLocked: T
366         get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T
367 
368     @Suppress("UNCHECKED_CAST")
collectnull369     override suspend fun collect(collector: FlowCollector<T>): Nothing {
370         val slot = allocateSlot()
371         try {
372             if (collector is SubscribedFlowCollector) collector.onSubscription()
373             val collectorJob = currentCoroutineContext()[Job]
374             while (true) {
375                 var newValue: Any?
376                 while (true) {
377                     newValue = tryTakeValue(slot) // attempt no-suspend fast path first
378                     if (newValue !== NO_VALUE) break
379                     awaitValue(slot) // await signal that the new value is available
380                 }
381                 collectorJob?.ensureActive()
382                 collector.emit(newValue as T)
383             }
384         } finally {
385             freeSlot(slot)
386         }
387     }
388 
tryEmitnull389     override fun tryEmit(value: T): Boolean {
390         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
391         val emitted = synchronized(this) {
392             if (tryEmitLocked(value)) {
393                 resumes = findSlotsToResumeLocked(resumes)
394                 true
395             } else {
396                 false
397             }
398         }
399         for (cont in resumes) cont?.resume(Unit)
400         return emitted
401     }
402 
emitnull403     override suspend fun emit(value: T) {
404         if (tryEmit(value)) return // fast-path
405         emitSuspend(value)
406     }
407 
408     @Suppress("UNCHECKED_CAST")
tryEmitLockednull409     private fun tryEmitLocked(value: T): Boolean {
410         // Fast path without collectors -> no buffering
411         if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
412         // With collectors we'll have to buffer
413         // cannot emit now if buffer is full & blocked by slow collectors
414         if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
415             when (onBufferOverflow) {
416                 BufferOverflow.SUSPEND -> return false // will suspend
417                 BufferOverflow.DROP_LATEST -> return true // just drop incoming
418                 BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
419             }
420         }
421         enqueueLocked(value)
422         bufferSize++ // value was added to buffer
423         // drop oldest from the buffer if it became more than bufferCapacity
424         if (bufferSize > bufferCapacity) dropOldestLocked()
425         // keep replaySize not larger that needed
426         if (replaySize > replay) { // increment replayIndex by one
427             updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
428         }
429         return true
430     }
431 
tryEmitNoCollectorsLockednull432     private fun tryEmitNoCollectorsLocked(value: T): Boolean {
433         assert { nCollectors == 0 }
434         if (replay == 0) return true // no need to replay, just forget it now
435         enqueueLocked(value) // enqueue to replayCache
436         bufferSize++ // value was added to buffer
437         // drop oldest from the buffer if it became more than replay
438         if (bufferSize > replay) dropOldestLocked()
439         minCollectorIndex = head + bufferSize // a default value (max allowed)
440         return true
441     }
442 
dropOldestLockednull443     private fun dropOldestLocked() {
444         buffer!!.setBufferAt(head, null)
445         bufferSize--
446         val newHead = head + 1
447         if (replayIndex < newHead) replayIndex = newHead
448         if (minCollectorIndex < newHead) correctCollectorIndexesOnDropOldest(newHead)
449         assert { head == newHead } // since head = minOf(minCollectorIndex, replayIndex) it should have updated
450     }
451 
correctCollectorIndexesOnDropOldestnull452     private fun correctCollectorIndexesOnDropOldest(newHead: Long) {
453         forEachSlotLocked { slot ->
454             @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
455             if (slot.index >= 0 && slot.index < newHead) {
456                 slot.index = newHead // force move it up (this collector was too slow and missed the value at its index)
457             }
458         }
459         minCollectorIndex = newHead
460     }
461 
462     // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
enqueueLockednull463     private fun enqueueLocked(item: Any?) {
464         val curSize = totalSize
465         val buffer = when (val curBuffer = buffer) {
466             null -> growBuffer(null, 0, 2)
467             else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
468         }
469         buffer.setBufferAt(head + curSize, item)
470     }
471 
growBuffernull472     private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
473         check(newSize > 0) { "Buffer size overflow" }
474         val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
475         if (curBuffer == null) return newBuffer
476         val head = head
477         for (i in 0 until curSize) {
478             newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
479         }
480         return newBuffer
481     }
482 
emitSuspendnull483     private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
484         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
485         val emitter = synchronized(this) lock@{
486             // recheck buffer under lock again (make sure it is really full)
487             if (tryEmitLocked(value)) {
488                 cont.resume(Unit)
489                 resumes = findSlotsToResumeLocked(resumes)
490                 return@lock null
491             }
492             // add suspended emitter to the buffer
493             Emitter(this, head + totalSize, value, cont).also {
494                 enqueueLocked(it)
495                 queueSize++ // added to queue of waiting emitters
496                 // synchronous shared flow might rendezvous with waiting emitter
497                 if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
498             }
499         }
500         // outside of the lock: register dispose on cancellation
501         emitter?.let { cont.disposeOnCancellation(it) }
502         // outside of the lock: resume slots if needed
503         for (r in resumes) r?.resume(Unit)
504     }
505 
<lambda>null506     private fun cancelEmitter(emitter: Emitter) = synchronized(this) {
507         if (emitter.index < head) return // already skipped past this index
508         val buffer = buffer!!
509         if (buffer.getBufferAt(emitter.index) !== emitter) return // already resumed
510         buffer.setBufferAt(emitter.index, NO_VALUE)
511         cleanupTailLocked()
512     }
513 
updateNewCollectorIndexLockednull514     internal fun updateNewCollectorIndexLocked(): Long {
515         val index = replayIndex
516         if (index < minCollectorIndex) minCollectorIndex = index
517         return index
518     }
519 
520     // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
updateCollectorIndexLockednull521     internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
522         assert { oldIndex >= minCollectorIndex }
523         if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
524         // start computing new minimal index of active collectors
525         val head = head
526         var newMinCollectorIndex = head + bufferSize
527         // take into account a special case of sync shared flow that can go past 1st queued emitter
528         if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
529         forEachSlotLocked { slot ->
530             @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
531             if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
532         }
533         assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
534         if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
535         // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
536         // We must keep all the items from newMinIndex to the end of buffer
537         var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
538         val maxResumeCount = if (nCollectors > 0) {
539             // If we have collectors we can resume up to maxResumeCount waiting emitters
540             // a) queueSize -> that's how many waiting emitters we have
541             // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
542             val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
543             minOf(queueSize, bufferCapacity - newBufferSize0)
544         } else {
545             // If we don't have collectors anymore we must resume all waiting emitters
546             queueSize // that's how many waiting emitters we have (at most)
547         }
548         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
549         val newQueueEndIndex = newBufferEndIndex + queueSize
550         if (maxResumeCount > 0) { // collect emitters to resume if we have them
551             resumes = arrayOfNulls(maxResumeCount)
552             var resumeCount = 0
553             val buffer = buffer!!
554             for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
555                 val emitter = buffer.getBufferAt(curEmitterIndex)
556                 if (emitter !== NO_VALUE) {
557                     emitter as Emitter // must have Emitter class
558                     resumes[resumeCount++] = emitter.cont
559                     buffer.setBufferAt(curEmitterIndex, NO_VALUE) // make as canceled if we moved ahead
560                     buffer.setBufferAt(newBufferEndIndex, emitter.value)
561                     newBufferEndIndex++
562                     if (resumeCount >= maxResumeCount) break // enough resumed, done
563                 }
564             }
565         }
566         // Compute new buffer size -> how many values we now actually have after resume
567         val newBufferSize1 = (newBufferEndIndex - head).toInt()
568         // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
569         // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
570         // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
571         // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
572         // expression, which coerces values that are too big anyway.
573         if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
574         // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
575         var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
576         // adjustment for synchronous case with cancelled emitter (NO_VALUE)
577         if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
578             newBufferEndIndex++
579             newReplayIndex++
580         }
581         // Update buffer state
582         updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
583         // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
584         cleanupTailLocked()
585         // We need to waken up suspended collectors if any emitters were resumed here
586         if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
587         return resumes
588     }
589 
updateBufferLockednull590     private fun updateBufferLocked(
591         newReplayIndex: Long,
592         newMinCollectorIndex: Long,
593         newBufferEndIndex: Long,
594         newQueueEndIndex: Long
595     ) {
596         // Compute new head value
597         val newHead = minOf(newMinCollectorIndex, newReplayIndex)
598         assert { newHead >= head }
599         // cleanup items we don't have to buffer anymore (because head is about to move)
600         for (index in head until newHead) buffer!!.setBufferAt(index, null)
601         // update all state variables to newly computed values
602         replayIndex = newReplayIndex
603         minCollectorIndex = newMinCollectorIndex
604         bufferSize = (newBufferEndIndex - newHead).toInt()
605         queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
606         // check our key invariants (just in case)
607         assert { bufferSize >= 0 }
608         assert { queueSize >= 0 }
609         assert { replayIndex <= this.head + bufferSize }
610     }
611 
612     // Removes all the NO_VALUE items from the end of the queue and reduces its size
cleanupTailLockednull613     private fun cleanupTailLocked() {
614         // If we have synchronous case, then keep one emitter queued
615         if (bufferCapacity == 0 && queueSize <= 1) return // return, don't clear it
616         val buffer = buffer!!
617         while (queueSize > 0 && buffer.getBufferAt(head + totalSize - 1) === NO_VALUE) {
618             queueSize--
619             buffer.setBufferAt(head + totalSize, null)
620         }
621     }
622 
623     // returns NO_VALUE if cannot take value without suspension
tryTakeValuenull624     private fun tryTakeValue(slot: SharedFlowSlot): Any? {
625         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
626         val value = synchronized(this) {
627             val index = tryPeekLocked(slot)
628             if (index < 0) {
629                 NO_VALUE
630             } else {
631                 val oldIndex = slot.index
632                 val newValue = getPeekedValueLockedAt(index)
633                 slot.index = index + 1 // points to the next index after peeked one
634                 resumes = updateCollectorIndexLocked(oldIndex)
635                 newValue
636             }
637         }
638         for (resume in resumes) resume?.resume(Unit)
639         return value
640     }
641 
642     // returns -1 if cannot peek value without suspension
tryPeekLockednull643     private fun tryPeekLocked(slot: SharedFlowSlot): Long {
644         // return buffered value if possible
645         val index = slot.index
646         if (index < bufferEndIndex) return index
647         if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
648         // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
649         if (index > head) return -1L // ... but only with the first emitter (never look forward)
650         if (queueSize == 0) return -1L // nothing there to rendezvous with
651         return index // rendezvous with the first emitter
652     }
653 
getPeekedValueLockedAtnull654     private fun getPeekedValueLockedAt(index: Long): Any? =
655         when (val item = buffer!!.getBufferAt(index)) {
656             is Emitter -> item.value
657             else -> item
658         }
659 
awaitValuenull660     private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
661         synchronized(this) lock@{
662             val index = tryPeekLocked(slot) // recheck under this lock
663             if (index < 0) {
664                 slot.cont = cont // Ok -- suspending
665             } else {
666                 cont.resume(Unit) // has value, no need to suspend
667                 return@lock
668             }
669             slot.cont = cont // suspend, waiting
670         }
671     }
672 
findSlotsToResumeLockednull673     private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
674         var resumes: Array<Continuation<Unit>?> = resumesIn
675         var resumeCount = resumesIn.size
676         forEachSlotLocked loop@{ slot ->
677             val cont = slot.cont ?: return@loop // only waiting slots
678             if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
679             if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
680             resumes[resumeCount++] = cont
681             slot.cont = null // not waiting anymore
682         }
683         return resumes
684     }
685 
createSlotnull686     override fun createSlot() = SharedFlowSlot()
687     override fun createSlotArray(size: Int): Array<SharedFlowSlot?> = arrayOfNulls(size)
688 
689     override fun resetReplayCache() = synchronized(this) {
690         // Update buffer state
691         updateBufferLocked(
692             newReplayIndex = bufferEndIndex,
693             newMinCollectorIndex = minCollectorIndex,
694             newBufferEndIndex = bufferEndIndex,
695             newQueueEndIndex = queueEndIndex
696         )
697     }
698 
fusenull699     override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
700         fuseSharedFlow(context, capacity, onBufferOverflow)
701 
702     private class Emitter(
703         @JvmField val flow: SharedFlowImpl<*>,
704         @JvmField var index: Long,
705         @JvmField val value: Any?,
706         @JvmField val cont: Continuation<Unit>
707     ) : DisposableHandle {
708         override fun dispose() = flow.cancelEmitter(this)
709     }
710 }
711 
712 @JvmField
713 internal val NO_VALUE = Symbol("NO_VALUE")
714 
Arraynull715 private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
716 private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)
717 
718 internal fun <T> SharedFlow<T>.fuseSharedFlow(
719     context: CoroutineContext,
720     capacity: Int,
721     onBufferOverflow: BufferOverflow
722 ): Flow<T> {
723     // context is irrelevant for shared flow and making additional rendezvous is meaningless
724     // however, additional non-trivial buffering after shared flow could make sense for very slow subscribers
725     if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
726         return this
727     }
728     // Apply channel flow operator as usual
729     return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
730 }
731