1 package kotlinx.coroutines.flow
2 
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlinx.coroutines.channels.*
6 import kotlinx.coroutines.flow.internal.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 
10 /**
11  * A [SharedFlow] that represents a read-only state with a single updatable data [value] that emits updates
12  * to the value to its collectors. A state flow is a _hot_ flow because its active instance exists independently
13  * of the presence of collectors. Its current value can be retrieved via the [value] property.
14  *
15  * **State flow never completes**. A call to [Flow.collect] on a state flow never completes normally, and
16  * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_.
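 *
 * Because `collect` does not return normally, a subscriber stops only when its coroutine is cancelled or when
 * a terminal operator aborts the collection. A minimal sketch, assuming the illustrative names `state` and `job`:
 *
 * ```
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val state = MutableStateFlow(0)
 *     // `first` aborts the collection as soon as a matching value is seen
 *     val firstPositive = async { state.first { it > 0 } }
 *     // A subscriber started with `launchIn` runs until its job is cancelled
 *     val job = state.onEach { println("state = $it") }.launchIn(this)
 *     state.value = 1
 *     println(firstPositive.await()) // 1
 *     job.cancelAndJoin() // without this, runBlocking would wait for the subscriber forever
 * }
 * ```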
17  *
18  * A [mutable state flow][MutableStateFlow] is created using the `MutableStateFlow(value)` constructor function with
19  * the initial value. The value of a mutable state flow can be updated by setting its [value] property.
20  * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates,
21  * but always collects the most recently emitted value.
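 *
 * A minimal sketch of this conflation, assuming a single-threaded `runBlocking` setup and a collector that is
 * slower than the producer (the delays are arbitrary illustration values):
 *
 * ```
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val state = MutableStateFlow(0)
 *     val job = launch {
 *         state.collect { v ->
 *             println("collected $v")
 *             delay(100) // slow collector
 *         }
 *     }
 *     yield() // let the collector subscribe and receive the initial value
 *     for (i in 1..5) state.value = i // fast updates, no suspension in between
 *     delay(300)
 *     job.cancel()
 *     // The intermediate values 1..4 are skipped: the collector observes 0 and then 5
 * }
 * ```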
22  *
23  * [StateFlow] is useful as a data-model class to represent any kind of state.
24  * Derived values can be defined using various operators on the flows, with [combine] operator being especially
25  * useful to combine values from multiple state flows using arbitrary functions.
26  *
27  * For example, the following class encapsulates an integer state and increments its value on each call to `inc`:
28  *
29  * ```
30  * class CounterModel {
31  *     private val _counter = MutableStateFlow(0) // private mutable state flow
32  *     val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
33  *
34  *     fun inc() {
35  *         _counter.update { count -> count + 1 } // atomic, safe for concurrent use
36  *     }
37  * }
38  * ```
39  *
40  * Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
41  *
42  * ```
43  * val aModel = CounterModel()
44  * val bModel = CounterModel()
45  * val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
46  * ```
47  *
48  * As an alternative to the above usage with the `MutableStateFlow(...)` constructor function,
49  * any _cold_ [Flow] can be converted to a state flow using the [stateIn] operator.
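 *
 * A minimal sketch of such a conversion. The `ticks` flow, the `sharingScope`, and the initial value `-1` are
 * illustrative assumptions:
 *
 * ```
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val ticks: Flow<Int> = flow { // cold flow: the body runs only when it is collected
 *         repeat(3) { emit(it); delay(10) }
 *     }
 *     val sharingScope = CoroutineScope(coroutineContext + Job())
 *     // Collect the upstream flow in sharingScope and expose its latest value as a state flow
 *     val state: StateFlow<Int> = ticks.stateIn(sharingScope, SharingStarted.Eagerly, initialValue = -1)
 *     delay(100) // give the upstream flow time to finish
 *     println(state.value) // the most recent upstream value
 *     sharingScope.cancel() // stop the sharing coroutine
 * }
 * ```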
50  *
51  * ### Strong equality-based conflation
52  *
53  * Values in state flow are conflated using [Any.equals] comparison in a similar way to the
54  * [distinctUntilChanged] operator. This comparison is used to conflate incoming updates
55  * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors
56  * when a new value is equal to the previously emitted one. State flow behavior with classes that violate
57  * the contract for [Any.equals] is unspecified.
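 *
 * A minimal sketch of the practical consequences, using a hypothetical `User` data class and an illustrative
 * list-valued state:
 *
 * ```
 * import kotlinx.coroutines.flow.*
 *
 * data class User(val name: String)
 *
 * fun main() {
 *     val user = MutableStateFlow(User("Ann"))
 *     user.value = User("Ann") // equal to the current value: conflated, collectors are not notified
 *     user.value = User("Bob") // not equal: the new value is emitted
 *
 *     // Pitfall: mutating an object in place and assigning the same instance again is never observed,
 *     // because the "new" value is always equal to the stored one
 *     val list = mutableListOf(1)
 *     val items = MutableStateFlow<List<Int>>(list)
 *     list.add(2)
 *     items.value = list // no emission
 *     items.value = list + 3 // a new, unequal list: emitted
 * }
 * ```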
58  *
59  * ### State flow is a shared flow
60  *
61  * State flow is a special-purpose, high-performance, and efficient implementation of [SharedFlow] for the narrow,
62  * but widely used case of sharing a state. See the [SharedFlow] documentation for the basic rules,
63  * constraints, and operators that are applicable to all shared flows.
64  *
65  * State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any
66  * more values, but keeps the last emitted one, and does not support [resetReplayCache][MutableSharedFlow.resetReplayCache].
67  * A state flow behaves identically to a shared flow when it is created
68  * with the following parameters and the [distinctUntilChanged] operator is applied to it:
69  *
70  * ```
71  * // MutableStateFlow(initialValue) is a shared flow with the following parameters:
72  * val shared = MutableSharedFlow(
73  *     replay = 1,
74  *     onBufferOverflow = BufferOverflow.DROP_OLDEST
75  * )
76  * shared.tryEmit(initialValue) // emit the initial value
77  * val state = shared.distinctUntilChanged() // get StateFlow-like behavior
78  * ```
79  *
80  * Use [SharedFlow] when you need a [StateFlow] with tweaks in its behavior such as extra buffering, replaying more
81  * values, or omitting the initial value.
82  *
83  * ### StateFlow vs ConflatedBroadcastChannel
84  *
85  * Conceptually, state flow is similar to [ConflatedBroadcastChannel]
86  * and is designed to completely replace it.
87  * It has the following important differences:
88  *
89  * - `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
90  *   for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that
91  *   allocates objects on each emitted value.
92  * - `StateFlow` always has a value which can be safely read at any time via [value] property.
93  *   Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value.
94  * - `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow].
95  * - `StateFlow` conflation is based on equality like [distinctUntilChanged] operator,
96  *   unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity.
97  * - `StateFlow` cannot be closed like `ConflatedBroadcastChannel` and can never represent a failure.
98  *   All errors and completion signals should be explicitly _materialized_ if needed.
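 *
 * A minimal sketch of materializing failures as ordinary state values. The `LoadState` hierarchy and the
 * `loadInto` function are hypothetical and not part of this library:
 *
 * ```
 * import kotlinx.coroutines.CancellationException
 * import kotlinx.coroutines.flow.*
 *
 * sealed interface LoadState {
 *     object Loading : LoadState
 *     data class Ready(val data: String) : LoadState
 *     data class Failed(val cause: Throwable) : LoadState
 * }
 *
 * suspend fun loadInto(state: MutableStateFlow<LoadState>, load: suspend () -> String) {
 *     state.value = LoadState.Loading
 *     state.value = try {
 *         LoadState.Ready(load())
 *     } catch (e: CancellationException) {
 *         throw e // never swallow cancellation
 *     } catch (e: Exception) {
 *         LoadState.Failed(e) // the failure becomes a regular value that collectors can observe
 *     }
 * }
 * ```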
99  *
100  * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes over time, making
101  * more pragmatic design choices for the sake of convenience.
102  *
103  * To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
104  * constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
105  * Replace [send][ConflatedBroadcastChannel.send] and [trySend][ConflatedBroadcastChannel.trySend] calls
106  * with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
107  * You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
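 *
 * A minimal sketch of the migrated code, assuming a hypothetical `String` state that has no meaningful
 * initial value:
 *
 * ```
 * import kotlinx.coroutines.flow.*
 *
 * // Before: val channel = ConflatedBroadcastChannel<String>()
 * val state = MutableStateFlow<String?>(null) // `null` stands for "no value yet"
 *
 * fun publish(message: String) {
 *     // Before: channel.send(message) or channel.trySend(message)
 *     state.value = message
 * }
 *
 * // Before: subscribers read from channel.openSubscription()
 * val messages: Flow<String> = state.filterNotNull() // skips the `null` placeholder
 * ```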
108  *
109  * ### Concurrency
110  *
111  * All methods of state flow are **thread-safe** and can be safely invoked from concurrent coroutines without
112  * external synchronization.
113  *
114  * ### Operator fusion
115  *
116  * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
117  * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
118  * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
119  *
120  * ### Implementation notes
121  *
122  * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
123  * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when
124  * using unconfined coroutines. Adding new subscribers has `O(1)` amortized cost, but updating a [value] has `O(N)`
125  * cost, where `N` is the number of active subscribers.
126  *
127  * ### Not stable for inheritance
128  *
129  * **The `StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
130  * might be added to this interface in the future, but is stable for use.
131  * Use the `MutableStateFlow(value)` constructor function to create an implementation.
132  */
133 @OptIn(ExperimentalSubclassOptIn::class)
134 @SubclassOptInRequired(ExperimentalForInheritanceCoroutinesApi::class)
135 public interface StateFlow<out T> : SharedFlow<T> {
136     /**
137      * The current value of this state flow.
138      */
139     public val value: T
140 }
141 
142 /**
143  * A mutable [StateFlow] that provides a setter for [value].
144  * An instance of `MutableStateFlow` with the given initial `value` can be created using
145  * `MutableStateFlow(value)` constructor function.
146  *
147  * See the [StateFlow] documentation for details on state flows.
148  * Note that all emission-related operators, such as [value]'s setter, [emit], and [tryEmit], are conflated using [Any.equals].
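 *
 * A minimal sketch showing that the three ways of updating are interchangeable. It relies on the behavior of the
 * implementation created by `MutableStateFlow(value)`, where `tryEmit` and `emit` simply assign [value]:
 *
 * ```
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val state = MutableStateFlow(0)
 *     state.value = 1
 *     check(state.tryEmit(2)) // always succeeds: there is no buffer that could overflow
 *     state.emit(3) // does not wait for subscribers
 *     check(state.value == 3)
 *     check(state.replayCache == listOf(3)) // only the latest value is kept
 * }
 * ```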
149  *
150  * ### Not stable for inheritance
151  *
152  * **The `MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
153  * might be added to this interface in the future, but is stable for use.
154  * Use the `MutableStateFlow()` constructor function to create an implementation.
155  */
156 @OptIn(ExperimentalSubclassOptIn::class)
157 @SubclassOptInRequired(ExperimentalForInheritanceCoroutinesApi::class)
158 public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
159     /**
160      * The current value of this state flow.
161      *
162      * Setting a value that is [equal][Any.equals] to the previous one does nothing.
163      *
164      * This property is **thread-safe** and can be safely updated from concurrent coroutines without
165      * external synchronization.
166      */
167     public override var value: T
168 
169     /**
170      * Atomically compares the current [value] with [expect] and sets it to [update] if it is equal to [expect].
171      * The result is `true` if the [value] was set to [update] and `false` otherwise.
172      *
173      * This function uses a regular comparison based on [Any.equals]. If both [expect] and [update] are equal to the
174      * current [value], this function returns `true`, but it does not actually change the reference that is
175      * stored in the [value].
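     *
     * A minimal sketch of these rules, using an illustrative `String` state:
     *
     * ```
     * import kotlinx.coroutines.flow.*
     *
     * fun main() {
     *     val state = MutableStateFlow("a")
     *     println(state.compareAndSet("a", "b")) // true, the value is now "b"
     *     println(state.compareAndSet("a", "c")) // false, the value stays "b"
     *     println(state.compareAndSet("b", "b")) // true, but nothing changes and collectors are not notified
     * }
     * ```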
176      *
177      * This method is **thread-safe** and can be safely invoked from concurrent coroutines without
178      * external synchronization.
179      */
180     public fun compareAndSet(expect: T, update: T): Boolean
181 }
182 
183 /**
184  * Creates a [MutableStateFlow] with the given initial [value].
185  */
186 @Suppress("FunctionName")
187 public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
188 
189 // ------------------------------------ Update methods ------------------------------------
190 
191 /**
192  * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns the new
193  * value.
194  *
195  * [function] may be evaluated multiple times, if [value] is being concurrently updated.
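 *
 * Because of this possible re-evaluation, [function] is best kept free of side effects. A minimal sketch, in which
 * `log` is a hypothetical function standing in for any side effect:
 *
 * ```
 * import kotlinx.coroutines.flow.*
 *
 * fun main() {
 *     val state = MutableStateFlow(listOf<String>())
 *     // OK: computes a new list from the previous value and nothing else
 *     val updated = state.updateAndGet { it + "item" }
 *     println(updated) // [item]
 *     // Avoid: the side effect below could run more than once under contention
 *     // state.updateAndGet { log(it); it + "item" }
 * }
 * ```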
196  */
197 public inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T {
198     while (true) {
199         val prevValue = value
200         val nextValue = function(prevValue)
201         if (compareAndSet(prevValue, nextValue)) {
202             return nextValue
203         }
204     }
205 }
206 
207 /**
208  * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value, and returns its
209  * prior value.
210  *
211  * [function] may be evaluated multiple times, if [value] is being concurrently updated.
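 *
 * A minimal sketch contrasting the return value with that of [updateAndGet]:
 *
 * ```
 * import kotlinx.coroutines.flow.*
 *
 * fun main() {
 *     val state = MutableStateFlow(1)
 *     val previous = state.getAndUpdate { it * 10 }
 *     println("$previous -> ${state.value}") // 1 -> 10
 *     val current = state.updateAndGet { it * 10 }
 *     println(current) // 100
 * }
 * ```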
212  */
213 public inline fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T {
214     while (true) {
215         val prevValue = value
216         val nextValue = function(prevValue)
217         if (compareAndSet(prevValue, nextValue)) {
218             return prevValue
219         }
220     }
221 }
222 
223 
224 /**
225  * Updates the [MutableStateFlow.value] atomically using the specified [function] of its value.
226  *
227  * [function] may be evaluated multiple times, if [value] is being concurrently updated.
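 *
 * A minimal sketch of concurrent increments; the number of coroutines and iterations are arbitrary
 * illustration values:
 *
 * ```
 * import kotlinx.coroutines.*
 * import kotlinx.coroutines.flow.*
 *
 * fun main() = runBlocking {
 *     val counter = MutableStateFlow(0)
 *     withContext(Dispatchers.Default) { // returns after all launched children complete
 *         repeat(100) {
 *             launch { repeat(1_000) { counter.update { it + 1 } } }
 *         }
 *     }
 *     println(counter.value) // 100000: no increment is lost
 *     // By contrast, `counter.value = counter.value + 1` is a non-atomic read-modify-write
 *     // and could lose updates when executed concurrently
 * }
 * ```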
228  */
229 public inline fun <T> MutableStateFlow<T>.update(function: (T) -> T) {
230     while (true) {
231         val prevValue = value
232         val nextValue = function(prevValue)
233         if (compareAndSet(prevValue, nextValue)) {
234             return
235         }
236     }
237 }
238 
239 // ------------------------------------ Implementation ------------------------------------
240 
241 private val NONE = Symbol("NONE")
242 
243 private val PENDING = Symbol("PENDING")
244 
245 // StateFlow slots are allocated for its collectors
246 private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
247     /**
248      * Each slot can have one of the following states:
249      *
250      * - `null` -- it is not used right now. Can be allocated to a new collector via [allocateLocked].
251      * - `NONE` -- used by a collector, but neither suspended nor has pending value.
252      * - `PENDING` -- pending to process new value.
253      * - `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
254      *
255      * It is important that default `null` value is used, because there can be a race between allocation
256      * of a new slot and trying to do [makePending] on this slot.
257      *
258      * ===
259      * This should be `atomic<Any?>(null)` instead of `WorkaroundAtomicReference`, which is used because of #3820
260      * as a **temporary** solution starting from version 1.8.1.
261      * Depending on the fix rollout on Android, it will be removed in 1.9.0 or 2.0.0.
262      * See https://issuetracker.google.com/issues/325123736
263      */
264     private val _state = WorkaroundAtomicReference<Any?>(null)
265 
266     override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
267         // No need for atomic check & update here, since allocation happens under StateFlow lock
268         if (_state.value != null) return false // not free
269         _state.value = NONE // allocated
270         return true
271     }
272 
273     override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
274         _state.value = null // free now
275         return EMPTY_RESUMES // nothing more to do
276     }
277 
278     @Suppress("UNCHECKED_CAST")
279     fun makePending() {
280         _state.loop { state ->
281             when {
282                 state == null -> return // this slot is free - skip it
283                 state === PENDING -> return // already pending, nothing to do
284                 state === NONE -> { // mark as pending
285                     if (_state.compareAndSet(state, PENDING)) return
286                 }
287                 else -> { // must be a suspend continuation state
288                     // we must still use CAS here since continuation may get cancelled and free the slot at any time
289                     if (_state.compareAndSet(state, NONE)) {
290                         (state as CancellableContinuationImpl<Unit>).resume(Unit)
291                         return
292                     }
293                 }
294             }
295         }
296     }
297 
298     fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
299         assert { state !is CancellableContinuationImpl<*> }
300         return state === PENDING
301     }
302 
303     suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
304         assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
305         if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
306         // CAS failed -- the only possible reason is that it is already in pending state now
307         assert { _state.value === PENDING }
308         cont.resume(Unit)
309     }
310 }
311 
312 @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
313 private class StateFlowImpl<T>(
314     initialState: Any // T | NULL
315 ) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
316     private val _state = atomic(initialState) // T | NULL
317     private var sequence = 0 // serializes updates, value update is in process when sequence is odd
318 
319     public override var value: T
320         get() = NULL.unbox(_state.value)
321         set(value) { updateState(null, value ?: NULL) }
322 
323     override fun compareAndSet(expect: T, update: T): Boolean =
324         updateState(expect ?: NULL, update ?: NULL)
325 
326     private fun updateState(expectedState: Any?, newState: Any): Boolean {
327         var curSequence: Int
328         var curSlots: Array<StateFlowSlot?>? // benign race, we will not use it
329         synchronized(this) {
330             val oldState = _state.value
331             if (expectedState != null && oldState != expectedState) return false // CAS support
332             if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
333             _state.value = newState
334             curSequence = sequence
335             if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
336                 curSequence++ // make it odd
337                 sequence = curSequence
338             } else {
339                 // update is already in process, notify it, and return
340                 sequence = curSequence + 2 // change sequence to notify, keep it odd
341                 return true // updated
342             }
343             curSlots = slots // read current reference to collectors under lock
344         }
345         /*
346            Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
347            Loop until we're done firing all the changes. This is a sort of simple flat combining that
348            ensures sequential firing of concurrent updates and avoids the storm of collector resumes
349            when updates happen concurrently from many threads.
350          */
351         while (true) {
352             // Benign race on element read from array
353             curSlots?.forEach {
354                 it?.makePending()
355             }
356             // check if the value was updated again while we were updating the old one
357             synchronized(this) {
358                 if (sequence == curSequence) { // nothing changed, we are done
359                     sequence = curSequence + 1 // make sequence even again
360                     return true // done, updated
361                 }
362                 // reread everything for the next loop under the lock
363                 curSequence = sequence
364                 curSlots = slots
365             }
366         }
367     }
368 
369     override val replayCache: List<T>
370         get() = listOf(value)
371 
372     override fun tryEmit(value: T): Boolean {
373         this.value = value
374         return true
375     }
376 
377     override suspend fun emit(value: T) {
378         this.value = value
379     }
380 
381     @Suppress("UNCHECKED_CAST")
382     override fun resetReplayCache() {
383         throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
384     }
385 
386     override suspend fun collect(collector: FlowCollector<T>): Nothing {
387         val slot = allocateSlot()
388         try {
389             if (collector is SubscribedFlowCollector) collector.onSubscription()
390             val collectorJob = currentCoroutineContext()[Job]
391             var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
392             // The loop is arranged so that it starts delivering current value without waiting first
393             while (true) {
394                 // Here the coroutine could have waited for a while to be dispatched,
395                 // so we use the most recent state here to ensure the best possible conflation of stale values
396                 val newState = _state.value
397                 // always check for cancellation
398                 collectorJob?.ensureActive()
399                 // Conflate value emissions using equality
400                 if (oldState == null || oldState != newState) {
401                     collector.emit(NULL.unbox(newState))
402                     oldState = newState
403                 }
404                 // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
405                 if (!slot.takePending()) { // try fast-path without suspending first
406                     slot.awaitPending() // only suspend for new values when needed
407                 }
408             }
409         } finally {
410             freeSlot(slot)
411         }
412     }
413 
414     override fun createSlot() = StateFlowSlot()
415     override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
416 
417     override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
418         fuseStateFlow(context, capacity, onBufferOverflow)
419 }
420 
421 internal fun <T> StateFlow<T>.fuseStateFlow(
422     context: CoroutineContext,
423     capacity: Int,
424     onBufferOverflow: BufferOverflow
425 ): Flow<T> {
426     // state flow is always conflated so additional conflation does not have any effect
427     assert { capacity != Channel.CONFLATED } // should be desugared by callers
428     if ((capacity in 0..1 || capacity == Channel.BUFFERED) && onBufferOverflow == BufferOverflow.DROP_OLDEST) {
429         return this
430     }
431     return fuseSharedFlow(context, capacity, onBufferOverflow)
432 }
433