1 /*
<lambda>null2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.paging.AccessorState.BlockState.COMPLETED
20 import androidx.paging.AccessorState.BlockState.REQUIRES_REFRESH
21 import androidx.paging.AccessorState.BlockState.UNBLOCKED
22 import androidx.paging.RemoteMediator.MediatorResult
23 import androidx.paging.internal.ReentrantLock
24 import androidx.paging.internal.withLock
25 import kotlinx.coroutines.CoroutineScope
26 import kotlinx.coroutines.flow.MutableStateFlow
27 import kotlinx.coroutines.flow.StateFlow
28 import kotlinx.coroutines.launch
29 
30 /** Interface provided to the snapshot to trigger load events. */
31 internal interface RemoteMediatorConnection<Key : Any, Value : Any> {
32     fun requestRefreshIfAllowed(pagingState: PagingState<Key, Value>)
33 
34     fun requestLoad(loadType: LoadType, pagingState: PagingState<Key, Value>)
35 
36     fun retryFailed(pagingState: PagingState<Key, Value>)
37 
38     /** Allow a single call to [requestRefreshIfAllowed] to successfully get enqueued. */
39     fun allowRefresh()
40 }
41 
42 @OptIn(ExperimentalPagingApi::class)
43 internal interface RemoteMediatorAccessor<Key : Any, Value : Any> :
44     RemoteMediatorConnection<Key, Value> {
45     val state: StateFlow<LoadStates>
46 
initializenull47     suspend fun initialize(): RemoteMediator.InitializeAction
48 }
49 
50 @Suppress("FunctionName")
51 @OptIn(ExperimentalPagingApi::class)
52 internal fun <Key : Any, Value : Any> RemoteMediatorAccessor(
53     scope: CoroutineScope,
54     delegate: RemoteMediator<Key, Value>
55 ): RemoteMediatorAccessor<Key, Value> = RemoteMediatorAccessImpl(scope, delegate)
56 
57 /** Simple wrapper around the local state of accessor to ensure we don't concurrently change it. */
58 private class AccessorStateHolder<Key : Any, Value : Any> {
59     private val lock = ReentrantLock()
60 
61     private val _loadStates = MutableStateFlow(LoadStates.IDLE)
62     val loadStates
63         get(): StateFlow<LoadStates> = _loadStates
64 
65     private val internalState = AccessorState<Key, Value>()
66 
67     fun <R> use(block: (AccessorState<Key, Value>) -> R): R {
68         return lock.withLock {
69             block(internalState).also { _loadStates.value = internalState.computeLoadStates() }
70         }
71     }
72 }
73 
74 /**
75  * The internal state of the accessor.
76  *
77  * It holds all pending requests, errors and whether certain types should be blocked (e.g. when
78  * append prepend needs refresh).
79  *
80  * It does not directly hold the LoadStates. Instead, LoadStates is computed from the previous
81  * information after each edit to keep them consistent.
82  */
83 private class AccessorState<Key : Any, Value : Any> {
84     // TODO this can be a bit flag instead
<lambda>null85     private val blockStates = Array<BlockState>(LoadType.values().size) { UNBLOCKED }
86 
87     // keep these as error states to avoid recreating them all the time
<lambda>null88     private val errors = Array<LoadState.Error?>(LoadType.values().size) { null }
89     private val pendingRequests = ArrayDeque<PendingRequest<Key, Value>>()
90 
91     /**
92      * Whether [RemoteMediatorAccessor.requestLoad] with [LoadType.REFRESH] is allowed to
93      * successfully enqueue.
94      *
95      * NOTE: [refreshAllowed] must be tracked within [AccessorState] because it is
96      * multi-generational state that should only be flipped once
97      * [RemoteMediatorAccessor.requestLoad] has successfully been enqueued.
98      *
99      * NOTE: We may receive redundant calls to [RemoteMediatorAccessor.requestLoad] with
100      * [LoadType.REFRESH] because it must be triggered within each generation of
101      * [PageFetcherSnapshot], to prevent dropping remote emissions for [PageEvent.LoadStateUpdate]
102      * due to waiting for a valid emissions from [PagingSource].
103      */
104     var refreshAllowed: Boolean = false
105 
computeLoadStatesnull106     fun computeLoadStates(): LoadStates {
107         return LoadStates(
108             refresh = computeLoadTypeState(LoadType.REFRESH),
109             append = computeLoadTypeState(LoadType.APPEND),
110             prepend = computeLoadTypeState(LoadType.PREPEND)
111         )
112     }
113 
computeLoadTypeStatenull114     private fun computeLoadTypeState(loadType: LoadType): LoadState {
115         val blockState = blockStates[loadType.ordinal]
116         val hasPending = pendingRequests.any { it.loadType == loadType }
117         // Boundary requests maybe queue in pendingRequest before getting launched later when
118         // refresh resolves if their block state is REQUIRES_REFRESH.
119         if (hasPending && blockState != REQUIRES_REFRESH) {
120             return LoadState.Loading
121         }
122         errors[loadType.ordinal]?.let {
123             return it
124         }
125         // now there are 3 cases here:
126         // a) it might be completed & blocked -> Blocked
127         // b) it might be blocked due to refresh being required first -> Incomplete
128         // c) it might have never run -> Incomplete
129         return when (blockState) {
130             COMPLETED ->
131                 when (loadType) {
132                     LoadType.REFRESH -> LoadState.NotLoading.Incomplete
133                     else -> LoadState.NotLoading.Complete
134                 }
135             REQUIRES_REFRESH -> LoadState.NotLoading.Incomplete
136             UNBLOCKED -> LoadState.NotLoading.Incomplete
137         }
138     }
139 
140     /**
141      * Tries to add a new pending request for the provided [loadType], and launches it immediately
142      * if it should run.
143      *
144      * In cases where pending request for the provided [loadType] already exists, the [pagingState]
145      * will just be updated in the existing request instead of queuing up multiple requests. This
146      * effectively de-dupes requests by [loadType], but always keeps the most recent request.
147      *
148      * @return `true` if fetchers should be launched, `false` otherwise.
149      */
addnull150     fun add(loadType: LoadType, pagingState: PagingState<Key, Value>): Boolean {
151         val existing = pendingRequests.firstOrNull { it.loadType == loadType }
152         // De-dupe requests with the same LoadType, just update PagingState and return.
153         if (existing != null) {
154             existing.pagingState = pagingState
155             return false
156         }
157 
158         val blockState = blockStates[loadType.ordinal]
159         // If blocked on REFRESH, queue up the request, but don't trigger yet. In cases where
160         // REFRESH returns endOfPaginationReached, we need to cancel the request. However, we
161         // need to queue up this request because it's possible REFRESH may not trigger
162         // invalidation even if it succeeds!
163         if (blockState == REQUIRES_REFRESH && loadType != LoadType.REFRESH) {
164             pendingRequests.add(PendingRequest(loadType, pagingState))
165             return false
166         }
167 
168         // Ignore block state for REFRESH as it is only sent in cases where we want to clear all
169         // AccessorState, but we cannot simply generate a new one for an existing PageFetcher as
170         // we need to cancel in-flight requests and prevent races between clearing state and
171         // triggering remote REFRESH by clearing state as part of handling the load request.
172         if (blockState != UNBLOCKED && loadType != LoadType.REFRESH) {
173             return false
174         }
175 
176         if (loadType == LoadType.REFRESH) {
177             // for refresh, we ignore error states. see: b/173438474
178             setError(LoadType.REFRESH, null)
179         }
180         return if (errors[loadType.ordinal] == null) {
181             pendingRequests.add(PendingRequest(loadType, pagingState))
182         } else {
183             false
184         }
185     }
186 
187     /**
188      * Can be used to block - unblock certain request types based on the mediator state.
189      *
190      * Note that a load type can still be blocked if it last returned an error.
191      */
setBlockStatenull192     fun setBlockState(loadType: LoadType, state: BlockState) {
193         blockStates[loadType.ordinal] = state
194     }
195 
getPendingRefreshnull196     fun getPendingRefresh() =
197         pendingRequests.firstOrNull { it.loadType == LoadType.REFRESH }?.pagingState
198 
getPendingBoundarynull199     fun getPendingBoundary() =
200         pendingRequests
201             .firstOrNull {
202                 it.loadType != LoadType.REFRESH && blockStates[it.loadType.ordinal] == UNBLOCKED
203             }
<lambda>null204             ?.let {
205                 // make a copy
206                 it.loadType to it.pagingState
207             }
208 
clearPendingRequestsnull209     fun clearPendingRequests() {
210         pendingRequests.clear()
211     }
212 
clearPendingRequestnull213     fun clearPendingRequest(loadType: LoadType) {
214         pendingRequests.removeAll { it.loadType == loadType }
215     }
216 
clearErrorsnull217     fun clearErrors() {
218         for (i in errors.indices) {
219             errors[i] = null
220         }
221     }
222 
setErrornull223     fun setError(loadType: LoadType, errorState: LoadState.Error?) {
224         errors[loadType.ordinal] = errorState
225     }
226 
227     class PendingRequest<Key : Any, Value : Any>(
228         val loadType: LoadType,
229         var pagingState: PagingState<Key, Value>
230     )
231 
232     enum class BlockState {
233         UNBLOCKED,
234         COMPLETED,
235         REQUIRES_REFRESH
236     }
237 }
238 
239 @OptIn(ExperimentalPagingApi::class)
240 private class RemoteMediatorAccessImpl<Key : Any, Value : Any>(
241     private val scope: CoroutineScope,
242     private val remoteMediator: RemoteMediator<Key, Value>
243 ) : RemoteMediatorAccessor<Key, Value> {
244     // all internal state is kept in accessorState to avoid concurrent access
245     private val accessorState = AccessorStateHolder<Key, Value>()
246 
247     // an isolation runner is used to ensure no concurrent requests are made to the remote mediator.
248     // it also handles cancelling lower priority calls with higher priority calls.
249     private val isolationRunner = SingleRunner(cancelPreviousInEqualPriority = false)
250 
251     override val state: StateFlow<LoadStates>
252         get() = accessorState.loadStates
253 
requestRefreshIfAllowednull254     override fun requestRefreshIfAllowed(pagingState: PagingState<Key, Value>) {
255         accessorState.use {
256             if (it.refreshAllowed) {
257                 it.refreshAllowed = false
258                 accessorState.requestLoad(LoadType.REFRESH, pagingState)
259             }
260         }
261     }
262 
allowRefreshnull263     override fun allowRefresh() {
264         accessorState.use { it.refreshAllowed = true }
265     }
266 
requestLoadnull267     override fun requestLoad(loadType: LoadType, pagingState: PagingState<Key, Value>) {
268         accessorState.requestLoad(loadType, pagingState)
269     }
270 
AccessorStateHoldernull271     private fun AccessorStateHolder<Key, Value>.requestLoad(
272         loadType: LoadType,
273         pagingState: PagingState<Key, Value>,
274     ) {
275         val newRequest = use { it.add(loadType, pagingState) }
276 
277         if (newRequest) {
278             when (loadType) {
279                 LoadType.REFRESH -> launchRefresh()
280                 else -> launchBoundary()
281             }
282         }
283     }
284 
launchRefreshnull285     private fun launchRefresh() {
286         scope.launch {
287             var launchAppendPrepend = false
288             isolationRunner.runInIsolation(priority = PRIORITY_REFRESH) {
289                 val pendingPagingState = accessorState.use { it.getPendingRefresh() }
290                 pendingPagingState?.let {
291                     val loadResult = remoteMediator.load(LoadType.REFRESH, pendingPagingState)
292                     launchAppendPrepend =
293                         when (loadResult) {
294                             is MediatorResult.Success -> {
295                                 accessorState.use {
296                                     // First clear refresh from pending requests to update
297                                     // LoadState.
298                                     // Note: Only clear refresh request, allowing potentially
299                                     // out-of-date boundary requests as there's no guarantee that
300                                     // refresh will trigger invalidation, and clearing boundary
301                                     // requests
302                                     // here could prevent Paging from making progress.
303                                     it.clearPendingRequest(LoadType.REFRESH)
304 
305                                     if (loadResult.endOfPaginationReached) {
306                                         it.setBlockState(LoadType.REFRESH, COMPLETED)
307                                         it.setBlockState(LoadType.PREPEND, COMPLETED)
308                                         it.setBlockState(LoadType.APPEND, COMPLETED)
309 
310                                         // Now that blockState is updated, which should block
311                                         // new boundary requests, clear all requests since
312                                         // endOfPaginationReached from refresh should prevent
313                                         // prepend
314                                         // and append from triggering, even if they are queued up.
315                                         it.clearPendingRequests()
316                                     } else {
317                                         // Update block state for boundary requests now that we can
318                                         // handle them if they required refresh.
319                                         it.setBlockState(LoadType.PREPEND, UNBLOCKED)
320                                         it.setBlockState(LoadType.APPEND, UNBLOCKED)
321                                     }
322 
323                                     // clean their errors
324                                     it.setError(LoadType.PREPEND, null)
325                                     it.setError(LoadType.APPEND, null)
326 
327                                     // If there is a pending boundary, trigger its launch, allowing
328                                     // out-of-date requests in the case where queued requests were
329                                     // from previous generation. See b/176855944.
330                                     it.getPendingBoundary() != null
331                                 }
332                             }
333                             is MediatorResult.Error -> {
334                                 // if refresh failed, don't change append/prepend states so that if
335                                 // refresh is not required, they can run.
336                                 accessorState.use {
337                                     // only clear refresh. we can use append prepend
338                                     it.clearPendingRequest(LoadType.REFRESH)
339                                     it.setError(
340                                         LoadType.REFRESH,
341                                         LoadState.Error(loadResult.throwable)
342                                     )
343 
344                                     // If there is a pending boundary, trigger its launch, allowing
345                                     // out-of-date requests in the case where queued requests were
346                                     // from previous generation. See b/176855944.
347                                     it.getPendingBoundary() != null
348                                 }
349                             }
350                         }
351                 }
352             }
353             // launch this after we leave the restricted scope otherwise append / prepend won't
354             // make it since they have a lower priority
355             if (launchAppendPrepend) {
356                 launchBoundary()
357             }
358         }
359     }
360 
launchBoundarynull361     private fun launchBoundary() {
362         scope.launch {
363             isolationRunner.runInIsolation(priority = PRIORITY_APPEND_PREPEND) {
364                 while (true) {
365                     val (loadType, pendingPagingState) =
366                         accessorState.use { it.getPendingBoundary() } ?: break
367                     when (val loadResult = remoteMediator.load(loadType, pendingPagingState)) {
368                         is MediatorResult.Success -> {
369                             accessorState.use {
370                                 it.clearPendingRequest(loadType)
371                                 if (loadResult.endOfPaginationReached) {
372                                     it.setBlockState(loadType, COMPLETED)
373                                 }
374                             }
375                         }
376                         is MediatorResult.Error -> {
377                             accessorState.use {
378                                 it.clearPendingRequest(loadType)
379                                 it.setError(loadType, LoadState.Error(loadResult.throwable))
380                             }
381                         }
382                     }
383                 }
384             }
385         }
386     }
387 
retryFailednull388     override fun retryFailed(pagingState: PagingState<Key, Value>) {
389         val toBeStarted = mutableListOf<LoadType>()
390         accessorState.use { accessorState ->
391             val loadStates = accessorState.computeLoadStates()
392             val willTriggerRefresh = loadStates.refresh is LoadState.Error
393 
394             accessorState.clearErrors()
395             if (willTriggerRefresh) {
396                 toBeStarted.add(LoadType.REFRESH)
397                 accessorState.setBlockState(LoadType.REFRESH, UNBLOCKED)
398             }
399             if (loadStates.append is LoadState.Error) {
400                 if (!willTriggerRefresh) {
401                     toBeStarted.add(LoadType.APPEND)
402                 }
403                 accessorState.clearPendingRequest(LoadType.APPEND)
404             }
405             if (loadStates.prepend is LoadState.Error) {
406                 if (!willTriggerRefresh) {
407                     toBeStarted.add(LoadType.PREPEND)
408                 }
409                 accessorState.clearPendingRequest(LoadType.PREPEND)
410             }
411         }
412 
413         toBeStarted.forEach { requestLoad(it, pagingState) }
414     }
415 
initializenull416     override suspend fun initialize(): RemoteMediator.InitializeAction {
417         return remoteMediator.initialize().also { action ->
418             if (action == RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH) {
419                 accessorState.use {
420                     it.setBlockState(LoadType.APPEND, REQUIRES_REFRESH)
421                     it.setBlockState(LoadType.PREPEND, REQUIRES_REFRESH)
422                 }
423             }
424         }
425     }
426 
427     companion object {
428         private const val PRIORITY_REFRESH = 2
429         private const val PRIORITY_APPEND_PREPEND = 1
430     }
431 }
432