/*
 * Copyright 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging

import androidx.paging.AccessorState.BlockState.COMPLETED
import androidx.paging.AccessorState.BlockState.REQUIRES_REFRESH
import androidx.paging.AccessorState.BlockState.UNBLOCKED
import androidx.paging.RemoteMediator.MediatorResult
import androidx.paging.internal.ReentrantLock
import androidx.paging.internal.withLock
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch

/** Interface provided to the snapshot to trigger load events. */
internal interface RemoteMediatorConnection<Key : Any, Value : Any> {
    /**
     * Requests a remote [LoadType.REFRESH], but only if a prior call to [allowRefresh] has not
     * been consumed yet.
     */
    fun requestRefreshIfAllowed(pagingState: PagingState<Key, Value>)

    /** Requests a remote load of the given [loadType] using [pagingState]. */
    fun requestLoad(loadType: LoadType, pagingState: PagingState<Key, Value>)

    /** Re-requests the load types that are currently in an error state. */
    fun retryFailed(pagingState: PagingState<Key, Value>)

    /** Allow a single call to [requestRefreshIfAllowed] to successfully get enqueued. */
    fun allowRefresh()
}

/**
 * A [RemoteMediatorConnection] that additionally exposes the remote [LoadStates] and the
 * mediator's initialization hook.
 */
@OptIn(ExperimentalPagingApi::class)
internal interface RemoteMediatorAccessor<Key : Any, Value : Any> :
    RemoteMediatorConnection<Key, Value> {
    /** The current remote [LoadStates], recomputed after every edit of the accessor state. */
    val state: StateFlow<LoadStates>

    /** Initializes the underlying mediator and returns the action it requested. */
    suspend fun initialize(): RemoteMediator.InitializeAction
}

/**
 * Factory for the default [RemoteMediatorAccessor] implementation.
 *
 * @param scope the scope in which remote load requests are launched.
 * @param delegate the [RemoteMediator] that actually performs the loads.
 */
@Suppress("FunctionName")
@OptIn(ExperimentalPagingApi::class)
internal fun <Key : Any, Value : Any> RemoteMediatorAccessor(
    scope: CoroutineScope,
    delegate: RemoteMediator<Key, Value>
): RemoteMediatorAccessor<Key, Value> = RemoteMediatorAccessImpl(scope, delegate)

/** Simple wrapper around the local state of accessor to ensure we don't concurrently change it. */
private class AccessorStateHolder<Key : Any, Value : Any> {
    private val lock = ReentrantLock()

    private val _loadStates = MutableStateFlow(LoadStates.IDLE)
    val loadStates
        get(): StateFlow<LoadStates> = _loadStates

    private val internalState = AccessorState<Key, Value>()

    /**
     * Runs [block] against the wrapped [AccessorState] while holding [lock], then republishes
     * the freshly computed [LoadStates] (still under the lock) before returning the block's
     * result.
     */
    fun <R> use(block: (AccessorState<Key, Value>) -> R): R {
        return lock.withLock {
            block(internalState).also { _loadStates.value = internalState.computeLoadStates() }
        }
    }
}

/**
 * The internal state of the accessor.
 *
 * It holds all pending requests, errors and whether certain types should be blocked (e.g. when
 * append / prepend need a refresh first).
 *
 * It does not directly hold the LoadStates. Instead, LoadStates is computed from the previous
 * information after each edit to keep them consistent.
 */
private class AccessorState<Key : Any, Value : Any> {
    // Indexed by LoadType.ordinal.
    // TODO this can be a bit flag instead
    private val blockStates = Array<BlockState>(LoadType.values().size) { UNBLOCKED }

    // Indexed by LoadType.ordinal.
    // keep these as error states to avoid recreating them all the time
    private val errors = Array<LoadState.Error?>(LoadType.values().size) { null }
    private val pendingRequests = ArrayDeque<PendingRequest<Key, Value>>()

    /**
     * Whether [RemoteMediatorAccessor.requestLoad] with [LoadType.REFRESH] is allowed to
     * successfully enqueue.
     *
     * NOTE: [refreshAllowed] must be tracked within [AccessorState] because it is
     * multi-generational state that should only be flipped once
     * [RemoteMediatorAccessor.requestLoad] has successfully been enqueued.
     *
     * NOTE: We may receive redundant calls to [RemoteMediatorAccessor.requestLoad] with
     * [LoadType.REFRESH] because it must be triggered within each generation of
     * [PageFetcherSnapshot], to prevent dropping remote emissions for [PageEvent.LoadStateUpdate]
     * due to waiting for a valid emission from [PagingSource].
     */
    var refreshAllowed: Boolean = false

