/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging

import androidx.annotation.VisibleForTesting
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * An intermediate flow producer that flattens previous page events and gives any new downstream
 * just those events instead of the full history.
 *
 * @param src the upstream flow of page events; it is collected lazily, once the first downstream
 *   subscribes.
 * @param scope the scope in which the upstream collection job is launched.
 */
internal class CachedPageEventFlow<T : Any>(src: Flow<PageEvent<T>>, scope: CoroutineScope) {
    private val pageController = FlattenedPageController<T>()

    /**
     * Shared flow for downstreams where we dispatch each event coming from upstream. This only has
     * replay = 1 so it does not keep the previous events. Meanwhile, it still buffers them for
     * active subscribers. A final `null` value is emitted as the end of stream message once the job
     * is complete.
     */
    private val mutableSharedSrc =
        MutableSharedFlow<IndexedValue<PageEvent<T>>?>(
            replay = 1,
            extraBufferCapacity = Channel.UNLIMITED,
            onBufferOverflow = BufferOverflow.SUSPEND
        )

    /**
     * Shared flow used for downstream which also sends the history. Each downstream connects to
     * this where it first receives a history event and then any other event that was emitted by the
     * upstream.
     */
    private val sharedForDownstream =
        mutableSharedSrc.onSubscription {
            val history = pageController.getStateAsEvents()
            // Start the job if it has not started yet. We do this after capturing the history so
            // that the first subscriber does not receive any history.
            job.start()
            history.forEach { emit(it) }
        }

    /** The actual job that collects the upstream. */
    private val job =
        scope
            .launch(start = CoroutineStart.LAZY) {
                src.withIndex().collect {
                    // Dispatch to active subscribers first, then fold the event into the cache.
                    mutableSharedSrc.emit(it)
                    pageController.record(it)
                }
            }
            .also {
                it.invokeOnCompletion {
                    // Emit a final `null` message to the mutable shared flow.
                    // Even though this tryEmit might technically fail, it shouldn't because we
                    // have unlimited buffer in the shared flow.
                    mutableSharedSrc.tryEmit(null)
                }
            }

    /** Cancels the job that collects the upstream. */
    fun close() {
        job.cancel()
    }

    /**
     * Flow handed to downstream collectors. Each collection subscribes to the shared flow, skips
     * any event whose index it has already emitted, and completes when the `null` end-of-stream
     * marker arrives.
     */
    val downstreamFlow: Flow<PageEvent<T>> = flow {
        // Track max event index we've seen to avoid race condition between history and the shared
        // stream.
        var maxEventIndex = Int.MIN_VALUE
        sharedForDownstream
            .takeWhile {
                // Shared flow cannot finish hence we have a special marker to finish it.
                it != null
            }
            .collect { indexedValue ->
                // We take until null so this cannot be null.
                if (indexedValue!!.index > maxEventIndex) {
                    emit(indexedValue.value)
                    maxEventIndex = indexedValue.index
                }
            }
    }

    /**
     * Returns cached data as PageEvent.Insert. Null if cached data is empty (for example on initial
     * refresh).
     */
    internal fun getCachedEvent(): PageEvent.Insert<T>? = pageController.getCachedEvent()
}

/**
 * Guards a [FlattenedPageEventStorage] with a [Mutex] and remembers the index of the last recorded
 * upstream event so that replayed history can be given matching indices.
 */
private class FlattenedPageController<T : Any> {
    private val list = FlattenedPageEventStorage<T>()
    private val lock = Mutex()

    /** Index of the most recently recorded upstream event; -1 until the first one is recorded. */
    private var maxEventIndex = -1

    /** Record the event. */
    suspend fun record(event: IndexedValue<PageEvent<T>>) {
        lock.withLock {
            maxEventIndex = event.index
            list.add(event.value)
        }
    }

    /** Create a list of events that represents the current state of the list. */
    suspend fun getStateAsEvents(): List<IndexedValue<PageEvent<T>>> {
        return lock.withLock {
            // Condensed events to bring downstream up to the current state.
            val catchupEvents = list.getAsEvents()
            // Number the catch-up events so that the last one carries maxEventIndex.
            val startEventIndex = maxEventIndex - catchupEvents.size + 1
            catchupEvents.mapIndexed { index, pageEvent ->
                IndexedValue(index = startEventIndex + index, value = pageEvent)
            }
        }
    }

    /**
     * Returns the first flattened event when it is a REFRESH insert, `null` otherwise.
     *
     * NOTE(review): unlike [record] and [getStateAsEvents], this reads the storage without holding
     * [lock] (it is not a suspend function) - confirm callers tolerate a concurrent [record].
     */
    fun getCachedEvent(): PageEvent.Insert<T>? =
        list.getAsEvents().firstOrNull()?.let {
            if (it is PageEvent.Insert && it.loadType == LoadType.REFRESH) it else null
        }
}

/**
 * Keeps a list of page events and can dispatch them at once as PageEvent instead of multiple
 * events.
 *
 * There is no synchronization in this code so it should be used with locks around if necessary.
 */
