• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.channels.*
9 import kotlinx.coroutines.flow.internal.*
10 import kotlinx.coroutines.internal.*
11 import kotlin.coroutines.*
12 import kotlin.jvm.*
13 import kotlin.native.concurrent.*
14 
15 /**
16  * A _hot_ [Flow] that shares emitted values among all its collectors in a broadcast fashion, so that all collectors
17  * get all emitted values. A shared flow is called _hot_ because its active instance exists independently of the
18  * presence of collectors. This is opposed to a regular [Flow], such as defined by the [`flow { ... }`][flow] function,
19  * which is _cold_ and is started separately for each collector.
20  *
21  * **Shared flow never completes**. A call to [Flow.collect] on a shared flow never completes normally, and
22  * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a shared flow is called a _subscriber_.
23  *
24  * A subscriber of a shared flow can be cancelled. This usually happens when the scope in which the coroutine is running
25  * is cancelled. A subscriber to a shared flow is always [cancellable][Flow.cancellable], and checks for
26  * cancellation before each emission. Note that most terminal operators like [Flow.toList] would also not complete,
27  * when applied to a shared flow, but flow-truncating operators like [Flow.take] and [Flow.takeWhile] can be used on a
28  * shared flow to turn it into a completing one.
29  *
30  * A [mutable shared flow][MutableSharedFlow] is created using the [MutableSharedFlow(...)] constructor function.
31  * Its state can be updated by [emitting][MutableSharedFlow.emit] values to it and performing other operations.
32  * See the [MutableSharedFlow] documentation for details.
33  *
34  * [SharedFlow] is useful for broadcasting events that happen inside an application to subscribers that can come and go.
35  * For example, the following class encapsulates an event bus that distributes events to all subscribers
36  * in a _rendezvous_ manner, suspending until all subscribers process each event:
37  *
38  * ```
39  * class EventBus {
40  *     private val _events = MutableSharedFlow<Event>() // private mutable shared flow
41  *     val events = _events.asSharedFlow() // publicly exposed as read-only shared flow
42  *
43  *     suspend fun produceEvent(event: Event) {
44  *         _events.emit(event) // suspends until all subscribers receive it
45  *     }
46  * }
47  * ```
48  *
49  * As an alternative to the above usage with the `MutableSharedFlow(...)` constructor function,
50  * any _cold_ [Flow] can be converted to a shared flow using the [shareIn] operator.
51  *
52  * There is a specialized implementation of shared flow for the case where the most recent state value needs
53  * to be shared. See [StateFlow] for details.
54  *
55  * ### Replay cache and buffer
56  *
57  * A shared flow keeps a specific number of the most recent values in its _replay cache_. Every new subscriber first
58  * gets the values from the replay cache and then gets new emitted values. The maximum size of the replay cache is
59  * specified when the shared flow is created by the `replay` parameter. A snapshot of the current replay cache
60  * is available via the [replayCache] property and it can be reset with the [MutableSharedFlow.resetReplayCache] function.
61  *
62  * A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to
63  * get values from the buffer without suspending emitters. The buffer space determines how much slow subscribers
64  * can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved
65  * using the `extraBufferCapacity` parameter.
66  *
67  * A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using
68  * the `onBufferOverflow` parameter, which is equal to one of the entries of the [BufferOverflow] enum. When a strategy other
69  * than [SUSPENDED][BufferOverflow.SUSPEND] is configured, emissions to the shared flow never suspend.
70  *
71  * ### SharedFlow vs BroadcastChannel
72  *
73  * Conceptually shared flow is similar to [BroadcastChannel][BroadcastChannel]
74  * and is designed to completely replace `BroadcastChannel` in the future.
75  * It has the following important differences:
76  *
77  * * `SharedFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
78  *    for faster and simpler implementation.
79  * * `SharedFlow` supports configurable replay and buffer overflow strategy.
80  * * `SharedFlow` has a clear separation into a read-only `SharedFlow` interface and a [MutableSharedFlow].
81  * * `SharedFlow` cannot be closed like `BroadcastChannel` and can never represent a failure.
82  *    All errors and completion signals should be explicitly _materialized_ if needed.
83  *
84  * To migrate [BroadcastChannel] usage to [SharedFlow], start by replacing usages of the `BroadcastChannel(capacity)`
85  * constructor with `MutableSharedFlow(0, extraBufferCapacity=capacity)` (broadcast channel does not replay
86  * values to new subscribers). Replace [send][BroadcastChannel.send] and [offer][BroadcastChannel.offer] calls
87  * with [emit][MutableStateFlow.emit] and [tryEmit][MutableStateFlow.tryEmit], and convert subscribers' code to flow operators.
88  *
89  * ### Concurrency
90  *
91  * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
92  * external synchronization.
93  *
94  * ### Operator fusion
95  *
96  * Application of [flowOn][Flow.flowOn], [buffer] with [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
97  * or [cancellable] operators to a shared flow has no effect.
98  *
99  * ### Implementation notes
100  *
101  * Shared flow implementation uses a lock to ensure thread-safety, but suspending collector and emitter coroutines are
102  * resumed outside of this lock to avoid dead-locks when using unconfined coroutines. Adding new subscribers
103  * has `O(1)` amortized cost, but emitting has `O(N)` cost, where `N` is the number of subscribers.
104  *
105  * ### Not stable for inheritance
106  *
107  * **The `SharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
108  * might be added to this interface in the future, but is stable for use.
109  * Use the `MutableSharedFlow(replay, ...)` constructor function to create an implementation.
110  */
111 public interface SharedFlow<out T> : Flow<T> {
112     /**
113      * A snapshot of the replay cache.
114      */
115     public val replayCache: List<T>
116 }
117 
118 /**
119  * A mutable [SharedFlow] that provides functions to [emit] values to the flow.
120  * An instance of `MutableSharedFlow` with the given configuration parameters can be created using `MutableSharedFlow(...)`
121  * constructor function.
122  *
123  * See the [SharedFlow] documentation for details on shared flows.
124  *
125  * `MutableSharedFlow` is a [SharedFlow] that also provides the abilities to [emit] a value,
126  * to [tryEmit] without suspension if possible, to track the [subscriptionCount],
127  * and to [resetReplayCache].
128  *
129  * ### Concurrency
130  *
131  * All methods of shared flow are **thread-safe** and can be safely invoked from concurrent coroutines without
132  * external synchronization.
133  *
134  * ### Not stable for inheritance
135  *
136  * **The `MutableSharedFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
137  * might be added to this interface in the future, but is stable for use.
138  * Use the `MutableSharedFlow(...)` constructor function to create an implementation.
139  */
140 public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
141     /**
142      * Tries to emit a [value] to this shared flow without suspending. It returns `true` if the value was
143      * emitted successfully. When this function returns `false`, it means that the call to a plain [emit]
144      * function will suspend until there is a buffer space available.
145      *
146      * A shared flow configured with a [BufferOverflow] strategy other than [SUSPEND][BufferOverflow.SUSPEND]
147      * (either [DROP_OLDEST][BufferOverflow.DROP_OLDEST] or [DROP_LATEST][BufferOverflow.DROP_LATEST]) never
148      * suspends on [emit], and thus `tryEmit` to such a shared flow always returns `true`.
149      */
tryEmitnull150     public fun tryEmit(value: T): Boolean
151 
152     /**
153      * The number of subscribers (active collectors) to this shared flow.
154      *
155      * The integer in the resulting [StateFlow] is not negative and starts with zero for a freshly created
156      * shared flow.
157      *
158      * This state can be used to react to changes in the number of subscriptions to this shared flow.
159      * For example, if you need to call `onActive` when the first subscriber appears and `onInactive`
160      * when the last one disappears, you can set it up like this:
161      *
162      * ```
163      * sharedFlow.subscriptionCount
164      *     .map { count -> count > 0 } // map count into active/inactive flag
165      *     .distinctUntilChanged() // only react to true<->false changes
166      *     .onEach { isActive -> // configure an action
167      *         if (isActive) onActive() else onInactive()
168      *     }
169      *     .launchIn(scope) // launch it
170      * ```
171      */
172     public val subscriptionCount: StateFlow<Int>
173 
174     /**
175      * Resets the [replayCache] of this shared flow to an empty state.
176      * New subscribers will be receiving only the values that were emitted after this call,
177      * while old subscribers will still be receiving previously buffered values.
178      * To reset a shared flow to an initial value, emit the value after this call.
179      *
180      * On a [MutableStateFlow], which always contains a single value, this function is not
181      * supported, and throws an [UnsupportedOperationException]. To reset a [MutableStateFlow]
182      * to an initial value, just update its [value][MutableStateFlow.value].
183      *
184      * **Note: This is an experimental api.** This function may be removed or renamed in the future.
185      */
186     @ExperimentalCoroutinesApi
187     public fun resetReplayCache()
188 }
189 
190 /**
191  * Creates a [MutableSharedFlow] with the given configuration parameters.
192  *
193  * This function throws [IllegalArgumentException] on unsupported values of parameters or combinations thereof.
194  *
195  * @param replay the number of values replayed to new subscribers (cannot be negative, defaults to zero).
196  * @param extraBufferCapacity the number of values buffered in addition to `replay`.
197  *   [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
198  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
199  *   [suspending][BufferOverflow.SUSPEND] attempts to [emit][MutableSharedFlow.emit] a value,
200  *   supported only when `replay > 0` or `extraBufferCapacity > 0`).
201  */
202 @Suppress("FunctionName", "UNCHECKED_CAST")
203 public fun <T> MutableSharedFlow(
204     replay: Int = 0,
205     extraBufferCapacity: Int = 0,
206     onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
207 ): MutableSharedFlow<T> {
208     require(replay >= 0) { "replay cannot be negative, but was $replay" }
209     require(extraBufferCapacity >= 0) { "extraBufferCapacity cannot be negative, but was $extraBufferCapacity" }
210     require(replay > 0 || extraBufferCapacity > 0 || onBufferOverflow == BufferOverflow.SUSPEND) {
211         "replay or extraBufferCapacity must be positive with non-default onBufferOverflow strategy $onBufferOverflow"
212     }
213     val bufferCapacity0 = replay + extraBufferCapacity
214     val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
215     return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
216 }
217 
218 // ------------------------------------ Implementation ------------------------------------
219 
220 private class SharedFlowSlot : AbstractSharedFlowSlot<SharedFlowImpl<*>>() {
221     @JvmField
222     var index = -1L // current "to-be-emitted" index, -1 means the slot is free now
223 
224     @JvmField
225     var cont: Continuation<Unit>? = null // collector waiting for new value
226 
allocateLockednull227     override fun allocateLocked(flow: SharedFlowImpl<*>): Boolean {
228         if (index >= 0) return false // not free
229         index = flow.updateNewCollectorIndexLocked()
230         return true
231     }
232 
freeLockednull233     override fun freeLocked(flow: SharedFlowImpl<*>): Array<Continuation<Unit>?> {
234         assert { index >= 0 }
235         val oldIndex = index
236         index = -1L
237         cont = null // cleanup continuation reference
238         return flow.updateCollectorIndexLocked(oldIndex)
239     }
240 }
241 
242 private class SharedFlowImpl<T>(
243     private val replay: Int,
244     private val bufferCapacity: Int,
245     private val onBufferOverflow: BufferOverflow
246 ) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
247     /*
248         Logical structure of the buffer
249 
250                   buffered values
251              /-----------------------\
252                           replayCache      queued emitters
253                           /----------\/----------------------\
254          +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
255          |   | 1 | 2 | 3 | 4 | 5 | 6 | E | E | E | E | E | E |   |   |   |
256          +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
257                ^           ^           ^                      ^
258                |           |           |                      |
259               head         |      head + bufferSize     head + totalSize
260                |           |           |
261      index of the slowest  |    index of the fastest
262       possible collector   |     possible collector
263                |           |
264                |     replayIndex == new collector's index
265                \---------------------- /
266           range of possible minCollectorIndex
267 
268           head == minOf(minCollectorIndex, replayIndex) // by definition
269           totalSize == bufferSize + queueSize // by definition
270 
271        INVARIANTS:
272           minCollectorIndex = activeSlots.minOf { it.index } ?: (head + bufferSize)
273           replayIndex <= head + bufferSize
274      */
275 
276     // Stored state
277     private var buffer: Array<Any?>? = null // allocated when needed, allocated size always power of two
278     private var replayIndex = 0L // minimal index from which new collector gets values
279     private var minCollectorIndex = 0L // minimal index of active collectors, equal to replayIndex if there are none
280     private var bufferSize = 0 // number of buffered values
281     private var queueSize = 0 // number of queued emitters
282 
283     // Computed state
284     private val head: Long get() = minOf(minCollectorIndex, replayIndex)
285     private val replaySize: Int get() = (head + bufferSize - replayIndex).toInt()
286     private val totalSize: Int get() = bufferSize + queueSize
287     private val bufferEndIndex: Long get() = head + bufferSize
288     private val queueEndIndex: Long get() = head + bufferSize + queueSize
289 
290     override val replayCache: List<T>
<lambda>null291         get() = synchronized(this) {
292             val replaySize = this.replaySize
293             if (replaySize == 0) return emptyList()
294             val result = ArrayList<T>(replaySize)
295             val buffer = buffer!! // must be allocated, because replaySize > 0
296             @Suppress("UNCHECKED_CAST")
297             for (i in 0 until replaySize) result += buffer.getBufferAt(replayIndex + i) as T
298             result
299         }
300 
301     @Suppress("UNCHECKED_CAST")
collectnull302     override suspend fun collect(collector: FlowCollector<T>) {
303         val slot = allocateSlot()
304         try {
305             if (collector is SubscribedFlowCollector) collector.onSubscription()
306             val collectorJob = currentCoroutineContext()[Job]
307             while (true) {
308                 var newValue: Any?
309                 while (true) {
310                     newValue = tryTakeValue(slot) // attempt no-suspend fast path first
311                     if (newValue !== NO_VALUE) break
312                     awaitValue(slot) // await signal that the new value is available
313                 }
314                 collectorJob?.ensureActive()
315                 collector.emit(newValue as T)
316             }
317         } finally {
318             freeSlot(slot)
319         }
320     }
321 
tryEmitnull322     override fun tryEmit(value: T): Boolean {
323         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
324         val emitted = synchronized(this) {
325             if (tryEmitLocked(value)) {
326                 resumes = findSlotsToResumeLocked(resumes)
327                 true
328             } else {
329                 false
330             }
331         }
332         for (cont in resumes) cont?.resume(Unit)
333         return emitted
334     }
335 
emitnull336     override suspend fun emit(value: T) {
337         if (tryEmit(value)) return // fast-path
338         emitSuspend(value)
339     }
340 
341     @Suppress("UNCHECKED_CAST")
tryEmitLockednull342     private fun tryEmitLocked(value: T): Boolean {
343         // Fast path without collectors -> no buffering
344         if (nCollectors == 0) return tryEmitNoCollectorsLocked(value) // always returns true
345         // With collectors we'll have to buffer
346         // cannot emit now if buffer is full & blocked by slow collectors
347         if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
348             when (onBufferOverflow) {
349                 BufferOverflow.SUSPEND -> return false // will suspend
350                 BufferOverflow.DROP_LATEST -> return true // just drop incoming
351                 BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
352             }
353         }
354         enqueueLocked(value)
355         bufferSize++ // value was added to buffer
356         // drop oldest from the buffer if it became more than bufferCapacity
357         if (bufferSize > bufferCapacity) dropOldestLocked()
358         // keep replaySize not larger that needed
359         if (replaySize > replay) { // increment replayIndex by one
360             updateBufferLocked(replayIndex + 1, minCollectorIndex, bufferEndIndex, queueEndIndex)
361         }
362         return true
363     }
364 
tryEmitNoCollectorsLockednull365     private fun tryEmitNoCollectorsLocked(value: T): Boolean {
366         assert { nCollectors == 0 }
367         if (replay == 0) return true // no need to replay, just forget it now
368         enqueueLocked(value) // enqueue to replayCache
369         bufferSize++ // value was added to buffer
370         // drop oldest from the buffer if it became more than replay
371         if (bufferSize > replay) dropOldestLocked()
372         minCollectorIndex = head + bufferSize // a default value (max allowed)
373         return true
374     }
375 
dropOldestLockednull376     private fun dropOldestLocked() {
377         buffer!!.setBufferAt(head, null)
378         bufferSize--
379         val newHead = head + 1
380         if (replayIndex < newHead) replayIndex = newHead
381         if (minCollectorIndex < newHead) correctCollectorIndexesOnDropOldest(newHead)
382         assert { head == newHead } // since head = minOf(minCollectorIndex, replayIndex) it should have updated
383     }
384 
correctCollectorIndexesOnDropOldestnull385     private fun correctCollectorIndexesOnDropOldest(newHead: Long) {
386         forEachSlotLocked { slot ->
387             @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
388             if (slot.index >= 0 && slot.index < newHead) {
389                 slot.index = newHead // force move it up (this collector was too slow and missed the value at its index)
390             }
391         }
392         minCollectorIndex = newHead
393     }
394 
395     // enqueues item to buffer array, caller shall increment either bufferSize or queueSize
enqueueLockednull396     private fun enqueueLocked(item: Any?) {
397         val curSize = totalSize
398         val buffer = when (val curBuffer = buffer) {
399             null -> growBuffer(null, 0, 2)
400             else -> if (curSize >= curBuffer.size) growBuffer(curBuffer, curSize,curBuffer.size * 2) else curBuffer
401         }
402         buffer.setBufferAt(head + curSize, item)
403     }
404 
growBuffernull405     private fun growBuffer(curBuffer: Array<Any?>?, curSize: Int, newSize: Int): Array<Any?> {
406         check(newSize > 0) { "Buffer size overflow" }
407         val newBuffer = arrayOfNulls<Any?>(newSize).also { buffer = it }
408         if (curBuffer == null) return newBuffer
409         val head = head
410         for (i in 0 until curSize) {
411             newBuffer.setBufferAt(head + i, curBuffer.getBufferAt(head + i))
412         }
413         return newBuffer
414     }
415 
emitSuspendnull416     private suspend fun emitSuspend(value: T) = suspendCancellableCoroutine<Unit> sc@{ cont ->
417         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
418         val emitter = synchronized(this) lock@{
419             // recheck buffer under lock again (make sure it is really full)
420             if (tryEmitLocked(value)) {
421                 cont.resume(Unit)
422                 resumes = findSlotsToResumeLocked(resumes)
423                 return@lock null
424             }
425             // add suspended emitter to the buffer
426             Emitter(this, head + totalSize, value, cont).also {
427                 enqueueLocked(it)
428                 queueSize++ // added to queue of waiting emitters
429                 // synchronous shared flow might rendezvous with waiting emitter
430                 if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
431             }
432         }
433         // outside of the lock: register dispose on cancellation
434         emitter?.let { cont.disposeOnCancellation(it) }
435         // outside of the lock: resume slots if needed
436         for (cont in resumes) cont?.resume(Unit)
437     }
438 
<lambda>null439     private fun cancelEmitter(emitter: Emitter) = synchronized(this) {
440         if (emitter.index < head) return // already skipped past this index
441         val buffer = buffer!!
442         if (buffer.getBufferAt(emitter.index) !== emitter) return // already resumed
443         buffer.setBufferAt(emitter.index, NO_VALUE)
444         cleanupTailLocked()
445     }
446 
updateNewCollectorIndexLockednull447     internal fun updateNewCollectorIndexLocked(): Long {
448         val index = replayIndex
449         if (index < minCollectorIndex) minCollectorIndex = index
450         return index
451     }
452 
453     // Is called when a collector disappears or changes index, returns a list of continuations to resume after lock
updateCollectorIndexLockednull454     internal fun updateCollectorIndexLocked(oldIndex: Long): Array<Continuation<Unit>?> {
455         assert { oldIndex >= minCollectorIndex }
456         if (oldIndex > minCollectorIndex) return EMPTY_RESUMES // nothing changes, it was not min
457         // start computing new minimal index of active collectors
458         val head = head
459         var newMinCollectorIndex = head + bufferSize
460         // take into account a special case of sync shared flow that can go past 1st queued emitter
461         if (bufferCapacity == 0 && queueSize > 0) newMinCollectorIndex++
462         forEachSlotLocked { slot ->
463             @Suppress("ConvertTwoComparisonsToRangeCheck") // Bug in JS backend
464             if (slot.index >= 0 && slot.index < newMinCollectorIndex) newMinCollectorIndex = slot.index
465         }
466         assert { newMinCollectorIndex >= minCollectorIndex } // can only grow
467         if (newMinCollectorIndex <= minCollectorIndex) return EMPTY_RESUMES // nothing changes
468         // Compute new buffer size if we drop items we no longer need and no emitter is resumed:
469         // We must keep all the items from newMinIndex to the end of buffer
470         var newBufferEndIndex = bufferEndIndex // var to grow when waiters are resumed
471         val maxResumeCount = if (nCollectors > 0) {
472             // If we have collectors we can resume up to maxResumeCount waiting emitters
473             // a) queueSize -> that's how many waiting emitters we have
474             // b) bufferCapacity - newBufferSize0 -> that's how many we can afford to resume to add w/o exceeding bufferCapacity
475             val newBufferSize0 = (newBufferEndIndex - newMinCollectorIndex).toInt()
476             minOf(queueSize, bufferCapacity - newBufferSize0)
477         } else {
478             // If we don't have collectors anymore we must resume all waiting emitters
479             queueSize // that's how many waiting emitters we have (at most)
480         }
481         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
482         val newQueueEndIndex = newBufferEndIndex + queueSize
483         if (maxResumeCount > 0) { // collect emitters to resume if we have them
484             resumes = arrayOfNulls(maxResumeCount)
485             var resumeCount = 0
486             val buffer = buffer!!
487             for (curEmitterIndex in newBufferEndIndex until newQueueEndIndex) {
488                 val emitter = buffer.getBufferAt(curEmitterIndex)
489                 if (emitter !== NO_VALUE) {
490                     emitter as Emitter // must have Emitter class
491                     resumes[resumeCount++] = emitter.cont
492                     buffer.setBufferAt(curEmitterIndex, NO_VALUE) // make as canceled if we moved ahead
493                     buffer.setBufferAt(newBufferEndIndex, emitter.value)
494                     newBufferEndIndex++
495                     if (resumeCount >= maxResumeCount) break // enough resumed, done
496                 }
497             }
498         }
499         // Compute new buffer size -> how many values we now actually have after resume
500         val newBufferSize1 = (newBufferEndIndex - head).toInt()
501         // Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
502         // and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
503         // forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
504         // too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
505         // expression, which coerces values that are too big anyway.
506         if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
507         // Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
508         var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
509         // adjustment for synchronous case with cancelled emitter (NO_VALUE)
510         if (bufferCapacity == 0 && newReplayIndex < newQueueEndIndex && buffer!!.getBufferAt(newReplayIndex) == NO_VALUE) {
511             newBufferEndIndex++
512             newReplayIndex++
513         }
514         // Update buffer state
515         updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
516         // just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
517         cleanupTailLocked()
518         // We need to waken up suspended collectors if any emitters were resumed here
519         if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
520         return resumes
521     }
522 
updateBufferLockednull523     private fun updateBufferLocked(
524         newReplayIndex: Long,
525         newMinCollectorIndex: Long,
526         newBufferEndIndex: Long,
527         newQueueEndIndex: Long
528     ) {
529         // Compute new head value
530         val newHead = minOf(newMinCollectorIndex, newReplayIndex)
531         assert { newHead >= head }
532         // cleanup items we don't have to buffer anymore (because head is about to move)
533         for (index in head until newHead) buffer!!.setBufferAt(index, null)
534         // update all state variables to newly computed values
535         replayIndex = newReplayIndex
536         minCollectorIndex = newMinCollectorIndex
537         bufferSize = (newBufferEndIndex - newHead).toInt()
538         queueSize = (newQueueEndIndex - newBufferEndIndex).toInt()
539         // check our key invariants (just in case)
540         assert { bufferSize >= 0 }
541         assert { queueSize >= 0 }
542         assert { replayIndex <= this.head + bufferSize }
543     }
544 
545     // Removes all the NO_VALUE items from the end of the queue and reduces its size
cleanupTailLockednull546     private fun cleanupTailLocked() {
547         // If we have synchronous case, then keep one emitter queued
548         if (bufferCapacity == 0 && queueSize <= 1) return // return, don't clear it
549         val buffer = buffer!!
550         while (queueSize > 0 && buffer.getBufferAt(head + totalSize - 1) === NO_VALUE) {
551             queueSize--
552             buffer.setBufferAt(head + totalSize, null)
553         }
554     }
555 
556     // returns NO_VALUE if cannot take value without suspension
tryTakeValuenull557     private fun tryTakeValue(slot: SharedFlowSlot): Any? {
558         var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
559         val value = synchronized(this) {
560             val index = tryPeekLocked(slot)
561             if (index < 0) {
562                 NO_VALUE
563             } else {
564                 val oldIndex = slot.index
565                 val newValue = getPeekedValueLockedAt(index)
566                 slot.index = index + 1 // points to the next index after peeked one
567                 resumes = updateCollectorIndexLocked(oldIndex)
568                 newValue
569             }
570         }
571         for (resume in resumes) resume?.resume(Unit)
572         return value
573     }
574 
575     // returns -1 if cannot peek value without suspension
tryPeekLockednull576     private fun tryPeekLocked(slot: SharedFlowSlot): Long {
577         // return buffered value if possible
578         val index = slot.index
579         if (index < bufferEndIndex) return index
580         if (bufferCapacity > 0) return -1L // if there's a buffer, never try to rendezvous with emitters
581         // Synchronous shared flow (bufferCapacity == 0) tries to rendezvous
582         if (index > head) return -1L // ... but only with the first emitter (never look forward)
583         if (queueSize == 0) return -1L // nothing there to rendezvous with
584         return index // rendezvous with the first emitter
585     }
586 
getPeekedValueLockedAtnull587     private fun getPeekedValueLockedAt(index: Long): Any? =
588         when (val item = buffer!!.getBufferAt(index)) {
589             is Emitter -> item.value
590             else -> item
591         }
592 
awaitValuenull593     private suspend fun awaitValue(slot: SharedFlowSlot): Unit = suspendCancellableCoroutine { cont ->
594         synchronized(this) lock@{
595             val index = tryPeekLocked(slot) // recheck under this lock
596             if (index < 0) {
597                 slot.cont = cont // Ok -- suspending
598             } else {
599                 cont.resume(Unit) // has value, no need to suspend
600                 return@lock
601             }
602             slot.cont = cont // suspend, waiting
603         }
604     }
605 
findSlotsToResumeLockednull606     private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
607         var resumes: Array<Continuation<Unit>?> = resumesIn
608         var resumeCount = resumesIn.size
609         forEachSlotLocked loop@{ slot ->
610             val cont = slot.cont ?: return@loop // only waiting slots
611             if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
612             if (resumeCount >= resumes.size) resumes = resumes.copyOf(maxOf(2, 2 * resumes.size))
613             resumes[resumeCount++] = cont
614             slot.cont = null // not waiting anymore
615         }
616         return resumes
617     }
618 
createSlotnull619     override fun createSlot() = SharedFlowSlot()
620     override fun createSlotArray(size: Int): Array<SharedFlowSlot?> = arrayOfNulls(size)
621 
622     override fun resetReplayCache() = synchronized(this) {
623         // Update buffer state
624         updateBufferLocked(
625             newReplayIndex = bufferEndIndex,
626             newMinCollectorIndex = minCollectorIndex,
627             newBufferEndIndex = bufferEndIndex,
628             newQueueEndIndex = queueEndIndex
629         )
630     }
631 
fusenull632     override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
633         fuseSharedFlow(context, capacity, onBufferOverflow)
634 
635     private class Emitter(
636         @JvmField val flow: SharedFlowImpl<*>,
637         @JvmField var index: Long,
638         @JvmField val value: Any?,
639         @JvmField val cont: Continuation<Unit>
640     ) : DisposableHandle {
641         override fun dispose() = flow.cancelEmitter(this)
642     }
643 }
644 
645 @SharedImmutable
646 @JvmField
647 internal val NO_VALUE = Symbol("NO_VALUE")
648 
Arraynull649 private fun Array<Any?>.getBufferAt(index: Long) = get(index.toInt() and (size - 1))
650 private fun Array<Any?>.setBufferAt(index: Long, item: Any?) = set(index.toInt() and (size - 1), item)
651 
652 internal fun <T> SharedFlow<T>.fuseSharedFlow(
653     context: CoroutineContext,
654     capacity: Int,
655     onBufferOverflow: BufferOverflow
656 ): Flow<T> {
657     // context is irrelevant for shared flow and making additional rendezvous is meaningless
658     // however, additional non-trivial buffering after shared flow could make sense for very slow subscribers
659     if ((capacity == Channel.RENDEZVOUS || capacity == Channel.OPTIONAL_CHANNEL) && onBufferOverflow == BufferOverflow.SUSPEND) {
660         return this
661     }
662     // Apply channel flow operator as usual
663     return ChannelFlowOperatorImpl(this, context, capacity, onBufferOverflow)
664 }
665