1 /*
<lambda>null2 * Copyright 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 @file:RestrictTo(RestrictTo.Scope.LIBRARY)
18
19 package androidx.paging
20
21 import androidx.annotation.RestrictTo
22 import androidx.paging.CombineSource.INITIAL
23 import androidx.paging.CombineSource.OTHER
24 import androidx.paging.CombineSource.RECEIVER
25 import androidx.paging.internal.AtomicInt
26 import kotlinx.coroutines.CompletableDeferred
27 import kotlinx.coroutines.Job
28 import kotlinx.coroutines.channels.SendChannel
29 import kotlinx.coroutines.flow.Flow
30 import kotlinx.coroutines.flow.FlowCollector
31 import kotlinx.coroutines.flow.collectLatest
32 import kotlinx.coroutines.flow.emitAll
33 import kotlinx.coroutines.flow.flow
34 import kotlinx.coroutines.launch
35 import kotlinx.coroutines.sync.Mutex
36 import kotlinx.coroutines.sync.withLock
37 import kotlinx.coroutines.yield
38
39 /**
 * This file includes custom flow operators that we implement to avoid using experimental APIs from
 * coroutines. Eventually, this file should be deleted once those APIs become stable.
42 */
// Sentinel meaning "no value has been stored yet". It is always compared by identity (===), so a
// real `null` emitted by a Flow is never confused with the absence of a value.
private val NULL = Any()
44
/**
 * Stand-in for the `scan` operator on [Flow] that does not rely on experimental coroutine APIs.
 *
 * The returned flow emits [initial] first. Then, for each value collected from the receiver, it
 * applies [operation] to the most recently emitted accumulator and that value, and emits the
 * result.
 *
 * @param initial Seed accumulator; emitted before the receiver is collected.
 * @param operation Folds the current accumulator and the next upstream value into a new
 *   accumulator.
 */
internal fun <T, R> Flow<T>.simpleScan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R> {
    val upstream = this
    return flow {
        var current = initial
        emit(current)
        upstream.collect { next ->
            current = operation(current, next)
            emit(current)
        }
    }
}
57
/**
 * Stand-in for the `runningReduce` operator on [Flow] that does not rely on experimental
 * coroutine APIs.
 *
 * The first upstream value is emitted unchanged. Every later value is combined with the previously
 * emitted result through [operation], and the combined result is emitted.
 *
 * @param operation Folds the previously emitted result and the next upstream value.
 */
internal fun <T> Flow<T>.simpleRunningReduce(
    operation: suspend (accumulator: T, value: T) -> T
): Flow<T> {
    val upstream = this
    return flow {
        // Holds the NULL sentinel until the first upstream value arrives; afterwards always a T.
        var running: Any? = NULL
        upstream.collect { next ->
            @Suppress("UNCHECKED_CAST")
            val updated: T = if (running === NULL) next else operation(running as T, next)
            running = updated
            emit(updated)
        }
    }
}
73
/**
 * A `transformLatest`-like operator built on a channel-backed flow.
 *
 * The receiver is collected with [collectLatest], and [transform] is invoked for each value with a
 * [FlowCollector] whose emissions are sent to the channel of the resulting flow.
 *
 * @param transform Invoked for each upstream value; may emit any number of results.
 */
internal fun <T, R> Flow<T>.simpleTransformLatest(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    val upstream = this
    return simpleChannelFlow {
        // Adapts the channel scope so that `transform` can call emit() on it.
        val downstream = ChannelFlowCollector(this@simpleChannelFlow)
        upstream.collectLatest { item -> downstream.transform(item) }
    }
}
82
/**
 * `flatMapLatest` without experimental APIs: maps each upstream value to a [Flow] via [transform]
 * and re-emits that flow's values, delegating the "latest" handling to [simpleTransformLatest].
 */
