1 /*
<lambda>null2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:RestrictTo(RestrictTo.Scope.LIBRARY)
18 
19 package androidx.paging
20 
21 import androidx.annotation.RestrictTo
22 import androidx.paging.CombineSource.INITIAL
23 import androidx.paging.CombineSource.OTHER
24 import androidx.paging.CombineSource.RECEIVER
25 import androidx.paging.internal.AtomicInt
26 import kotlinx.coroutines.CompletableDeferred
27 import kotlinx.coroutines.Job
28 import kotlinx.coroutines.channels.SendChannel
29 import kotlinx.coroutines.flow.Flow
30 import kotlinx.coroutines.flow.FlowCollector
31 import kotlinx.coroutines.flow.collectLatest
32 import kotlinx.coroutines.flow.emitAll
33 import kotlinx.coroutines.flow.flow
34 import kotlinx.coroutines.launch
35 import kotlinx.coroutines.sync.Mutex
36 import kotlinx.coroutines.sync.withLock
37 import kotlinx.coroutines.yield
38 
39 /**
40  * This File includes custom flow operators that we implement to avoid using experimental APIs from
41  * coroutines. Eventually, this file should be deleted once those APIs become stable.
42  */
43 private val NULL = Any()
44 
45 /** Temporary `scan` operator on Flow without experimental APIs. */
46 internal fun <T, R> Flow<T>.simpleScan(
47     initial: R,
48     operation: suspend (accumulator: R, value: T) -> R
49 ): Flow<R> = flow {
50     var accumulator: R = initial
51     emit(accumulator)
52     collect { value ->
53         accumulator = operation(accumulator, value)
54         emit(accumulator)
55     }
56 }
57 
58 /** Temporary `runningReduce` operator on Flow without experimental APIs. */
simpleRunningReducenull59 internal fun <T> Flow<T>.simpleRunningReduce(
60     operation: suspend (accumulator: T, value: T) -> T
61 ): Flow<T> = flow {
62     var accumulator: Any? = NULL
63     collect { value ->
64         accumulator =
65             if (accumulator === NULL) {
66                 value
67             } else {
68                 @Suppress("UNCHECKED_CAST") operation(accumulator as T, value)
69             }
70         @Suppress("UNCHECKED_CAST") emit(accumulator as T)
71     }
72 }
73 
74 /** This is a similar implementation to transformLatest using a channel Flow. */
simpleTransformLatestnull75 internal fun <T, R> Flow<T>.simpleTransformLatest(
76     transform: suspend FlowCollector<R>.(value: T) -> Unit
77 ): Flow<R> = simpleChannelFlow {
78     val origin = this@simpleTransformLatest
79     val collector = ChannelFlowCollector(this@simpleChannelFlow)
80     origin.collectLatest { value -> collector.transform(value) }
81 }
82 
83 /** flatMapLatest without experimental APIs */
simpleFlatMapLatestnull84 internal inline fun <T, R> Flow<T>.simpleFlatMapLatest(
85     crossinline transform: suspend (value: T) -> Flow<R>
86 ): Flow<R> = simpleTransformLatest { emitAll(transform(it)) }
87 
88 /** mapLatest without experimental APIs */
simpleMapLatestnull89 internal inline fun <T, R> Flow<T>.simpleMapLatest(
90     crossinline transform: suspend (value: T) -> R
91 ): Flow<R> = simpleTransformLatest { emit(transform(it)) }
92 
93 internal class ChannelFlowCollector<T>(val channel: SendChannel<T>) : FlowCollector<T> {
emitnull94     override suspend fun emit(value: T) {
95         channel.send(value)
96     }
97 }
98 
99 /**
100  * Similar to [kotlinx.coroutines.flow.combine], except it never batches reads from its Flows, so
101  * [transform] is always guaranteed to get called for every emission from either Flow after the
102  * initial call (which waits for the first emission from both Flows).
103  *
104  * The emissions for both Flows are also guaranteed to get buffered, so if one Flow emits multiple
105  * times before the other does, [transform] will get called for each emission from the first Flow
106  * instead of just once with the latest values.
107  *
108  * @param transform The transform to apply on each update. This is first called after awaiting an
109  *   initial emission from both Flows, and then is guaranteed to be called for every emission from
110  *   either Flow.
111  *
112  * For convenience, [CombineSource] is also passed to the transform, which indicates the origin of
113  * the update with the following possible values:
114  * * [INITIAL]: Initial emission from both Flows
115  * * [RECEIVER]: Triggered by new emission from receiver
116  * * [OTHER]: Triggered by new emission from [otherFlow]
117  */
combineWithoutBatchingnull118 internal suspend inline fun <T1, T2, R> Flow<T1>.combineWithoutBatching(
119     otherFlow: Flow<T2>,
120     crossinline transform: suspend (T1, T2, updateFrom: CombineSource) -> R,
121 ): Flow<R> {
122     return simpleChannelFlow {
123         val incompleteFlows = AtomicInt(2)
124         val unbatchedFlowCombiner =
125             UnbatchedFlowCombiner<T1, T2> { t1, t2, updateFrom ->
126                 send(transform(t1, t2, updateFrom))
127             }
128         val parentJob = Job()
129         arrayOf(this@combineWithoutBatching, otherFlow).forEachIndexed { index, flow ->
130             launch(parentJob) {
131                 try {
132                     flow.collect { value ->
133                         unbatchedFlowCombiner.onNext(index, value)
134 
135                         // Make this more fair, giving the other flow a chance to emit.
136                         yield()
137                     }
138                 } finally {
139                     if (incompleteFlows.decrementAndGet() == 0) {
140                         close()
141                     }
142                 }
143             }
144         }
145 
146         awaitClose { parentJob.cancel() }
147     }
148 }
149 
150 /**
151  * Helper class for [UnbatchedFlowCombiner], which handles dispatching the combined values in the
152  * correct order, and with [CombineSource].
153  *
154  * NOTE: This implementation relies on the fact that [onNext] is called in-order for emissions from
155  * the same Flow. This means that concurrently calling [onNext] with the same index will not work.
156  *
157  * @see combineWithoutBatching
158  */
159 internal class UnbatchedFlowCombiner<T1, T2>(
160     private val send: suspend (t1: T1, t2: T2, updateFrom: CombineSource) -> Unit
161 ) {
162     private val initialDispatched = CompletableDeferred<Unit>()
163     private val lock = Mutex()
<lambda>null164     private val valueReceived = Array(2) { CompletableDeferred<Unit>() }
<lambda>null165     private val values = Array<Any?>(2) { NULL }
166 
onNextnull167     suspend fun onNext(index: Int, value: Any?) {
168         // Allow the first value to dispatch immediately, but for subsequent values, we should
169         // wait until the other flow emits, so that we don't overwrite the previous value.
170         if (valueReceived[index].isCompleted) {
171             // NOTE: We use a separate Completable here because just awaiting
172             // valueReceived[1 - index] could potentially allow multiple calls to onNext from the
173             // same Flow to trigger out of order.
174             initialDispatched.await()
175         } else {
176             valueReceived[index].complete(Unit)
177         }
178 
179         lock.withLock {
180             val isInitial = values.any { it === NULL }
181             values[index] = value
182 
183             if (values.none { it === NULL }) {
184                 val updateFrom =
185                     when {
186                         isInitial -> INITIAL
187                         index == 0 -> RECEIVER
188                         else -> OTHER
189                     }
190 
191                 @Suppress("UNCHECKED_CAST") send(values[0] as T1, values[1] as T2, updateFrom)
192                 initialDispatched.complete(Unit)
193             }
194         }
195     }
196 }
197 
198 /**
199  * Used to indicate which Flow emission triggered the transform block in [combineWithoutBatching].
200  *
201  * @see combineWithoutBatching
202  */
203 internal enum class CombineSource {
204     INITIAL,
205     RECEIVER,
206     OTHER,
207 }
208