@VisibleForTesting
internal class FlattenedPageEventStorage<T : Any> {
    private var placeholdersBefore: Int = 0
    private var placeholdersAfter: Int = 0
    private val pages = ArrayDeque<TransformablePage<T>>()

    /**
     * Note - this is initialized without remote state, since we don't know if we have remote data
     * once we start getting events. This is fine, since downstream needs to handle this anyway -
     * remote state being added after initial, empty, PagingData.
     */
    private val sourceStates = MutableLoadStateCollection()
    private var mediatorStates: LoadStates? = null

    /**
     * Tracks if we ever received an event from upstream to prevent sending the initial IDLE state
     * to new downstream subscribers.
     */
    private var receivedFirstEvent: Boolean = false

    /** Folds [event] into the stored pages, placeholder counts and load states. */
    fun add(event: PageEvent<T>) {
        receivedFirstEvent = true
        when (event) {
            is PageEvent.Insert<T> -> handleInsert(event)
            is PageEvent.Drop<T> -> handlePageDrop(event)
            is PageEvent.LoadStateUpdate<T> -> handleLoadStateUpdate(event)
            is PageEvent.StaticList -> handleStaticList(event)
        }
    }

    /**
     * Removes the dropped pages from the matching end of the list and updates that side's
     * placeholder count and source load state.
     *
     * @throws IllegalArgumentException if the drop's load type is neither PREPEND nor APPEND; no
     *   state is modified in that case.
     */
    private fun handlePageDrop(event: PageEvent.Drop<T>) {
        // Validate the load type before touching any state so that a rejected event leaves the
        // storage unchanged.
        val dropFromFront =
            when (event.loadType) {
                LoadType.PREPEND -> true
                LoadType.APPEND -> false
                else -> throw IllegalArgumentException("Page drop type must be prepend or append")
            }

        // TODO: include state in drop event for simplicity, instead of reconstructing behavior.
        //  This allows upstream to control how drop affects states (e.g. letting drop affect both
        //  remote and local)
        sourceStates.set(event.loadType, LoadState.NotLoading.Incomplete)

        if (dropFromFront) {
            placeholdersBefore = event.placeholdersRemaining
            repeat(event.pageCount) { pages.removeFirst() }
        } else {
            placeholdersAfter = event.placeholdersRemaining
            repeat(event.pageCount) { pages.removeLast() }
        }
    }

    /**
     * Applies an insert: REFRESH replaces all pages and both placeholder counts, PREPEND / APPEND
     * add pages to the front / back and update the placeholder count on that side. Load states are
     * always taken from the event.
     */
    private fun handleInsert(event: PageEvent.Insert<T>) {
        sourceStates.set(event.sourceLoadStates)
        mediatorStates = event.mediatorLoadStates

        when (event.loadType) {
            LoadType.REFRESH -> {
                pages.clear()
                placeholdersAfter = event.placeholdersAfter
                placeholdersBefore = event.placeholdersBefore
                pages.addAll(event.pages)
            }
            LoadType.PREPEND -> {
                placeholdersBefore = event.placeholdersBefore
                // Insert at the front while keeping the event's page order.
                pages.addAll(0, event.pages)
            }
            LoadType.APPEND -> {
                placeholdersAfter = event.placeholdersAfter
                pages.addAll(event.pages)
            }
        }
    }

    /** Replaces the stored source and mediator load states with the ones from [event]. */
    private fun handleLoadStateUpdate(event: PageEvent.LoadStateUpdate<T>) {
        sourceStates.set(event.source)
        mediatorStates = event.mediator
    }

    /**
     * Replaces all pages with a single page holding the static data and resets both placeholder
     * counts to zero. Load states are only overwritten when the event provides them.
     */
    private fun handleStaticList(event: PageEvent.StaticList<T>) {
        event.sourceLoadStates?.let { sourceStates.set(it) }
        event.mediatorLoadStates?.let { mediatorStates = it }

        pages.clear()
        placeholdersAfter = 0
        placeholdersBefore = 0
        pages.add(TransformablePage(originalPageOffset = 0, data = event.data))
    }

    /**
     * Returns the events that bring a new subscriber up to the current state: an empty list if no
     * event was ever added, otherwise a single REFRESH insert when pages are present or a single
     * load state update when there are none.
     */
    fun getAsEvents(): List<PageEvent<T>> {
        if (!receivedFirstEvent) {
            return emptyList()
        }
        val source = sourceStates.snapshot()
        val catchUpEvent: PageEvent<T> =
            if (pages.isNotEmpty()) {
                PageEvent.Insert.Refresh(
                    pages = pages.toList(),
                    placeholdersBefore = placeholdersBefore,
                    placeholdersAfter = placeholdersAfter,
                    sourceLoadStates = source,
                    mediatorLoadStates = mediatorStates
                )
            } else {
                PageEvent.LoadStateUpdate(source = source, mediator = mediatorStates)
            }
        return listOf(catchUpEvent)
    }
}