/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17 @file:JvmName("SnapshotStateKt")
18 @file:JvmMultifileClass
19
20 package androidx.compose.runtime
21
22 import androidx.collection.MutableScatterSet
23 import androidx.compose.runtime.collection.fastAny
24 import androidx.compose.runtime.snapshots.ReaderKind
25 import androidx.compose.runtime.snapshots.Snapshot
26 import androidx.compose.runtime.snapshots.StateObjectImpl
27 import kotlin.coroutines.CoroutineContext
28 import kotlin.coroutines.EmptyCoroutineContext
29 import kotlin.jvm.JvmMultifileClass
30 import kotlin.jvm.JvmName
31 import kotlinx.coroutines.channels.Channel
32 import kotlinx.coroutines.flow.Flow
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.flow
35 import kotlinx.coroutines.withContext
36
/**
 * Observes this [StateFlow] and exposes its most recent value as a [State]. The current
 * [StateFlow.value] seeds the returned [State]; every value subsequently posted into the
 * [StateFlow] updates that [State], causing recomposition of every [State.value] usage.
 *
 * @sample androidx.compose.runtime.samples.StateFlowSample
 * @param context [CoroutineContext] to use for collecting.
 */
@Suppress("StateFlowValueCalledInComposition")
@Composable
fun <T> StateFlow<T>.collectAsState(context: CoroutineContext = EmptyCoroutineContext): State<T> {
    // Delegate to the general Flow overload, seeding it with the flow's current value.
    return collectAsState(initial = value, context = context)
}
50
/**
 * Observes this [Flow] and exposes its most recent value as a [State]. Each value emitted by
 * the [Flow] updates the returned [State], causing recomposition of every [State.value] usage.
 *
 * @sample androidx.compose.runtime.samples.FlowWithInitialSample
 * @param initial the value the state holds until the flow emits its first value.
 * @param context [CoroutineContext] to use for collecting.
 */
@Composable
fun <T : R, R> Flow<T>.collectAsState(
    initial: R,
    context: CoroutineContext = EmptyCoroutineContext
): State<R> {
    val upstream = this
    return produceState(initial, upstream, context) {
        if (context != EmptyCoroutineContext) {
            // A context was supplied: run the collection inside it.
            withContext(context) { upstream.collect { latest -> value = latest } }
        } else {
            // No extra context requested: collect directly in the producer coroutine.
            upstream.collect { latest -> value = latest }
        }
    }
}
70
/**
 * Builds a [Flow] out of observable [Snapshot] state, such as the state holders returned by
 * [mutableStateOf][androidx.compose.runtime.mutableStateOf].
 *
 * When collected, the flow runs [block], emits its result, and records which snapshot state
 * objects [block] read. For as long as collection continues, every applied [Snapshot] that
 * changes one of those recorded objects causes [block] to run again (re-recording its reads).
 * The new result is emitted only if it is not [equal to][Any.equals] the previously emitted
 * result, much like
 * [Flow.distinctUntilChanged][kotlinx.coroutines.flow.distinctUntilChanged]. Collection never
 * completes on its own; it ends only when cancelled or when bounded by other [Flow] operators.
 *
 * @sample androidx.compose.runtime.samples.snapshotFlowSample
 *
 * [block] executes inside a **read-only** [Snapshot] and must not write snapshot data; an
 * attempted write fails flow collection with [IllegalStateException].
 *
 * [block] may be invoked several times for equal inputs, or just once after a burst of rapid
 * snapshot changes, so it should be idempotent and free of side effects.
 *
 * Keep the difference between **events** and **state** in mind when working with [Snapshot]
 * state. [snapshotFlow] turns snapshot changes into events, but the reverse is not possible:
 * events **cannot** be modeled effectively as observable state, because observable state is a
 * lossy compression of the events that produced it.
 *
 * An observable **event** occurs at a point in time and is then discarded. Every observer
 * registered when it occurs is notified. Each event in a stream is assumed to matter and may
 * build on earlier ones; repeated equal events are meaningful, so an observer must see all of
 * them without skipping.
 *
 * Observable **state** raises a change event when it moves from one value to a new, unequal
 * value. Those change events are **conflated;** only the latest state matters. State observers
 * must therefore be **idempotent;** the same state value should always yield the same result.
 * A state observer may legitimately skip intermediate states and may run more than once for the
 * same state, and the outcome should not differ.
 */
fun <T> snapshotFlow(block: () -> T): Flow<T> = flow {
    // State objects that were read during the most recent run of block.
    val observedReads = MutableScatterSet<Any>()
    val recordRead: (Any) -> Unit = { stateObject ->
        if (stateObject is StateObjectImpl) {
            stateObject.recordReadIn(ReaderKind.SnapshotFlow)
        }
        observedReads.add(stateObject)
    }

    // Runs block inside a freshly taken snapshot that reports reads to recordRead, always
    // disposing the snapshot afterwards.
    val evaluate: () -> T = {
        val snapshot = Snapshot.takeSnapshot(recordRead)
        try {
            snapshot.enter(block)
        } finally {
            snapshot.dispose()
        }
    }

    // Unlimited capacity: a trySend on this channel must neither block nor drop data.
    val pendingChanges = Channel<Set<Any>>(Channel.UNLIMITED)

    // The apply observer is installed before block runs for the first time so that no update
    // can slip by unnoticed.
    val applyObserverHandle =
        Snapshot.registerApplyObserver { changedObjects, _ ->
            val possiblyObserved =
                changedObjects.fastAny { candidate ->
                    candidate !is StateObjectImpl || candidate.isReadIn(ReaderKind.SnapshotFlow)
                }

            if (possiblyObserved) {
                pendingChanges.trySend(changedObjects)
            }
        }

    try {
        var previous = evaluate()
        emit(previous)

        while (true) {
            // Suspend until at least one change set arrives, then drain everything else that
            // is already queued so block is re-run as few times as possible for the same data.
            var batch: Set<Any>? = pendingChanges.receive()
            var relevant = false
            while (batch != null) {
                // Assumption: the read set is typically smaller than the changed set.
                if (!relevant && observedReads.intersects(batch)) {
                    relevant = true
                }
                batch = pendingChanges.tryReceive().getOrNull()
            }

            // None of the drained changes touched anything block read; keep waiting.
            if (!relevant) continue

            observedReads.clear()
            val current = evaluate()
            if (current != previous) {
                previous = current
                emit(current)
            }
        }
    } finally {
        applyObserverHandle.dispose()
    }
}
177
/** Returns `true` when at least one element of this set is also contained in [set]. */
private fun MutableScatterSet<Any>.intersects(set: Set<Any>): Boolean {
    return any { element -> set.contains(element) }
}
179