    /** Builds a [LoadStates] snapshot from the current pending requests, errors and blocks. */
    fun computeLoadStates(): LoadStates {
        return LoadStates(
            refresh = computeLoadTypeState(LoadType.REFRESH),
            append = computeLoadTypeState(LoadType.APPEND),
            prepend = computeLoadTypeState(LoadType.PREPEND)
        )
    }

    /**
     * Derives the [LoadState] of a single [loadType].
     *
     * Precedence: a pending request that is not held back by [REQUIRES_REFRESH] reports
     * [LoadState.Loading]; otherwise a stored error is reported; otherwise the result depends on
     * the block state only.
     */
    private fun computeLoadTypeState(loadType: LoadType): LoadState {
        val blockState = blockStates[loadType.ordinal]
        val hasPending = pendingRequests.any { it.loadType == loadType }
        // Boundary requests may be queued in pendingRequests before getting launched later, when
        // refresh resolves, if their block state is REQUIRES_REFRESH.
        if (hasPending && blockState != REQUIRES_REFRESH) {
            return LoadState.Loading
        }
        errors[loadType.ordinal]?.let {
            return it
        }
        // now there are 3 cases here:
        // a) it might be completed -> Complete for APPEND / PREPEND, Incomplete for REFRESH
        // b) it might be blocked due to refresh being required first -> Incomplete
        // c) it might have never run -> Incomplete
        return when (blockState) {
            COMPLETED ->
                when (loadType) {
                    LoadType.REFRESH -> LoadState.NotLoading.Incomplete
                    else -> LoadState.NotLoading.Complete
                }
            REQUIRES_REFRESH -> LoadState.NotLoading.Incomplete
            UNBLOCKED -> LoadState.NotLoading.Incomplete
        }
    }

    /**
     * Tries to add a new pending request for the provided [loadType], and launches it immediately
     * if it should run.
     *
     * In cases where pending request for the provided [loadType] already exists, the [pagingState]
     * will just be updated in the existing request instead of queuing up multiple requests. This
     * effectively de-dupes requests by [loadType], but always keeps the most recent request.
     *
     * @return `true` if fetchers should be launched, `false` otherwise.
     */
    fun add(loadType: LoadType, pagingState: PagingState<Key, Value>): Boolean {
        val existing = pendingRequests.firstOrNull { it.loadType == loadType }
        // De-dupe requests with the same LoadType, just update PagingState and return.
        if (existing != null) {
            existing.pagingState = pagingState
            return false
        }

        val blockState = blockStates[loadType.ordinal]
        // If blocked on REFRESH, queue up the request, but don't trigger yet. In cases where
        // REFRESH returns endOfPaginationReached, we need to cancel the request. However, we
        // need to queue up this request because it's possible REFRESH may not trigger
        // invalidation even if it succeeds!
        if (blockState == REQUIRES_REFRESH && loadType != LoadType.REFRESH) {
            pendingRequests.add(PendingRequest(loadType, pagingState))
            return false
        }

        // Ignore block state for REFRESH as it is only sent in cases where we want to clear all
        // AccessorState, but we cannot simply generate a new one for an existing PageFetcher as
        // we need to cancel in-flight requests and prevent races between clearing state and
        // triggering remote REFRESH by clearing state as part of handling the load request.
        if (blockState != UNBLOCKED && loadType != LoadType.REFRESH) {
            return false
        }

        if (loadType == LoadType.REFRESH) {
            // for refresh, we ignore error states. see: b/173438474
            setError(LoadType.REFRESH, null)
        }
        // A load type that still carries an error is not enqueued; it has to be retried first.
        return if (errors[loadType.ordinal] == null) {
            pendingRequests.add(PendingRequest(loadType, pagingState))
        } else {
            false
        }
    }

    /**
     * Can be used to block - unblock certain request types based on the mediator state.
     *
     * Note that a load type can still be blocked if it last returned an error.
     */
    fun setBlockState(loadType: LoadType, state: BlockState) {
        blockStates[loadType.ordinal] = state
    }

    /** Returns the [PagingState] of the pending [LoadType.REFRESH] request, or `null` if none. */
    fun getPendingRefresh() =
        pendingRequests.firstOrNull { it.loadType == LoadType.REFRESH }?.pagingState

    /**
     * Returns the first pending non-refresh request whose load type is [UNBLOCKED], as a
     * `loadType to pagingState` pair, or `null` if there is no such request.
     */
    fun getPendingBoundary() =
        pendingRequests
            .firstOrNull {
                it.loadType != LoadType.REFRESH && blockStates[it.loadType.ordinal] == UNBLOCKED
            }
            ?.let {
                // make a copy
                it.loadType to it.pagingState
            }

