1 /*
<lambda>null2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:JvmName("SnapshotStateKt")
18 @file:JvmMultifileClass
19 
20 package androidx.compose.runtime
21 
22 import androidx.collection.MutableScatterSet
23 import androidx.compose.runtime.collection.fastAny
24 import androidx.compose.runtime.snapshots.ReaderKind
25 import androidx.compose.runtime.snapshots.Snapshot
26 import androidx.compose.runtime.snapshots.StateObjectImpl
27 import kotlin.coroutines.CoroutineContext
28 import kotlin.coroutines.EmptyCoroutineContext
29 import kotlin.jvm.JvmMultifileClass
30 import kotlin.jvm.JvmName
31 import kotlinx.coroutines.channels.Channel
32 import kotlinx.coroutines.flow.Flow
33 import kotlinx.coroutines.flow.StateFlow
34 import kotlinx.coroutines.flow.flow
35 import kotlinx.coroutines.withContext
36 
37 /**
38  * Collects values from this [StateFlow] and represents its latest value via [State]. The
39  * [StateFlow.value] is used as an initial value. Every time there would be new value posted into
40  * the [StateFlow] the returned [State] will be updated causing recomposition of every [State.value]
41  * usage.
42  *
43  * @sample androidx.compose.runtime.samples.StateFlowSample
44  * @param context [CoroutineContext] to use for collecting.
45  */
46 @Suppress("StateFlowValueCalledInComposition")
47 @Composable
48 fun <T> StateFlow<T>.collectAsState(context: CoroutineContext = EmptyCoroutineContext): State<T> =
49     collectAsState(value, context)
50 
51 /**
52  * Collects values from this [Flow] and represents its latest value via [State]. Every time there
53  * would be new value posted into the [Flow] the returned [State] will be updated causing
54  * recomposition of every [State.value] usage.
55  *
56  * @sample androidx.compose.runtime.samples.FlowWithInitialSample
57  * @param initial the value of the state will have until the first flow value is emitted.
58  * @param context [CoroutineContext] to use for collecting.
59  */
60 @Composable
61 fun <T : R, R> Flow<T>.collectAsState(
62     initial: R,
63     context: CoroutineContext = EmptyCoroutineContext
64 ): State<R> =
65     produceState(initial, this, context) {
66         if (context == EmptyCoroutineContext) {
67             collect { value = it }
68         } else withContext(context) { collect { value = it } }
69     }
70 
71 /**
72  * Create a [Flow] from observable [Snapshot] state. (e.g. state holders returned by
73  * [mutableStateOf][androidx.compose.runtime.mutableStateOf].)
74  *
75  * [snapshotFlow] creates a [Flow] that runs [block] when collected and emits the result, recording
76  * any snapshot state that was accessed. While collection continues, if a new [Snapshot] is applied
77  * that changes state accessed by [block], the flow will run [block] again, re-recording the
78  * snapshot state that was accessed. If the result of [block] is not [equal to][Any.equals] the
79  * previous result, the flow will emit that new result. (This behavior is similar to that of
80  * [Flow.distinctUntilChanged][kotlinx.coroutines.flow.distinctUntilChanged].) Collection will
81  * continue indefinitely unless it is explicitly cancelled or limited by the use of other [Flow]
82  * operators.
83  *
84  * @sample androidx.compose.runtime.samples.snapshotFlowSample
85  *
86  * [block] is run in a **read-only** [Snapshot] and may not modify snapshot data. If [block]
87  * attempts to modify snapshot data, flow collection will fail with [IllegalStateException].
88  *
89  * [block] may run more than once for equal sets of inputs or only once after many rapid snapshot
90  * changes; it should be idempotent and free of side effects.
91  *
92  * When working with [Snapshot] state it is useful to keep the distinction between **events** and
93  * **state** in mind. [snapshotFlow] models snapshot changes as events, but events **cannot** be
94  * effectively modeled as observable state. Observable state is a lossy compression of the events
95  * that produced that state.
96  *
97  * An observable **event** happens at a point in time and is discarded. All registered observers at
98  * the time the event occurred are notified. All individual events in a stream are assumed to be
99  * relevant and may build on one another; repeated equal events have meaning and therefore a
100  * registered observer must observe all events without skipping.
101  *
102  * Observable **state** raises change events when the state changes from one value to a new, unequal
103  * value. State change events are **conflated;** only the most recent state matters. Observers of
104  * state changes must therefore be **idempotent;** given the same state value the observer should
105  * produce the same result. It is valid for a state observer to both skip intermediate states as
106  * well as run multiple times for the same state and the result should be the same.
107  */
<lambda>null108 fun <T> snapshotFlow(block: () -> T): Flow<T> = flow {
109     // Objects read the last time block was run
110     val readSet = MutableScatterSet<Any>()
111     val readObserver: (Any) -> Unit = {
112         if (it is StateObjectImpl) {
113             it.recordReadIn(ReaderKind.SnapshotFlow)
114         }
115         readSet.add(it)
116     }
117 
118     // This channel may not block or lose data on a trySend call.
119     val appliedChanges = Channel<Set<Any>>(Channel.UNLIMITED)
120 
121     // Register the apply observer before running for the first time
122     // so that we don't miss updates.
123     val unregisterApplyObserver =
124         Snapshot.registerApplyObserver { changed, _ ->
125             val maybeObserved =
126                 changed.fastAny { it !is StateObjectImpl || it.isReadIn(ReaderKind.SnapshotFlow) }
127 
128             if (maybeObserved) {
129                 appliedChanges.trySend(changed)
130             }
131         }
132 
133     try {
134         var lastValue =
135             Snapshot.takeSnapshot(readObserver).run {
136                 try {
137                     enter(block)
138                 } finally {
139                     dispose()
140                 }
141             }
142         emit(lastValue)
143 
144         while (true) {
145             var found = false
146             var changedObjects = appliedChanges.receive()
147 
148             // Poll for any other changes before running block to minimize the number of
149             // additional times it runs for the same data
150             while (true) {
151                 // Assumption: readSet will typically be smaller than changed set
152                 found = found || readSet.intersects(changedObjects)
153                 changedObjects = appliedChanges.tryReceive().getOrNull() ?: break
154             }
155 
156             if (found) {
157                 readSet.clear()
158                 val newValue =
159                     Snapshot.takeSnapshot(readObserver).run {
160                         try {
161                             enter(block)
162                         } finally {
163                             dispose()
164                         }
165                     }
166 
167                 if (newValue != lastValue) {
168                     lastValue = newValue
169                     emit(newValue)
170                 }
171             }
172         }
173     } finally {
174         unregisterApplyObserver.dispose()
175     }
176 }
177 
<lambda>null178 private fun MutableScatterSet<Any>.intersects(set: Set<Any>) = any { it in set }
179