internal inline fun <T, R> Flow<T>.simpleFlatMapLatest(
    crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = simpleTransformLatest { upstreamValue ->
    val inner = transform(upstreamValue)
    emitAll(inner)
}
87
/**
 * `mapLatest` without experimental APIs: maps each upstream value through [transform] and emits
 * the result, delegating the "latest" handling to [simpleTransformLatest].
 */
internal inline fun <T, R> Flow<T>.simpleMapLatest(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = simpleTransformLatest { upstreamValue ->
    val mapped = transform(upstreamValue)
    emit(mapped)
}
92
/**
 * A [FlowCollector] that forwards every emitted value to [channel] via [SendChannel.send].
 *
 * @property channel The channel that receives each emitted value.
 */
internal class ChannelFlowCollector<T>(val channel: SendChannel<T>) : FlowCollector<T> {
    override suspend fun emit(value: T) = channel.send(value)
}
98
/**
 * Like [kotlinx.coroutines.flow.combine], but reads from the two Flows are never batched:
 * after the initial call (which waits until both Flows have emitted once), [transform] is
 * guaranteed to be invoked for every emission from either Flow.
 *
 * Emissions from both Flows are buffered, so when one Flow emits several times before the other
 * emits again, [transform] runs once per emission rather than once with only the latest values.
 *
 * @param otherFlow The Flow to combine with the receiver.
 * @param transform The transform to apply on each update. It is first called once both Flows have
 *   produced an initial emission, and afterwards for every emission from either Flow.
 *
 *   A [CombineSource] is also passed to the transform to indicate what triggered the update:
 *     * [INITIAL]: Initial emission from both Flows
 *     * [RECEIVER]: Triggered by new emission from receiver
 *     * [OTHER]: Triggered by new emission from [otherFlow]
 */
internal suspend inline fun <T1, T2, R> Flow<T1>.combineWithoutBatching(
    otherFlow: Flow<T2>,
    crossinline transform: suspend (T1, T2, updateFrom: CombineSource) -> R,
): Flow<R> = simpleChannelFlow {
    val combiner =
        UnbatchedFlowCombiner<T1, T2> { receiverValue, otherValue, origin ->
            send(transform(receiverValue, otherValue, origin))
        }
    // Number of source Flows whose collection has not ended yet.
    val activeCollectors = AtomicInt(2)
    val collectorsJob = Job()

    // Index 0 is the receiver and index 1 is otherFlow; the combiner maps these indices to
    // RECEIVER and OTHER respectively.
    val sources = listOf(this@combineWithoutBatching, otherFlow)
    sources.forEachIndexed { sourceIndex, source ->
        launch(collectorsJob) {
            try {
                source.collect { emitted ->
                    combiner.onNext(sourceIndex, emitted)

                    // Yield after each value so the other collector gets a chance to emit.
                    yield()
                }
            } finally {
                // Runs however collection ends; the last collector to finish closes the channel.
                val stillActive = activeCollectors.decrementAndGet()
                if (stillActive == 0) {
                    close()
                }
            }
        }
    }

    awaitClose { collectorsJob.cancel() }
}
149
/**
 * Helper class for [combineWithoutBatching], which handles dispatching the combined values in the
 * correct order, and with [CombineSource].
 *
 * The latest value of each of the two Flows is kept in a two-slot array. [send] is invoked only
 * once both slots hold a value, and from then on for every call to [onNext].
 *
 * NOTE: This implementation relies on the fact that [onNext] is called in-order for emissions from
 * the same Flow. This means that concurrently calling [onNext] with the same index will not work.
 *
 * @param send Callback that receives the current pair of values together with the
 *   [CombineSource] describing what triggered the dispatch.
 * @see combineWithoutBatching
 */
internal class UnbatchedFlowCombiner<T1, T2>(
    private val send: suspend (t1: T1, t2: T2, updateFrom: CombineSource) -> Unit
) {
    // Completed after a combined value has been handed to `send`.
    private val initialDispatched = CompletableDeferred<Unit>()
    // Serializes the read-modify-dispatch section of onNext across both indices.
    private val lock = Mutex()
    // Per index: completed on the first onNext call for that index.
    private val valueReceived = Array(2) { CompletableDeferred<Unit>() }
    // Per index: latest value, or the NULL sentinel while that index has not provided one.
    private val values = Array<Any?>(2) { NULL }

    /**
     * Records [value] as the latest value for [index] and, if both indices now hold a value,
     * dispatches the pair through `send`.
     *
     * The first call for a given [index] proceeds immediately; later calls for that index suspend
     * until the initial combined value has been dispatched.
     *
     * @param index Slot of the emitting Flow: `0` is cast to `T1` and reported as [RECEIVER], `1`
     *   is cast to `T2` and reported as [OTHER].
     * @param value The value emitted by that Flow.
     */
    suspend fun onNext(index: Int, value: Any?) {
        // Allow the first value to dispatch immediately, but for subsequent values, we should
        // wait until the other flow emits, so that we don't overwrite the previous value.
        if (valueReceived[index].isCompleted) {
            // NOTE: We use a separate Completable here because just awaiting
            // valueReceived[1 - index] could potentially allow multiple calls to onNext from the
            // same Flow to trigger out of order.
            initialDispatched.await()
        } else {
            valueReceived[index].complete(Unit)
        }

        lock.withLock {
            // Must be evaluated before storing: it is true while at least one slot is still empty,
            // which is what distinguishes the INITIAL dispatch from later ones.
            val isInitial = values.any { it === NULL }
            values[index] = value

            // Dispatch only once both slots hold a real value.
            if (values.none { it === NULL }) {
                val updateFrom =
                    when {
                        isInitial -> INITIAL
                        index == 0 -> RECEIVER
                        else -> OTHER
                    }

                @Suppress("UNCHECKED_CAST") send(values[0] as T1, values[1] as T2, updateFrom)
                // Releases callers suspended on initialDispatched.await() above; completing an
                // already-completed deferred on later dispatches has no further effect.
                initialDispatched.complete(Unit)
            }
        }
    }
}
197
/**
 * Used to indicate which Flow emission triggered the transform block in [combineWithoutBatching].
 *
 * @see combineWithoutBatching
 */
internal enum class CombineSource {
    /** The first dispatch, made once both Flows have emitted a value. */
    INITIAL,
    /** A dispatch triggered by a new emission from the receiver Flow (index 0). */
    RECEIVER,
    /** A dispatch triggered by a new emission from the other Flow (index 1). */
    OTHER,
}
208