1 /*
<lambda>null2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 @file:RestrictTo(RestrictTo.Scope.LIBRARY)
18
19 package androidx.paging
20
21 import androidx.annotation.RestrictTo
22 import androidx.annotation.VisibleForTesting
23 import androidx.paging.LoadState.Error
24 import androidx.paging.LoadState.Loading
25 import androidx.paging.LoadState.NotLoading
26 import androidx.paging.LoadType.APPEND
27 import androidx.paging.LoadType.PREPEND
28 import androidx.paging.LoadType.REFRESH
29 import androidx.paging.PageEvent.LoadStateUpdate
30 import androidx.paging.PagingSource.LoadParams
31 import androidx.paging.PagingSource.LoadResult
32 import androidx.paging.PagingSource.LoadResult.Page
33 import androidx.paging.PagingSource.LoadResult.Page.Companion.COUNT_UNDEFINED
34 import androidx.paging.internal.AtomicBoolean
35 import kotlinx.coroutines.CoroutineScope
36 import kotlinx.coroutines.Job
37 import kotlinx.coroutines.channels.Channel
38 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
39 import kotlinx.coroutines.channels.ClosedSendChannelException
40 import kotlinx.coroutines.flow.Flow
41 import kotlinx.coroutines.flow.conflate
42 import kotlinx.coroutines.flow.consumeAsFlow
43 import kotlinx.coroutines.flow.drop
44 import kotlinx.coroutines.flow.firstOrNull
45 import kotlinx.coroutines.flow.flowOf
46 import kotlinx.coroutines.flow.map
47 import kotlinx.coroutines.flow.merge
48 import kotlinx.coroutines.flow.onStart
49 import kotlinx.coroutines.launch
50
51 /**
52 * Holds a generation of pageable data, a snapshot of data loaded by [PagingSource]. An instance of
53 * [PageFetcherSnapshot] and its corresponding [PageFetcherSnapshotState] should be launched within
54 * a scope that is cancelled when [PagingSource.invalidate] is called.
55 */
56 internal class PageFetcherSnapshot<Key : Any, Value : Any>(
57 internal val initialKey: Key?,
58 internal val pagingSource: PagingSource<Key, Value>,
59 private val config: PagingConfig,
60 private val retryFlow: Flow<Unit>,
61 val remoteMediatorConnection: RemoteMediatorConnection<Key, Value>? = null,
62 private val previousPagingState: PagingState<Key, Value>? = null,
63 private val jumpCallback: () -> Unit = {},
64 ) {
65 init {
<lambda>null66 require(config.jumpThreshold == COUNT_UNDEFINED || pagingSource.jumpingSupported) {
67 "PagingConfig.jumpThreshold was set, but the associated PagingSource has not marked " +
68 "support for jumps by overriding PagingSource.jumpingSupported to true."
69 }
70 }
71
72 private val hintHandler = HintHandler()
73
74 private val pageEventChCollected = AtomicBoolean(false)
75 private val pageEventCh = Channel<PageEvent<Value>>(BUFFERED)
76 private val stateHolder = PageFetcherSnapshotState.Holder<Key, Value>(config = config)
77
78 private val pageEventChannelFlowJob = Job()
79
80 val pageEventFlow: Flow<PageEvent<Value>> =
<lambda>null81 cancelableChannelFlow<PageEvent<Value>>(pageEventChannelFlowJob) {
82 check(pageEventChCollected.compareAndSet(false, true)) {
83 "Attempt to collect twice from pageEventFlow, which is an illegal operation. Did you " +
84 "forget to call Flow<PagingData<*>>.cachedIn(coroutineScope)?"
85 }
86
87 // Start collection on pageEventCh, which the rest of this class uses to send
88 // PageEvents
89 // to this flow.
90 launch {
91 pageEventCh.consumeAsFlow().collect {
92 // Protect against races where a subsequent call to submitData invoked
93 // close(),
94 // but a pageEvent arrives after closing causing ClosedSendChannelException.
95 try {
96 send(it)
97 } catch (e: ClosedSendChannelException) {
98 // Safe to drop PageEvent here, since collection has been cancelled.
99 }
100 }
101 }
102
103 // Wrap collection behind a RendezvousChannel to prevent the RetryChannel from
104 // buffering
105 // retry signals.
106 val retryChannel = Channel<Unit>(Channel.RENDEZVOUS)
107 launch { retryFlow.collect { retryChannel.trySend(it) } }
108
109 // Start collection on retry signals.
110 launch {
111 retryChannel.consumeAsFlow().collect {
112 val (sourceLoadStates, remotePagingState) =
113 stateHolder.withLock { state ->
114 state.sourceLoadStates.snapshot() to
115 state.currentPagingState(hintHandler.lastAccessHint)
116 }
117 // tell remote mediator to retry and it will trigger necessary work / change
118 // its state as necessary.
119 remoteMediatorConnection?.retryFailed(remotePagingState)
120 // change source (local) states
121 sourceLoadStates.forEach { loadType, loadState ->
122 if (loadState !is Error) return@forEach
123
124 // Reset error state before sending hint.
125 if (loadType != REFRESH) {
126 stateHolder.withLock { state -> state.setLoading(loadType) }
127 }
128
129 retryLoadError(
130 loadType = loadType,
131 viewportHint =
132 when (loadType) {
133 // ViewportHint is only used when retrying source PREPEND /
134 // APPEND.
135 REFRESH -> null
136 else ->
137 stateHolder.withLock { state ->
138 state.failedHintsByLoadType[loadType]
139 }
140 }
141 )
142
143 // If retrying REFRESH from PagingSource succeeds, start collection on
144 // ViewportHints for PREPEND / APPEND loads.
145 if (loadType == REFRESH) {
146 val newRefreshState =
147 stateHolder.withLock { state ->
148 state.sourceLoadStates.get(REFRESH)
149 }
150
151 if (newRefreshState !is Error) {
152 startConsumingHints()
153 }
154 }
155 }
156 }
157 }
158
159 // NOTE: We always try to enqueue on init, but this request will only go through if
160 // [RemoteMediatorConnection.refreshEnabled] is `true`. It is important for it to be
161 // done
162 // this way to ensure that we always have a valid [LoadStates] for [PagingSource]
163 // before we
164 // trigger remote load, in case remote emits multiple [LoadStates] before
165 // [PagingSource]
166 // starts, which would cause us to drop remote [LoadStates] emissions since we wait
167 // for
168 // valid events from both.
169 remoteMediatorConnection?.let {
170 val pagingState =
171 previousPagingState
172 ?: stateHolder.withLock { state -> state.currentPagingState(null) }
173 it.requestRefreshIfAllowed(pagingState)
174 }
175
176 // Setup finished, start the initial load even if RemoteMediator throws an error.
177 doInitialLoad()
178
179 // Only start collection on ViewportHints if the initial load succeeded.
180 if (
181 stateHolder.withLock { state -> state.sourceLoadStates.get(REFRESH) } !is Error
182 ) {
183 startConsumingHints()
184 }
185 }
<lambda>null186 .onStart {
187 // Immediately emit the initial load state when creating a new generation to give
188 // PageFetcher a real event from source side as soon as possible. This allows
189 // PageFetcher
190 // to operate on this stream in a way that waits for a real event (for example, by
191 // using
192 // Flow.combine) without the consequence of getting "stuck".
193 emit(LoadStateUpdate(stateHolder.withLock { it.sourceLoadStates.snapshot() }))
194 }
195
196 @Suppress("SuspendFunctionOnCoroutineScope")
retryLoadErrornull197 private suspend fun retryLoadError(loadType: LoadType, viewportHint: ViewportHint?) {
198 when (loadType) {
199 REFRESH -> {
200 doInitialLoad()
201 }
202 else -> {
203 check(viewportHint != null) {
204 "Cannot retry APPEND / PREPEND load on PagingSource without ViewportHint"
205 }
206
207 hintHandler.forceSetHint(loadType, viewportHint)
208 }
209 }
210 }
211
accessHintnull212 fun accessHint(viewportHint: ViewportHint) {
213 hintHandler.processHint(viewportHint)
214 }
215
closenull216 fun close() {
217 pageEventChannelFlowJob.cancel()
218 }
219
currentPagingStatenull220 suspend fun currentPagingState(): PagingState<Key, Value> {
221 return stateHolder.withLock { state ->
222 state.currentPagingState(hintHandler.lastAccessHint)
223 }
224 }
225
startConsumingHintsnull226 private fun CoroutineScope.startConsumingHints() {
227 // Pseudo-tiling via invalidation on jumps.
228 if (config.jumpThreshold != COUNT_UNDEFINED) {
229 launch {
230 val jumpHint =
231 merge(hintHandler.hintFor(APPEND), hintHandler.hintFor(PREPEND)).firstOrNull {
232 hint ->
233 hint.presentedItemsBefore * -1 > config.jumpThreshold ||
234 hint.presentedItemsAfter * -1 > config.jumpThreshold
235 }
236 if (jumpHint != null) {
237 log(DEBUG) { "Jump triggered on PagingSource $pagingSource by $jumpHint" }
238 jumpCallback()
239 }
240 }
241 }
242
243 launch {
244 stateHolder
245 .withLock { state -> state.consumePrependGenerationIdAsFlow() }
246 .collectAsGenerationalViewportHints(PREPEND)
247 }
248
249 launch {
250 stateHolder
251 .withLock { state -> state.consumeAppendGenerationIdAsFlow() }
252 .collectAsGenerationalViewportHints(APPEND)
253 }
254 }
255
256 /**
257 * Maps a [Flow] of generation ids from [PageFetcherSnapshotState] to [ViewportHint]s from
258 * [hintHandler] with back-pressure handling via conflation by prioritizing hints which either
259 * update presenter state or those that would load the most items.
260 *
261 * @param loadType [PREPEND] or [APPEND]
262 */
collectAsGenerationalViewportHintsnull263 private suspend fun Flow<Int>.collectAsGenerationalViewportHints(loadType: LoadType) =
264 simpleFlatMapLatest { generationId ->
265 // Reset state to Idle and setup a new flow for consuming incoming load hints.
266 // Subsequent generationIds are normally triggered by cancellation.
267 stateHolder.withLock { state ->
268 // Skip this generationId of loads if there is no more to load in this
269 // direction. In the case of the terminal page getting dropped, a new
270 // generationId will be sent after load state is updated to Idle.
271 if (state.sourceLoadStates.get(loadType) == NotLoading.Complete) {
272 return@simpleFlatMapLatest flowOf()
273 } else if (state.sourceLoadStates.get(loadType) !is Error) {
274 state.sourceLoadStates.set(loadType, NotLoading.Incomplete)
275 }
276 }
277
278 hintHandler
279 .hintFor(loadType)
280 // Prevent infinite loop when competing PREPEND / APPEND cancel each other
281 .drop(if (generationId == 0) 0 else 1)
282 .map { hint -> GenerationalViewportHint(generationId, hint) }
283 }
nextnull284 .simpleRunningReduce { previous, next ->
285 // Prioritize new hints that would load the maximum number of items.
286 if (next.shouldPrioritizeOver(previous, loadType)) next else previous
287 }
288 .conflate()
generationalHintnull289 .collect { generationalHint -> doLoad(loadType, generationalHint) }
290
loadParamsnull291 private fun loadParams(loadType: LoadType, key: Key?) =
292 LoadParams.create(
293 loadType = loadType,
294 key = key,
295 loadSize = if (loadType == REFRESH) config.initialLoadSize else config.pageSize,
296 placeholdersEnabled = config.enablePlaceholders,
297 )
298
299 private suspend fun doInitialLoad() {
300 stateHolder.withLock { state -> state.setLoading(REFRESH) }
301
302 val params = loadParams(REFRESH, initialKey)
303
304 log(DEBUG) { "Start REFRESH with loadKey $initialKey on $pagingSource" }
305
306 when (val result = pagingSource.load(params)) {
307 is Page<Key, Value> -> {
308 // Atomically update load states + pages while still holding the mutex, otherwise
309 // remote state can race here and lead to confusing load states.
310 val insertApplied =
311 stateHolder.withLock { state ->
312 val insertApplied = state.insert(0, REFRESH, result)
313
314 // Update loadStates which are sent along with this load's Insert PageEvent.
315 state.sourceLoadStates.set(type = REFRESH, state = NotLoading.Incomplete)
316 if (result.prevKey == null) {
317 state.sourceLoadStates.set(type = PREPEND, state = NotLoading.Complete)
318 }
319 if (result.nextKey == null) {
320 state.sourceLoadStates.set(type = APPEND, state = NotLoading.Complete)
321 }
322
323 insertApplied
324 }
325
326 // Send insert event after load state updates, so that endOfPaginationReached is
327 // correctly reflected in the insert event. Note that we only send the event if the
328 // insert was successfully applied in the case of cancellation due to page dropping.
329 if (insertApplied) {
330 log(DEBUG) { loadResultLog(REFRESH, initialKey, result) }
331
332 stateHolder.withLock { state ->
333 with(state) { pageEventCh.send(result.toPageEvent(REFRESH)) }
334 }
335 } else {
336 log(VERBOSE) { loadResultLog(REFRESH, initialKey, null) }
337 }
338
339 // Launch any RemoteMediator boundary calls after applying initial insert.
340 if (remoteMediatorConnection != null) {
341 if (result.prevKey == null || result.nextKey == null) {
342 val pagingState =
343 stateHolder.withLock { state ->
344 state.currentPagingState(hintHandler.lastAccessHint)
345 }
346
347 if (result.prevKey == null) {
348 remoteMediatorConnection.requestLoad(PREPEND, pagingState)
349 }
350
351 if (result.nextKey == null) {
352 remoteMediatorConnection.requestLoad(APPEND, pagingState)
353 }
354 }
355 }
356 }
357 is LoadResult.Error -> {
358 log(VERBOSE) { loadResultLog(REFRESH, initialKey, result) }
359 stateHolder.withLock { state ->
360 val loadState = Error(result.throwable)
361 state.setError(loadType = REFRESH, error = loadState)
362 }
363 }
364 is LoadResult.Invalid -> {
365 log(VERBOSE) { loadResultLog(REFRESH, initialKey, result) }
366 onInvalidLoad()
367 }
368 }
369 }
370
371 // TODO: Consider making this a transform operation which emits PageEvents
doLoadnull372 private suspend fun doLoad(loadType: LoadType, generationalHint: GenerationalViewportHint) {
373 require(loadType != REFRESH) { "Use doInitialLoad for LoadType == REFRESH" }
374
375 // If placeholder counts differ between the hint and PageFetcherSnapshotState, then
376 // assume fetcher is ahead of presenter and account for the difference in itemsLoaded.
377 var itemsLoaded = 0
378 stateHolder.withLock { state ->
379 when (loadType) {
380 PREPEND -> {
381 var firstPageIndex =
382 state.initialPageIndex + generationalHint.hint.originalPageOffsetFirst - 1
383
384 // If the pages before the first page in presenter have been dropped in
385 // fetcher, then we cannot count them towards loadedItems.
386 if (firstPageIndex > state.pages.lastIndex) {
387 itemsLoaded += config.pageSize * (firstPageIndex - state.pages.lastIndex)
388 firstPageIndex = state.pages.lastIndex
389 }
390
391 for (pageIndex in 0..firstPageIndex) {
392 itemsLoaded += state.pages[pageIndex].data.size
393 }
394 }
395 APPEND -> {
396 var lastPageIndex =
397 state.initialPageIndex + generationalHint.hint.originalPageOffsetLast + 1
398
399 // If the pages after the last page in presenter have been dropped in
400 // fetcher, then we cannot count them towards loadedItems.
401 if (lastPageIndex < 0) {
402 itemsLoaded += config.pageSize * -lastPageIndex
403 lastPageIndex = 0
404 }
405
406 for (pageIndex in lastPageIndex..state.pages.lastIndex) {
407 itemsLoaded += state.pages[pageIndex].data.size
408 }
409 }
410 REFRESH -> throw IllegalStateException("Use doInitialLoad for LoadType == REFRESH")
411 }
412 }
413
414 var loadKey: Key? =
415 stateHolder.withLock { state ->
416 state
417 .nextLoadKeyOrNull(
418 loadType,
419 generationalHint.generationId,
420 generationalHint.hint.presentedItemsBeyondAnchor(loadType) + itemsLoaded,
421 )
422 ?.also { state.setLoading(loadType) }
423 }
424
425 // Keep track of whether endOfPaginationReached so we can update LoadState accordingly when
426 // this load loop terminates due to fulfilling prefetchDistance.
427 var endOfPaginationReached = false
428 loop@ while (loadKey != null) {
429 val params = loadParams(loadType, loadKey)
430 log(DEBUG) { "Start $loadType with loadKey $loadKey on $pagingSource" }
431 val result: LoadResult<Key, Value> = pagingSource.load(params)
432 when (result) {
433 is Page<Key, Value> -> {
434 // First, check for common error case where the same key is re-used to load
435 // new pages, often resulting in infinite loops.
436 val nextKey =
437 when (loadType) {
438 PREPEND -> result.prevKey
439 APPEND -> result.nextKey
440 else ->
441 throw IllegalArgumentException(
442 "Use doInitialLoad for LoadType == REFRESH"
443 )
444 }
445
446 check(pagingSource.keyReuseSupported || nextKey != loadKey) {
447 val keyFieldName = if (loadType == PREPEND) "prevKey" else "nextKey"
448 """The same value, $loadKey, was passed as the $keyFieldName in two
449 | sequential Pages loaded from a PagingSource. Re-using load keys in
450 | PagingSource is often an error, and must be explicitly enabled by
451 | overriding PagingSource.keyReuseSupported.
452 """
453 .trimMargin()
454 }
455
456 val insertApplied =
457 stateHolder.withLock { state ->
458 state.insert(generationalHint.generationId, loadType, result)
459 }
460
461 // Break if insert was skipped due to cancellation
462 if (!insertApplied) {
463 log(VERBOSE) { loadResultLog(loadType, loadKey, null) }
464 break@loop
465 }
466
467 log(DEBUG) { loadResultLog(loadType, loadKey, result) }
468
469 itemsLoaded += result.data.size
470
471 // Set endOfPaginationReached to false if no more data to load in current
472 // direction.
473 if (
474 (loadType == PREPEND && result.prevKey == null) ||
475 (loadType == APPEND && result.nextKey == null)
476 ) {
477 endOfPaginationReached = true
478 }
479 }
480 is LoadResult.Error -> {
481 log(VERBOSE) { loadResultLog(loadType, loadKey, result) }
482 stateHolder.withLock { state ->
483 val loadState = Error(result.throwable)
484 state.setError(loadType = loadType, error = loadState)
485
486 // Save the hint for retry on incoming retry signal, typically sent from
487 // user interaction.
488 state.failedHintsByLoadType[loadType] = generationalHint.hint
489 }
490 return
491 }
492 is LoadResult.Invalid -> {
493 log(VERBOSE) { loadResultLog(loadType, loadKey, result) }
494 onInvalidLoad()
495 return
496 }
497 }
498
499 val dropType =
500 when (loadType) {
501 PREPEND -> APPEND
502 else -> PREPEND
503 }
504
505 stateHolder.withLock { state ->
506 state.dropEventOrNull(dropType, generationalHint.hint)?.let { event ->
507 state.drop(event)
508 pageEventCh.send(event)
509 }
510
511 loadKey =
512 state.nextLoadKeyOrNull(
513 loadType,
514 generationalHint.generationId,
515 generationalHint.hint.presentedItemsBeyondAnchor(loadType) + itemsLoaded,
516 )
517
518 // Update load state to success if this is the final load result for this
519 // load hint, and only if we didn't error out.
520 if (loadKey == null && state.sourceLoadStates.get(loadType) !is Error) {
521 state.sourceLoadStates.set(
522 type = loadType,
523 state =
524 when {
525 endOfPaginationReached -> NotLoading.Complete
526 else -> NotLoading.Incomplete
527 }
528 )
529 }
530
531 // Send page event for successful insert, now that PagerState has been updated.
532 val pageEvent = with(state) { result.toPageEvent(loadType) }
533
534 pageEventCh.send(pageEvent)
535 }
536
537 val endsPrepend = params is LoadParams.Prepend && result.prevKey == null
538 val endsAppend = params is LoadParams.Append && result.nextKey == null
539 if (remoteMediatorConnection != null && (endsPrepend || endsAppend)) {
540 val pagingState =
541 stateHolder.withLock { state ->
542 state.currentPagingState(hintHandler.lastAccessHint)
543 }
544
545 if (endsPrepend) {
546 remoteMediatorConnection.requestLoad(PREPEND, pagingState)
547 }
548
549 if (endsAppend) {
550 remoteMediatorConnection.requestLoad(APPEND, pagingState)
551 }
552 }
553 }
554 }
555
loadResultLognull556 private fun loadResultLog(
557 loadType: LoadType,
558 loadKey: Key?,
559 result: LoadResult<Key, Value>?
560 ): String {
561 return if (result == null) {
562 "End $loadType with loadkey $loadKey. Load CANCELLED."
563 } else {
564 "End $loadType with loadKey $loadKey. Returned $result"
565 }
566 }
567
setLoadingnull568 private suspend fun PageFetcherSnapshotState<Key, Value>.setLoading(loadType: LoadType) {
569 if (sourceLoadStates.get(loadType) != Loading) {
570 sourceLoadStates.set(type = loadType, state = Loading)
571 pageEventCh.send(
572 LoadStateUpdate(
573 source = sourceLoadStates.snapshot(),
574 mediator = null,
575 )
576 )
577 }
578 }
579
setErrornull580 private suspend fun PageFetcherSnapshotState<Key, Value>.setError(
581 loadType: LoadType,
582 error: Error
583 ) {
584 if (sourceLoadStates.get(loadType) != error) {
585 sourceLoadStates.set(type = loadType, state = error)
586 pageEventCh.send(
587 LoadStateUpdate(
588 source = sourceLoadStates.snapshot(),
589 mediator = null,
590 )
591 )
592 }
593 }
594
595 /** The next load key for a [loadType] or `null` if we should stop loading in that direction. */
nextLoadKeyOrNullnull596 private fun PageFetcherSnapshotState<Key, Value>.nextLoadKeyOrNull(
597 loadType: LoadType,
598 generationId: Int,
599 presentedItemsBeyondAnchor: Int
600 ): Key? {
601 if (generationId != generationId(loadType)) return null
602 // Skip load if in error state, unless retrying.
603 if (sourceLoadStates.get(loadType) is Error) return null
604
605 // Skip loading if prefetchDistance has been fulfilled.
606 if (presentedItemsBeyondAnchor >= config.prefetchDistance) return null
607
608 return if (loadType == PREPEND) {
609 pages.first().prevKey
610 } else {
611 pages.last().nextKey
612 }
613 }
614
615 // the handler for LoadResult.Invalid for both doInitialLoad and doLoad
onInvalidLoadnull616 private fun onInvalidLoad() {
617 close()
618 pagingSource.invalidate()
619 }
620 }
621
622 /**
623 * Generation of cancel token not [PageFetcherSnapshot]. [generationId] is used to differentiate
624 * between loads from jobs that have been cancelled, but continued to run to completion.
625 */
626 @VisibleForTesting
627 internal data class GenerationalViewportHint(val generationId: Int, val hint: ViewportHint)
628
629 /**
630 * Helper for [GenerationalViewportHint] prioritization in cases where item accesses are being sent
631 * to [PageFetcherSnapshot] faster than they can be processed. A [GenerationalViewportHint] is
632 * prioritized if it represents an update to presenter state or if it would cause
633 * [PageFetcherSnapshot] to load more items.
634 *
635 * @param previous [GenerationalViewportHint] that would normally be processed next if [this]
636 * [GenerationalViewportHint] was not sent.
637 * @return `true` if [this] [GenerationalViewportHint] should be prioritized over [previous].
638 */
shouldPrioritizeOvernull639 internal fun GenerationalViewportHint.shouldPrioritizeOver(
640 previous: GenerationalViewportHint,
641 loadType: LoadType
642 ): Boolean {
643 return when {
644 // Prioritize hints from new generations, which increments after dropping.
645 generationId > previous.generationId -> true
646 generationId < previous.generationId -> false
647 else -> hint.shouldPrioritizeOver(previous.hint, loadType)
648 }
649 }
650