1 /*
<lambda>null2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.annotation.VisibleForTesting
20 import kotlinx.coroutines.CoroutineScope
21 import kotlinx.coroutines.CoroutineStart
22 import kotlinx.coroutines.channels.BufferOverflow
23 import kotlinx.coroutines.channels.Channel
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.MutableSharedFlow
26 import kotlinx.coroutines.flow.flow
27 import kotlinx.coroutines.flow.onSubscription
28 import kotlinx.coroutines.flow.takeWhile
29 import kotlinx.coroutines.flow.withIndex
30 import kotlinx.coroutines.launch
31 import kotlinx.coroutines.sync.Mutex
32 import kotlinx.coroutines.sync.withLock
33 
/**
 * Sits between a single upstream [PageEvent] flow and any number of downstream collectors.
 *
 * Every upstream event is recorded into a flattened cache. A collector of [downstreamFlow] first
 * receives the condensed events describing what has been recorded so far and then the events that
 * follow, rather than a replay of the complete event history.
 *
 * @param src the upstream flow; it is collected by a lazily started job that is kicked off by the
 *   first downstream subscription.
 * @param scope the scope in which the upstream collection job is launched.
 */
internal class CachedPageEventFlow<T : Any>(src: Flow<PageEvent<T>>, scope: CoroutineScope) {
    private val pageController = FlattenedPageController<T>()

    /**
     * Hot stream that every upstream event is dispatched to, tagged with its upstream index.
     *
     * It is configured with `replay = 1`, so it retains only the most recent value rather than
     * the previous events, while the unlimited extra buffer still queues values for active
     * subscribers. Once the collection job completes, a final `null` is published as the
     * end-of-stream marker.
     */
    private val mutableSharedSrc =
        MutableSharedFlow<IndexedValue<PageEvent<T>>?>(
            replay = 1,
            extraBufferCapacity = Channel.UNLIMITED,
            onBufferOverflow = BufferOverflow.SUSPEND
        )

    /**
     * View of [mutableSharedSrc] handed to downstream collectors. On subscription it first emits
     * the condensed history held by [pageController]; the events of the shared flow follow.
     */
    private val sharedForDownstream =
        mutableSharedSrc.onSubscription {
            // The snapshot must be taken before the job is started: that way the very first
            // subscriber, which triggers the start, gets no history at all.
            val snapshot = pageController.getStateAsEvents()
            job.start()
            for (cachedEvent in snapshot) {
                emit(cachedEvent)
            }
        }

    /**
     * Job that collects [src]. It is created lazily and only started from the subscription hook
     * of [sharedForDownstream].
     */
    private val job =
        scope
            .launch(start = CoroutineStart.LAZY) {
                src.withIndex().collect { indexedEvent ->
                    // Dispatch to subscribers first, then fold the event into the cache.
                    mutableSharedSrc.emit(indexedEvent)
                    pageController.record(indexedEvent)
                }
            }
            .apply {
                invokeOnCompletion {
                    // Publish the `null` end-of-stream marker. tryEmit can fail in principle,
                    // but it is not expected to here because the shared flow's buffer is
                    // unlimited.
                    mutableSharedSrc.tryEmit(null)
                }
            }

    /** Cancels the job that collects the upstream flow. */
    fun close() {
        job.cancel()
    }

    /**
     * Flow exposed to consumers. It emits the events of [sharedForDownstream] in order, skips any
     * event whose index is not greater than the highest index already delivered, and completes
     * when the `null` end-of-stream marker arrives.
     */
    val downstreamFlow: Flow<PageEvent<T>> = flow {
        // The history snapshot and the shared stream can race and overlap; remembering the
        // highest delivered index lets us drop the duplicates.
        var highestDeliveredIndex = Int.MIN_VALUE
        sharedForDownstream
            // A shared flow never completes on its own, so the `null` marker terminates it.
            .takeWhile { candidate -> candidate != null }
            .collect { indexedValue ->
                // takeWhile above has already filtered out the null marker.
                val current = indexedValue!!
                if (current.index > highestDeliveredIndex) {
                    emit(current.value)
                    highestDeliveredIndex = current.index
                }
            }
    }

    /**
     * Returns the cached data as a [PageEvent.Insert], or `null` when nothing is cached (for
     * example during the initial refresh).
     */
    internal fun getCachedEvent(): PageEvent.Insert<T>? = pageController.getCachedEvent()
}
116 
/**
 * Wraps a [FlattenedPageEventStorage] behind a [Mutex] and remembers the index of the most
 * recently recorded upstream event so that condensed events can be re-indexed consistently.
 */
private class FlattenedPageController<T : Any> {
    private val storage = FlattenedPageEventStorage<T>()
    private val lock = Mutex()

    /** Index of the last event passed to [record]; `-1` until the first event is recorded. */
    private var maxEventIndex = -1

    /** Folds [event] into the storage and remembers its index, all under the lock. */
    suspend fun record(event: IndexedValue<PageEvent<T>>) {
        lock.withLock {
            maxEventIndex = event.index
            storage.add(event.value)
        }
    }

    /**
     * Builds, under the lock, the condensed events that bring a new downstream up to the current
     * state. The events are indexed so that the last one carries the index of the most recently
     * recorded event.
     */
    suspend fun getStateAsEvents(): List<IndexedValue<PageEvent<T>>> =
        lock.withLock {
            val condensed = storage.getAsEvents()
            // Count backwards from the latest recorded index so the final condensed event
            // lines up with it.
            val firstIndex = maxEventIndex - condensed.size + 1
            condensed.mapIndexed { offset, pageEvent ->
                IndexedValue(index = firstIndex + offset, value = pageEvent)
            }
        }

