/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging

import androidx.annotation.VisibleForTesting
import androidx.paging.CombineSource.RECEIVER
import androidx.paging.PageEvent.Drop
import androidx.paging.PageEvent.Insert
import androidx.paging.PageEvent.LoadStateUpdate
import androidx.paging.RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH
import androidx.paging.internal.BUGANIZER_URL
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart

/**
 * Produces a [Flow] of [PagingData] generations.
 *
 * Every refresh signal (either an explicit [refresh] call or an invalidation reported by the
 * current [PagingSource]) creates a new [PagingSource] through [pagingSourceFactory], wraps it in
 * a fresh [PageFetcherSnapshot], and emits a new [PagingData] backed by that snapshot.
 */
internal class PageFetcher<Key : Any, Value : Any>(
    private val pagingSourceFactory: suspend () -> PagingSource<Key, Value>,
    private val initialKey: Key?,
    private val config: PagingConfig,
    @OptIn(ExperimentalPagingApi::class) remoteMediator: RemoteMediator<Key, Value>? = null
) {
    /**
     * Channel of refresh signals that would trigger a new instance of [PageFetcherSnapshot].
     * Signals sent to this channel should be `true` if a remote REFRESH load should be triggered,
     * `false` otherwise.
     *
     * NOTE: This channel is conflated, which means it has a buffer size of 1, and will always
     * broadcast the latest value received.
     */
    private val refreshEvents = ConflatedEventBus<Boolean>()

    /** Retry signals; forwarded to every [PageFetcherSnapshot] through its `retryFlow`. */
    private val retryEvents = ConflatedEventBus<Unit>()

    // The object built by paging builder can maintain the scope so that on rotation we don't stop
    // the paging.
    val flow: Flow<PagingData<Value>> = simpleChannelFlow {
        @OptIn(ExperimentalPagingApi::class)
        val remoteMediatorAccessor =
            remoteMediator?.let { mediator -> RemoteMediatorAccessor(this, mediator) }

        refreshEvents.flow
            .onStart {
                // The first signal is synthesized here: it requests a remote refresh only when
                // the mediator's initialize() asks for LAUNCH_INITIAL_REFRESH.
                @OptIn(ExperimentalPagingApi::class)
                emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
            }
            .simpleScan(null) {
                previous: GenerationInfo<Key, Value>?,
                remoteRefreshRequested: Boolean ->
                // Enable refresh if this is the first generation and we have
                // LAUNCH_INITIAL_REFRESH or if this generation was started due to [refresh]
                // being invoked.
                if (remoteRefreshRequested) {
                    remoteMediatorAccessor?.allowRefresh()
                }

                // The new PagingSource is created before the previous snapshot's state is read.
                val pagingSource =
                    generateNewPagingSource(
                        previousPagingSource = previous?.snapshot?.pagingSource
                    )

                val latestSnapshotState = previous?.snapshot?.currentPagingState()
                val previousPagingState =
                    resolvePreviousPagingState(
                        latest = latestSnapshotState,
                        cached = previous?.state,
                    )

                // Without any prior state fall back to the configured initial key; otherwise
                // let the new PagingSource derive its refresh key from that state.
                val refreshKey: Key? =
                    if (previousPagingState == null) {
                        initialKey
                    } else {
                        pagingSource.getRefreshKey(previousPagingState).also { key ->
                            log(DEBUG) {
                                "Refresh key $key returned from PagingSource $pagingSource"
                            }
                        }
                    }

                // Tear down the previous generation only after its state has been captured.
                previous?.let { stale ->
                    stale.snapshot.close()
                    stale.job.cancel()
                }

                GenerationInfo(
                    snapshot =
                        PageFetcherSnapshot(
                            initialKey = refreshKey,
                            pagingSource = pagingSource,
                            config = config,
                            retryFlow = retryEvents.flow,
                            // Only trigger remote refresh on refresh signals that do not
                            // originate from initialization or PagingSource invalidation.
                            remoteMediatorConnection = remoteMediatorAccessor,
                            jumpCallback = this@PageFetcher::refresh,
                            previousPagingState = previousPagingState,
                        ),
                    state = previousPagingState,
                    job = Job(),
                )
            }
            .filterNotNull()
            .simpleMapLatest { generation ->
                val snapshot = generation.snapshot
                val downstreamFlow =
                    snapshot
                        .injectRemoteEvents(generation.job, remoteMediatorAccessor)
                        .onEach { event -> log(VERBOSE) { "Sent $event" } }

                PagingData(
                    flow = downstreamFlow,
                    uiReceiver = PagerUiReceiver(retryEvents),
                    hintReceiver = PagerHintReceiver(snapshot)
                )
            }
            .collect { pagingData -> send(pagingData) }
    }

    /** Requests a new generation and signals that a remote refresh should be allowed. */
    fun refresh() {
        refreshEvents.send(true)
    }

    /** Requests a new generation without asking for a remote refresh. */
    private fun invalidate() {
        refreshEvents.send(false)
    }

    /**
     * Picks the [PagingState] the next generation should start from.
     *
     * [cached] (the state the previous generation itself was started with) replaces [latest] (the
     * state read from the previous snapshot) in two cases:
     * - [latest] has no loaded pages but [cached] does. This handles cases where invalidation
     *   happens too quickly, so that getRefreshKey and remote refresh at least have some data to
     *   work with.
     * - the candidate has no anchorPosition but [cached] does. This prevents rapid invalidation
     *   from deleting the anchorPosition if the previous generation didn't have time to load
     *   before getting invalidated.
     *
     * @return [cached] when either rule applies, otherwise [latest]; may be `null`.
     */
    private fun resolvePreviousPagingState(
        latest: PagingState<Key, Value>?,
        cached: PagingState<Key, Value>?,
    ): PagingState<Key, Value>? {
        val cachedHasPagesButLatestDoesNot =
            latest?.pages.isNullOrEmpty() && cached?.pages?.isNotEmpty() == true
        if (cachedHasPagesButLatestDoesNot) return cached

        val cachedHasAnchorButLatestDoesNot =
            latest?.anchorPosition == null && cached?.anchorPosition != null
        return if (cachedHasAnchorButLatestDoesNot) cached else latest
    }

    /**
     * Merges the mediator load states exposed by [accessor] into this snapshot's page events.
     *
     * When [accessor] is `null` the snapshot's own `pageEventFlow` is returned untouched.
     *
     * @param job the generation's job, passed to [cancelableChannelFlow].
     * @param accessor source of mediator load states, or `null` if there is no mediator.
     */
    private fun PageFetcherSnapshot<Key, Value>.injectRemoteEvents(
        job: Job,
        accessor: RemoteMediatorAccessor<Key, Value>?
    ): Flow<PageEvent<Value>> {
        val mediatorAccessor = accessor ?: return pageEventFlow

        // Tracks the most recent source load states so that a mediator-only update can be paired
        // with them.
        val sourceStates = MutableLoadStateCollection()
        // We wrap this in a cancelableChannelFlow to allow co-operative cancellation, otherwise
        // RemoteMediatorAccessor's StateFlow will keep this Flow running on old generations.
        return cancelableChannelFlow(job) {
            mediatorAccessor.state
                // Note: Combine waits for PageFetcherSnapshot to emit an event before sending
                // anything. This avoids sending the initial idle state, since it would cause
                // load state flickering on rapid invalidation.
                .combineWithoutBatching(pageEventFlow) { mediatorStates, event, updateFrom ->
                    if (updateFrom != RECEIVER) {
                        when (event) {
                            is Insert -> {
                                sourceStates.set(event.sourceLoadStates)
                                // sourceLoadStates is left at copy()'s default, i.e. unchanged.
                                @Suppress("DATA_CLASS_INVISIBLE_COPY_USAGE_WARNING")
                                event.copy(mediatorLoadStates = mediatorStates)
                            }
                            is Drop -> {
                                sourceStates.set(
                                    type = event.loadType,
                                    state = LoadState.NotLoading.Incomplete
                                )
                                event
                            }
                            is LoadStateUpdate -> {
                                sourceStates.set(event.source)
                                LoadStateUpdate(
                                    source = event.source,
                                    mediator = mediatorStates,
                                )
                            }
                            is PageEvent.StaticList -> {
                                throw IllegalStateException(
                                    """Paging generated an event to display a static list that
                                        | originated from a paginated source. If you see this
                                        | exception, it is most likely a bug in the library.
                                        | Please file a bug so we can fix it at:
                                        | $BUGANIZER_URL"""
                                        .trimMargin()
                                )
                            }
                        }
                    } else {
                        // Triggered by the mediator state alone: re-send the last known source
                        // states together with the new mediator states.
                        LoadStateUpdate(
                            source = sourceStates.snapshot(),
                            mediator = mediatorStates,
                        )
                    }
                }
                .collect { send(it) }
        }
    }

    /**
     * Creates the [PagingSource] for the next generation and retires [previousPagingSource].
     *
     * @param previousPagingSource the source used by the previous generation, if any.
     * @return the newly created source with this fetcher's invalidation callback registered.
     * @throws IllegalStateException if [pagingSourceFactory] returns the same instance as
     *   [previousPagingSource].
     */
    private suspend fun generateNewPagingSource(
        previousPagingSource: PagingSource<Key, Value>?
    ): PagingSource<Key, Value> {
        val newSource = pagingSourceFactory()
        (newSource as? CompatLegacyPagingSource)?.setPageSize(config.pageSize)

        // Ensure pagingSourceFactory produces a new instance of PagingSource.
        check(newSource !== previousPagingSource) {
            """
            An instance of PagingSource was re-used when Pager expected to create a new
            instance. Ensure that the pagingSourceFactory passed to Pager always returns a
            new instance of PagingSource.
            """
                .trimIndent()
        }

        // Hook up refresh signals from PagingSource.
        newSource.registerInvalidatedCallback(::invalidate)
        previousPagingSource?.let { stale ->
            stale.unregisterInvalidatedCallback(::invalidate)
            stale.invalidate() // Note: Invalidate is idempotent.
        }
        log(DEBUG) { "Generated new PagingSource $newSource" }

        return newSource
    }

    /** [UiReceiver] that forwards retry to [retryEventBus] and refresh to this [PageFetcher]. */
    inner class PagerUiReceiver(private val retryEventBus: ConflatedEventBus<Unit>) : UiReceiver {
        override fun retry() {
            retryEventBus.send(Unit)
        }

        override fun refresh() {
            this@PageFetcher.refresh()
        }
    }

    /**
     * [HintReceiver] that forwards viewport hints to [pageFetcherSnapshot].
     *
     * Note: this class declares its own `Key` / `Value` type parameters, which shadow the ones
     * declared on [PageFetcher].
     */
    inner class PagerHintReceiver<Key : Any, Value : Any>
    constructor(
        @get:VisibleForTesting internal val pageFetcherSnapshot: PageFetcherSnapshot<Key, Value>,
    ) : HintReceiver {

        override fun accessHint(viewportHint: ViewportHint) {
            pageFetcherSnapshot.accessHint(viewportHint)
        }
    }

    /**
     * Bundle describing one generation.
     *
     * @property snapshot the snapshot serving this generation.
     * @property state the [PagingState] this generation was started from, if any.
     * @property job the job handed to [injectRemoteEvents]; cancelled when the next generation
     *   is created.
     */
    private class GenerationInfo<Key : Any, Value : Any>(
        val snapshot: PageFetcherSnapshot<Key, Value>,
        val state: PagingState<Key, Value>?,
        val job: Job,
    )
}