    /** Drops every pending request, regardless of load type. */
    fun clearPendingRequests() {
        pendingRequests.clear()
    }

    /** Drops the pending request(s) of the given [loadType]. */
    fun clearPendingRequest(loadType: LoadType) {
        pendingRequests.removeAll { it.loadType == loadType }
    }

    /** Resets the stored error of every load type to `null`. */
    fun clearErrors() {
        for (i in errors.indices) {
            errors[i] = null
        }
    }

    /** Stores [errorState] for [loadType]; passing `null` clears the error. */
    fun setError(loadType: LoadType, errorState: LoadState.Error?) {
        errors[loadType.ordinal] = errorState
    }

    /** A queued request; [pagingState] is mutable so a newer request can replace it in place. */
    class PendingRequest<Key : Any, Value : Any>(
        val loadType: LoadType,
        var pagingState: PagingState<Key, Value>
    )

    enum class BlockState {
        UNBLOCKED,
        COMPLETED,
        REQUIRES_REFRESH
    }
}

@OptIn(ExperimentalPagingApi::class)
private class RemoteMediatorAccessImpl<Key : Any, Value : Any>(
    private val scope: CoroutineScope,
    private val remoteMediator: RemoteMediator<Key, Value>
) : RemoteMediatorAccessor<Key, Value> {
    // all internal state is kept in accessorState to avoid concurrent access
    private val accessorState = AccessorStateHolder<Key, Value>()

    // an isolation runner is used to ensure no concurrent requests are made to the remote mediator.
    // it also handles cancelling lower priority calls with higher priority calls.
    private val isolationRunner = SingleRunner(cancelPreviousInEqualPriority = false)

    override val state: StateFlow<LoadStates>
        get() = accessorState.loadStates

    override fun requestRefreshIfAllowed(pagingState: PagingState<Key, Value>) {
        accessorState.use {
            if (it.refreshAllowed) {
                // Consume the permission before requesting, so it only succeeds once per
                // allowRefresh() call.
                it.refreshAllowed = false
                accessorState.requestLoad(LoadType.REFRESH, pagingState)
            }
        }
    }

    override fun allowRefresh() {
        accessorState.use { it.refreshAllowed = true }
    }

    override fun requestLoad(loadType: LoadType, pagingState: PagingState<Key, Value>) {
        accessorState.requestLoad(loadType, pagingState)
    }

    /**
     * Enqueues the request in the held [AccessorState] and, if [AccessorState.add] reports that
     * it should run, launches the matching fetcher for [loadType].
     */
    private fun AccessorStateHolder<Key, Value>.requestLoad(
        loadType: LoadType,
        pagingState: PagingState<Key, Value>,
    ) {
        val newRequest = use { it.add(loadType, pagingState) }

        if (newRequest) {
            when (loadType) {
                LoadType.REFRESH -> launchRefresh()
                else -> launchBoundary()
            }
        }
    }

    /**
     * Launches a coroutine that runs the pending [LoadType.REFRESH] request (if any) through the
     * isolation runner, updates block states / errors from its result, and afterwards launches
     * boundary loads if an eligible one is pending.
     */
    private fun launchRefresh() {
        scope.launch {
            var launchAppendPrepend = false
            isolationRunner.runInIsolation(priority = PRIORITY_REFRESH) {
                val pendingPagingState = accessorState.use { it.getPendingRefresh() }
                pendingPagingState?.let {
                    val loadResult = remoteMediator.load(LoadType.REFRESH, pendingPagingState)
                    launchAppendPrepend =
                        when (loadResult) {
                            is MediatorResult.Success -> {
                                accessorState.use {
                                    // First clear refresh from pending requests to update
                                    // LoadState.
                                    // Note: Only clear refresh request, allowing potentially
                                    // out-of-date boundary requests as there's no guarantee that
                                    // refresh will trigger invalidation, and clearing boundary
                                    // requests here could prevent Paging from making progress.
                                    it.clearPendingRequest(LoadType.REFRESH)

                                    if (loadResult.endOfPaginationReached) {
                                        it.setBlockState(LoadType.REFRESH, COMPLETED)
                                        it.setBlockState(LoadType.PREPEND, COMPLETED)
                                        it.setBlockState(LoadType.APPEND, COMPLETED)

                                        // Now that blockState is updated, which should block
                                        // new boundary requests, clear all requests since
                                        // endOfPaginationReached from refresh should prevent
                                        // prepend and append from triggering, even if they are
                                        // queued up.
                                        it.clearPendingRequests()
                                    } else {
                                        // Update block state for boundary requests now that we can
                                        // handle them if they required refresh.
                                        it.setBlockState(LoadType.PREPEND, UNBLOCKED)
                                        it.setBlockState(LoadType.APPEND, UNBLOCKED)
                                    }

                                    // clean their errors
                                    it.setError(LoadType.PREPEND, null)
                                    it.setError(LoadType.APPEND, null)

                                    // If there is a pending boundary, trigger its launch, allowing
                                    // out-of-date requests in the case where queued requests were
                                    // from previous generation. See b/176855944.
                                    it.getPendingBoundary() != null
                                }
                            }
                            is MediatorResult.Error -> {
                                // if refresh failed, don't change append/prepend states so that if
                                // refresh is not required, they can run.
                                accessorState.use {
                                    // only clear refresh. we can use append prepend
                                    it.clearPendingRequest(LoadType.REFRESH)
                                    it.setError(
                                        LoadType.REFRESH,
                                        LoadState.Error(loadResult.throwable)
                                    )

                                    // If there is a pending boundary, trigger its launch, allowing
                                    // out-of-date requests in the case where queued requests were
                                    // from previous generation. See b/176855944.
                                    it.getPendingBoundary() != null
                                }
                            }
                        }
                }
            }
            // launch this after we leave the restricted scope otherwise append / prepend won't
            // make it since they have a lower priority
            if (launchAppendPrepend) {
                launchBoundary()
            }
        }
    }

    /**
     * Launches a coroutine that drains eligible pending boundary (append / prepend) requests one
     * at a time through the isolation runner, stopping once
     * [AccessorState.getPendingBoundary] returns `null`.
     */
    private fun launchBoundary() {
        scope.launch {
            isolationRunner.runInIsolation(priority = PRIORITY_APPEND_PREPEND) {
                while (true) {
                    val (loadType, pendingPagingState) =
                        accessorState.use { it.getPendingBoundary() } ?: break
                    when (val loadResult = remoteMediator.load(loadType, pendingPagingState)) {
                        is MediatorResult.Success -> {
                            accessorState.use {
                                it.clearPendingRequest(loadType)
                                if (loadResult.endOfPaginationReached) {
                                    it.setBlockState(loadType, COMPLETED)
                                }
                            }
                        }
                        is MediatorResult.Error -> {
                            accessorState.use {
                                it.clearPendingRequest(loadType)
                                it.setError(loadType, LoadState.Error(loadResult.throwable))
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     * Clears all stored errors and re-requests the load types that were in an error state.
     *
     * If refresh had failed, only [LoadType.REFRESH] is re-requested; failed append / prepend
     * are re-requested only when refresh is not being retried. Any pending append / prepend
     * request whose type was in error is dropped either way.
     */
    override fun retryFailed(pagingState: PagingState<Key, Value>) {
        val toBeStarted = mutableListOf<LoadType>()
        accessorState.use { heldState ->
            // Snapshot the states before clearing errors, since the decisions below depend on
            // which types were failing.
            val loadStates = heldState.computeLoadStates()
            val willTriggerRefresh = loadStates.refresh is LoadState.Error

            heldState.clearErrors()
            if (willTriggerRefresh) {
                toBeStarted.add(LoadType.REFRESH)
                heldState.setBlockState(LoadType.REFRESH, UNBLOCKED)
            }
            if (loadStates.append is LoadState.Error) {
                if (!willTriggerRefresh) {
                    toBeStarted.add(LoadType.APPEND)
                }
                heldState.clearPendingRequest(LoadType.APPEND)
            }
            if (loadStates.prepend is LoadState.Error) {
                if (!willTriggerRefresh) {
                    toBeStarted.add(LoadType.PREPEND)
                }
                heldState.clearPendingRequest(LoadType.PREPEND)
            }
        }

        // Issue the requests only after the state edit above has completed.
        toBeStarted.forEach { requestLoad(it, pagingState) }
    }

    /**
     * Delegates to [RemoteMediator.initialize]. When the mediator asks for an initial refresh,
     * append and prepend are marked [REQUIRES_REFRESH] so they are queued rather than run until
     * refresh resolves.
     */
    override suspend fun initialize(): RemoteMediator.InitializeAction {
        return remoteMediator.initialize().also { action ->
            if (action == RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH) {
                accessorState.use {
                    it.setBlockState(LoadType.APPEND, REQUIRES_REFRESH)
                    it.setBlockState(LoadType.PREPEND, REQUIRES_REFRESH)
                }
            }
        }
    }

    companion object {
        private const val PRIORITY_REFRESH = 2
        private const val PRIORITY_APPEND_PREPEND = 1
    }
}