    /**
     * Returns the first condensed event when it is a REFRESH insert, otherwise `null`.
     *
     * Note that, unlike the suspending functions of this class, this reads the storage without
     * acquiring the lock.
     */
    fun getCachedEvent(): PageEvent.Insert<T>? {
        val head = storage.getAsEvents().firstOrNull() ?: return null
        return if (head is PageEvent.Insert && head.loadType == LoadType.REFRESH) head else null
    }
}
147 
/**
 * Accumulates page events into a flattened state (pages, placeholder counts and load states) and
 * can hand that state back as a single condensed [PageEvent] instead of the individual events
 * that produced it.
 *
 * This class performs no synchronization of its own; callers must wrap it in a lock if it is
 * shared.
 */
@VisibleForTesting
internal class FlattenedPageEventStorage<T : Any> {
    private var placeholdersBefore: Int = 0
    private var placeholdersAfter: Int = 0
    private val pages = ArrayDeque<TransformablePage<T>>()

    /**
     * Starts out without remote state because it is unknown whether remote data exists until
     * events begin to arrive. Downstream has to cope with remote state showing up after an
     * initial, empty PagingData anyway.
     */
    private val sourceStates = MutableLoadStateCollection()
    private var mediatorStates: LoadStates? = null

    /**
     * Set once any upstream event has been added. Until then [getAsEvents] returns nothing, so
     * new downstream subscribers are not sent the initial IDLE state.
     */
    private var receivedFirstEvent: Boolean = false

    /** Applies [event] to the flattened state. */
    fun add(event: PageEvent<T>) {
        receivedFirstEvent = true
        when (event) {
            is PageEvent.Insert<T> -> handleInsert(event)
            is PageEvent.Drop<T> -> handlePageDrop(event)
            is PageEvent.LoadStateUpdate<T> -> handleLoadStateUpdate(event)
            is PageEvent.StaticList -> handleStaticList(event)
        }
    }

    /**
     * Removes the dropped pages from the matching end of the page list, adopts the remaining
     * placeholder count for that side and resets the side's source load state.
     *
     * @throws IllegalArgumentException if the drop is neither a prepend nor an append drop.
     */
    private fun handlePageDrop(event: PageEvent.Drop<T>) {
        // TODO: include state in drop event for simplicity, instead of reconstructing behavior.
        //  This allows upstream to control how drop affects states (e.g. letting drop affect both
        //  remote and local)
        sourceStates.set(event.loadType, LoadState.NotLoading.Incomplete)

        when (event.loadType) {
            LoadType.PREPEND -> {
                placeholdersBefore = event.placeholdersRemaining
                repeat(event.pageCount) { pages.removeFirst() }
            }
            LoadType.APPEND -> {
                placeholdersAfter = event.placeholdersRemaining
                repeat(event.pageCount) { pages.removeLast() }
            }
            else -> throw IllegalArgumentException("Page drop type must be prepend or append")
        }
    }

    /**
     * Takes over the load states carried by [event] and merges its pages: a refresh replaces the
     * whole list, a prepend goes to the front and an append goes to the back.
     */
    private fun handleInsert(event: PageEvent.Insert<T>) {
        sourceStates.set(event.sourceLoadStates)
        mediatorStates = event.mediatorLoadStates

        when (event.loadType) {
            LoadType.REFRESH -> {
                pages.clear()
                placeholdersAfter = event.placeholdersAfter
                placeholdersBefore = event.placeholdersBefore
                pages.addAll(event.pages)
            }
            LoadType.PREPEND -> {
                placeholdersBefore = event.placeholdersBefore
                // Walk the incoming pages back to front so that pushing each one onto the head
                // keeps their original relative order.
                for (position in event.pages.size - 1 downTo 0) {
                    pages.addFirst(event.pages[position])
                }
            }
            LoadType.APPEND -> {
                placeholdersAfter = event.placeholdersAfter
                pages.addAll(event.pages)
            }
        }
    }

    /** Replaces the stored source and mediator load states with the ones in [event]. */
    private fun handleLoadStateUpdate(event: PageEvent.LoadStateUpdate<T>) {
        sourceStates.set(event.source)
        mediatorStates = event.mediator
    }

    /**
     * Replaces all pages with a single page holding the static data and zeroes both placeholder
     * counts. Load states are only overwritten for the ones [event] actually provides.
     */
    private fun handleStaticList(event: PageEvent.StaticList<T>) {
        event.sourceLoadStates?.let { sourceStates.set(it) }
        event.mediatorLoadStates?.let { mediatorStates = it }

        pages.clear()
        placeholdersAfter = 0
        placeholdersBefore = 0
        pages.add(TransformablePage(originalPageOffset = 0, data = event.data))
    }

    /**
     * Returns the flattened state as events: empty if no event has ever been added, otherwise a
     * single event - a refresh insert when pages are present, or a load state update when there
     * are none.
     */
    fun getAsEvents(): List<PageEvent<T>> {
        if (!receivedFirstEvent) return emptyList()

        val source = sourceStates.snapshot()
        val condensed: PageEvent<T> =
            if (pages.isEmpty()) {
                PageEvent.LoadStateUpdate(source = source, mediator = mediatorStates)
            } else {
                PageEvent.Insert.Refresh(
                    pages = pages.toList(),
                    placeholdersBefore = placeholdersBefore,
                    placeholdersAfter = placeholdersAfter,
                    sourceLoadStates = source,
                    mediatorLoadStates = mediatorStates
                )
            }
        return listOf(condensed)
    }
}
268