• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlinx.coroutines.channels.*
10 import kotlinx.coroutines.flow.internal.*
11 import kotlinx.coroutines.internal.*
12 import kotlin.coroutines.*
13 import kotlin.native.concurrent.*
14 
15 /**
16  * A [SharedFlow] that represents a read-only state with a single updatable data [value] that emits updates
17  * to the value to its collectors. A state flow is a _hot_ flow because its active instance exists independently
18  * of the presence of collectors. Its current value can be retrieved via the [value] property.
19  *
20  * **State flow never completes**. A call to [Flow.collect] on a state flow never completes normally, and
21  * neither does a coroutine started by the [Flow.launchIn] function. An active collector of a state flow is called a _subscriber_.
22  *
23  * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with
24  * the initial value. The value of mutable state flow can be  updated by setting its [value] property.
25  * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates,
26  * but always collects the most recently emitted value.
27  *
28  * [StateFlow] is useful as a data-model class to represent any kind of state.
29  * Derived values can be defined using various operators on the flows, with [combine] operator being especially
30  * useful to combine values from multiple state flows using arbitrary functions.
31  *
32  * For example, the following class encapsulates an integer state and increments its value on each call to `inc`:
33  *
34  * ```
35  * class CounterModel {
36  *     private val _counter = MutableStateFlow(0) // private mutable state flow
37  *     val counter = _counter.asStateFlow() // publicly exposed as read-only state flow
38  *
39  *     fun inc() {
40  *         _counter.value++
41  *     }
42  * }
43  * ```
44  *
45  * Having two instances of the above `CounterModel` class one can define the sum of their counters like this:
46  *
47  * ```
48  * val aModel = CounterModel()
49  * val bModel = CounterModel()
50  * val sumFlow: Flow<Int> = aModel.counter.combine(bModel.counter) { a, b -> a + b }
51  * ```
52  *
53  * As an alternative to the above usage with the `MutableStateFlow(...)` constructor function,
54  * any _cold_ [Flow] can be converted to a state flow using the [stateIn] operator.
55  *
56  * ### Strong equality-based conflation
57  *
58  * Values in state flow are conflated using [Any.equals] comparison in a similar way to
59  * [distinctUntilChanged] operator. It is used to conflate incoming updates
60  * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors
61  * when new value is equal to the previously emitted one. State flow behavior with classes that violate
62  * the contract for [Any.equals] is unspecified.
63  *
64  * ### State flow is a shared flow
65  *
66  * State flow is a special-purpose, high-performance, and efficient implementation of [SharedFlow] for the narrow,
67  * but widely used case of sharing a state. See the [SharedFlow] documentation for the basic rules,
68  * constraints, and operators that are applicable to all shared flows.
69  *
70  * State flow always has an initial value, replays one most recent value to new subscribers, does not buffer any
71  * more values, but keeps the last emitted one, and does not support [resetReplayCache][MutableSharedFlow.resetReplayCache].
72  * A state flow behaves identically to a shared flow when it is created
73  * with the following parameters and the [distinctUntilChanged] operator is applied to it:
74  *
75  * ```
76  * // MutableStateFlow(initialValue) is a shared flow with the following parameters:
77  * val shared = MutableSharedFlow(
78  *     replay = 1,
79  *     onBufferOverflow = BufferOverflow.DROP_OLDEST
80  * )
81  * shared.tryEmit(initialValue) // emit the initial value
82  * val state = shared.distinctUntilChanged() // get StateFlow-like behavior
83  * ```
84  *
85  * Use [SharedFlow] when you need a [StateFlow] with tweaks in its behavior such as extra buffering, replaying more
86  * values, or omitting the initial value.
87  *
88  * ### StateFlow vs ConflatedBroadcastChannel
89  *
90  * Conceptually, state flow is similar to [ConflatedBroadcastChannel]
91  * and is designed to completely replace `ConflatedBroadcastChannel` in the future.
92  * It has the following important differences:
93  *
94  * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows
95  *   for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that
96  *   allocates objects on each emitted value.
97  * * `StateFlow` always has a value which can be safely read at any time via [value] property.
98  *    Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value.
99  * * `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow].
100  * * `StateFlow` conflation is based on equality like [distinctUntilChanged] operator,
101  *    unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity.
102  * * `StateFlow` cannot be closed like `ConflatedBroadcastChannel` and can never represent a failure.
103  *    All errors and completion signals should be explicitly _materialized_ if needed.
104  *
105  * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking
106  * more pragmatic design choices for the sake of convenience.
107  *
108  * To migrate [ConflatedBroadcastChannel] usage to [StateFlow], start by replacing usages of the `ConflatedBroadcastChannel()`
109  * constructor with `MutableStateFlow(initialValue)`, using `null` as an initial value if you don't have one.
110  * Replace [send][ConflatedBroadcastChannel.send] and [offer][ConflatedBroadcastChannel.offer] calls
111  * with updates to the state flow's [MutableStateFlow.value], and convert subscribers' code to flow operators.
112  * You can use the [filterNotNull] operator to mimic behavior of a `ConflatedBroadcastChannel` without initial value.
113  *
114  * ### Concurrency
115  *
116  * All methods of state flow are **thread-safe** and can be safely invoked from concurrent coroutines without
117  * external synchronization.
118  *
119  * ### Operator fusion
120  *
121  * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate],
122  * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity,
123  * [distinctUntilChanged][Flow.distinctUntilChanged], or [cancellable] operators to a state flow has no effect.
124  *
125  * ### Implementation notes
126  *
127  * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure
128  * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when
129  * using unconfined coroutines. Adding new subscribers has `O(1)` amortized cost, but updating a [value] has `O(N)`
130  * cost, where `N` is the number of active subscribers.
131  *
132  * ### Not stable for inheritance
133  *
134  * **`The StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
135  * might be added to this interface in the future, but is stable for use.
136  * Use the `MutableStateFlow(value)` constructor function to create an implementation.
137  */
138 public interface StateFlow<out T> : SharedFlow<T> {
139     /**
140      * The current value of this state flow.
141      */
142     public val value: T
143 }
144 
145 /**
146  * A mutable [StateFlow] that provides a setter for [value].
147  * An instance of `MutableStateFlow` with the given initial `value` can be created using
148  * `MutableStateFlow(value)` constructor function.
149  *
150  * See the [StateFlow] documentation for details on state flows.
151  *
152  * ### Not stable for inheritance
153  *
154  * **The `MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods
155  * might be added to this interface in the future, but is stable for use.
156  * Use the `MutableStateFlow()` constructor function to create an implementation.
157  */
158 public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
159     /**
160      * The current value of this state flow.
161      *
162      * Setting a value that is [equal][Any.equals] to the previous one does nothing.
163      */
164     public override var value: T
165 
166     /**
167      * Atomically compares the current [value] with [expect] and sets it to [update] if it is equal to [expect].
168      * The result is `true` if the [value] was set to [update] and `false` otherwise.
169      *
170      * This function use a regular comparison using [Any.equals]. If both [expect] and [update] are equal to the
171      * current [value], this function returns `true`, but it does not actually change the reference that is
172      * stored in the [value].
173      */
compareAndSetnull174     public fun compareAndSet(expect: T, update: T): Boolean
175 }
176 
177 /**
178  * Creates a [MutableStateFlow] with the given initial [value].
179  */
180 @Suppress("FunctionName")
181 public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = StateFlowImpl(value ?: NULL)
182 
183 // ------------------------------------ Implementation ------------------------------------
184 
185 @SharedImmutable
186 private val NONE = Symbol("NONE")
187 
188 @SharedImmutable
189 private val PENDING = Symbol("PENDING")
190 
191 // StateFlow slots are allocated for its collectors
192 private class StateFlowSlot : AbstractSharedFlowSlot<StateFlowImpl<*>>() {
193     /**
194      * Each slot can have one of the following states:
195      *
196      * * `null` -- it is not used right now. Can [allocateLocked] to new collector.
197      * * `NONE` -- used by a collector, but neither suspended nor has pending value.
198      * * `PENDING` -- pending to process new value.
199      * * `CancellableContinuationImpl<Unit>` -- suspended waiting for new value.
200      *
201      * It is important that default `null` value is used, because there can be a race between allocation
202      * of a new slot and trying to do [makePending] on this slot.
203      */
204     private val _state = atomic<Any?>(null)
205 
206     override fun allocateLocked(flow: StateFlowImpl<*>): Boolean {
207         // No need for atomic check & update here, since allocated happens under StateFlow lock
208         if (_state.value != null) return false // not free
209         _state.value = NONE // allocated
210         return true
211     }
212 
213     override fun freeLocked(flow: StateFlowImpl<*>): Array<Continuation<Unit>?> {
214         _state.value = null // free now
215         return EMPTY_RESUMES // nothing more to do
216     }
217 
218     @Suppress("UNCHECKED_CAST")
219     fun makePending() {
220         _state.loop { state ->
221             when {
222                 state == null -> return // this slot is free - skip it
223                 state === PENDING -> return // already pending, nothing to do
224                 state === NONE -> { // mark as pending
225                     if (_state.compareAndSet(state, PENDING)) return
226                 }
227                 else -> { // must be a suspend continuation state
228                     // we must still use CAS here since continuation may get cancelled and free the slot at any time
229                     if (_state.compareAndSet(state, NONE)) {
230                         (state as CancellableContinuationImpl<Unit>).resume(Unit)
231                         return
232                     }
233                 }
234             }
235         }
236     }
237 
238     fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state ->
239         assert { state !is CancellableContinuationImpl<*> }
240         return state === PENDING
241     }
242 
243     @Suppress("UNCHECKED_CAST")
244     suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont ->
245         assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING
246         if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending
247         // CAS failed -- the only possible reason is that it is already in pending state now
248         assert { _state.value === PENDING }
249         cont.resume(Unit)
250     }
251 }
252 
253 private class StateFlowImpl<T>(
254     initialState: Any // T | NULL
255 ) : AbstractSharedFlow<StateFlowSlot>(), MutableStateFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
256     private val _state = atomic(initialState) // T | NULL
257     private var sequence = 0 // serializes updates, value update is in process when sequence is odd
258 
259     @Suppress("UNCHECKED_CAST")
260     public override var value: T
261         get() = NULL.unbox(_state.value)
262         set(value) { updateState(null, value ?: NULL) }
263 
compareAndSetnull264     override fun compareAndSet(expect: T, update: T): Boolean =
265         updateState(expect ?: NULL, update ?: NULL)
266 
267     private fun updateState(expectedState: Any?, newState: Any): Boolean {
268         var curSequence = 0
269         var curSlots: Array<StateFlowSlot?>? = this.slots // benign race, we will not use it
270         synchronized(this) {
271             val oldState = _state.value
272             if (expectedState != null && oldState != expectedState) return false // CAS support
273             if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
274             _state.value = newState
275             curSequence = sequence
276             if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
277                 curSequence++ // make it odd
278                 sequence = curSequence
279             } else {
280                 // update is already in process, notify it, and return
281                 sequence = curSequence + 2 // change sequence to notify, keep it odd
282                 return true // updated
283             }
284             curSlots = slots // read current reference to collectors under lock
285         }
286         /*
287            Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines.
288            Loop until we're done firing all the changes. This is a sort of simple flat combining that
289            ensures sequential firing of concurrent updates and avoids the storm of collector resumes
290            when updates happen concurrently from many threads.
291          */
292         while (true) {
293             // Benign race on element read from array
294             curSlots?.forEach {
295                 it?.makePending()
296             }
297             // check if the value was updated again while we were updating the old one
298             synchronized(this) {
299                 if (sequence == curSequence) { // nothing changed, we are done
300                     sequence = curSequence + 1 // make sequence even again
301                     return true // done, updated
302                 }
303                 // reread everything for the next loop under the lock
304                 curSequence = sequence
305                 curSlots = slots
306             }
307         }
308     }
309 
310     override val replayCache: List<T>
311         get() = listOf(value)
312 
tryEmitnull313     override fun tryEmit(value: T): Boolean {
314         this.value = value
315         return true
316     }
317 
emitnull318     override suspend fun emit(value: T) {
319         this.value = value
320     }
321 
322     @Suppress("UNCHECKED_CAST")
resetReplayCachenull323     override fun resetReplayCache() {
324         throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
325     }
326 
collectnull327     override suspend fun collect(collector: FlowCollector<T>) {
328         val slot = allocateSlot()
329         try {
330             if (collector is SubscribedFlowCollector) collector.onSubscription()
331             val collectorJob = currentCoroutineContext()[Job]
332             var oldState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
333             // The loop is arranged so that it starts delivering current value without waiting first
334             while (true) {
335                 // Here the coroutine could have waited for a while to be dispatched,
336                 // so we use the most recent state here to ensure the best possible conflation of stale values
337                 val newState = _state.value
338                 // always check for cancellation
339                 collectorJob?.ensureActive()
340                 // Conflate value emissions using equality
341                 if (oldState == null || oldState != newState) {
342                     collector.emit(NULL.unbox(newState))
343                     oldState = newState
344                 }
345                 // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
346                 if (!slot.takePending()) { // try fast-path without suspending first
347                     slot.awaitPending() // only suspend for new values when needed
348                 }
349             }
350         } finally {
351             freeSlot(slot)
352         }
353     }
354 
createSlotnull355     override fun createSlot() = StateFlowSlot()
356     override fun createSlotArray(size: Int): Array<StateFlowSlot?> = arrayOfNulls(size)
357 
358     override fun fuse(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow) =
359         fuseStateFlow(context, capacity, onBufferOverflow)
360 }
361 
362 internal fun MutableStateFlow<Int>.increment(delta: Int) {
363     while (true) { // CAS loop
364         val current = value
365         if (compareAndSet(current, current + delta)) return
366     }
367 }
368 
fuseStateFlownull369 internal fun <T> StateFlow<T>.fuseStateFlow(
370     context: CoroutineContext,
371     capacity: Int,
372     onBufferOverflow: BufferOverflow
373 ): Flow<T> {
374     // state flow is always conflated so additional conflation does not have any effect
375     assert { capacity != Channel.CONFLATED } // should be desugared by callers
376     if ((capacity in 0..1 || capacity == Channel.BUFFERED) && onBufferOverflow == BufferOverflow.DROP_OLDEST) {
377         return this
378     }
379     return fuseSharedFlow(context, capacity, onBufferOverflow)
380 }
381