1 /*
<lambda>null2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.annotation.VisibleForTesting
20 import androidx.paging.CombineSource.RECEIVER
21 import androidx.paging.PageEvent.Drop
22 import androidx.paging.PageEvent.Insert
23 import androidx.paging.PageEvent.LoadStateUpdate
24 import androidx.paging.RemoteMediator.InitializeAction.LAUNCH_INITIAL_REFRESH
25 import androidx.paging.internal.BUGANIZER_URL
26 import kotlinx.coroutines.Job
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.filterNotNull
29 import kotlinx.coroutines.flow.onEach
30 import kotlinx.coroutines.flow.onStart
31 
32 internal class PageFetcher<Key : Any, Value : Any>(
33     private val pagingSourceFactory: suspend () -> PagingSource<Key, Value>,
34     private val initialKey: Key?,
35     private val config: PagingConfig,
36     @OptIn(ExperimentalPagingApi::class) remoteMediator: RemoteMediator<Key, Value>? = null
37 ) {
38     /**
39      * Channel of refresh signals that would trigger a new instance of [PageFetcherSnapshot].
40      * Signals sent to this channel should be `true` if a remote REFRESH load should be triggered,
41      * `false` otherwise.
42      *
43      * NOTE: This channel is conflated, which means it has a buffer size of 1, and will always
44      * broadcast the latest value received.
45      */
46     private val refreshEvents = ConflatedEventBus<Boolean>()
47 
48     private val retryEvents = ConflatedEventBus<Unit>()
49 
50     // The object built by paging builder can maintain the scope so that on rotation we don't stop
51     // the paging.
52     val flow: Flow<PagingData<Value>> = simpleChannelFlow {
53         @OptIn(ExperimentalPagingApi::class)
54         val remoteMediatorAccessor = remoteMediator?.let { RemoteMediatorAccessor(this, it) }
55 
56         refreshEvents.flow
57             .onStart {
58                 @OptIn(ExperimentalPagingApi::class)
59                 emit(remoteMediatorAccessor?.initialize() == LAUNCH_INITIAL_REFRESH)
60             }
61             .simpleScan(null) {
62                 previousGeneration: GenerationInfo<Key, Value>?,
63                 triggerRemoteRefresh: Boolean ->
64                 // Enable refresh if this is the first generation and we have LAUNCH_INITIAL_REFRESH
65                 // or if this generation was started due to [refresh] being invoked.
66                 if (triggerRemoteRefresh) {
67                     remoteMediatorAccessor?.allowRefresh()
68                 }
69 
70                 val pagingSource =
71                     generateNewPagingSource(
72                         previousPagingSource = previousGeneration?.snapshot?.pagingSource
73                     )
74 
75                 var previousPagingState = previousGeneration?.snapshot?.currentPagingState()
76 
77                 // If cached PagingState had pages loaded, but previous generation didn't, use
78                 // the cached PagingState to handle cases where invalidation happens too quickly,
79                 // so that getRefreshKey and remote refresh at least have some data to work with.
80                 if (
81                     previousPagingState?.pages.isNullOrEmpty() &&
82                         previousGeneration?.state?.pages?.isNotEmpty() == true
83                 ) {
84                     previousPagingState = previousGeneration.state
85                 }
86 
87                 // If previous generation was invalidated before anchorPosition was established,
88                 // re-use last PagingState that successfully loaded pages and has an anchorPosition.
89                 // This prevents rapid invalidation from deleting the anchorPosition if the
90                 // previous generation didn't have time to load before getting invalidated.
91                 if (
92                     previousPagingState?.anchorPosition == null &&
93                         previousGeneration?.state?.anchorPosition != null
94                 ) {
95                     previousPagingState = previousGeneration.state
96                 }
97 
98                 val initialKey: Key? =
99                     when (previousPagingState) {
100                         null -> initialKey
101                         else ->
102                             pagingSource.getRefreshKey(previousPagingState).also {
103                                 log(DEBUG) {
104                                     "Refresh key $it returned from PagingSource $pagingSource"
105                                 }
106                             }
107                     }
108 
109                 previousGeneration?.snapshot?.close()
110                 previousGeneration?.job?.cancel()
111 
112                 GenerationInfo(
113                     snapshot =
114                         PageFetcherSnapshot(
115                             initialKey = initialKey,
116                             pagingSource = pagingSource,
117                             config = config,
118                             retryFlow = retryEvents.flow,
119                             // Only trigger remote refresh on refresh signals that do not originate
120                             // from
121                             // initialization or PagingSource invalidation.
122                             remoteMediatorConnection = remoteMediatorAccessor,
123                             jumpCallback = this@PageFetcher::refresh,
124                             previousPagingState = previousPagingState,
125                         ),
126                     state = previousPagingState,
127                     job = Job(),
128                 )
129             }
130             .filterNotNull()
131             .simpleMapLatest { generation ->
132                 val downstreamFlow =
133                     generation.snapshot
134                         .injectRemoteEvents(generation.job, remoteMediatorAccessor)
135                         .onEach { log(VERBOSE) { "Sent $it" } }
136 
137                 PagingData(
138                     flow = downstreamFlow,
139                     uiReceiver = PagerUiReceiver(retryEvents),
140                     hintReceiver = PagerHintReceiver(generation.snapshot)
141                 )
142             }
143             .collect(::send)
144     }
145 
146     fun refresh() {
147         refreshEvents.send(true)
148     }
149 
150     private fun invalidate() {
151         refreshEvents.send(false)
152     }
153 
154     private fun PageFetcherSnapshot<Key, Value>.injectRemoteEvents(
155         job: Job,
156         accessor: RemoteMediatorAccessor<Key, Value>?
157     ): Flow<PageEvent<Value>> {
158         if (accessor == null) return pageEventFlow
159 
160         val sourceStates = MutableLoadStateCollection()
161         // We wrap this in a cancelableChannelFlow to allow co-operative cancellation, otherwise
162         // RemoteMediatorAccessor's StateFlow will keep this Flow running on old generations.
163         return cancelableChannelFlow(job) {
164             accessor.state
165                 // Note: Combine waits for PageFetcherSnapshot to emit an event before sending
166                 // anything. This avoids sending the initial idle state, since it would cause
167                 // load state flickering on rapid invalidation.
168                 .combineWithoutBatching(pageEventFlow) { remoteState, sourceEvent, updateFrom ->
169                     if (updateFrom != RECEIVER) {
170                         when (sourceEvent) {
171                             is Insert -> {
172                                 sourceStates.set(sourceEvent.sourceLoadStates)
173                                 @Suppress("DATA_CLASS_INVISIBLE_COPY_USAGE_WARNING")
174                                 sourceEvent.copy(
175                                     sourceLoadStates = sourceEvent.sourceLoadStates,
176                                     mediatorLoadStates = remoteState,
177                                 )
178                             }
179                             is Drop -> {
180                                 sourceStates.set(
181                                     type = sourceEvent.loadType,
182                                     state = LoadState.NotLoading.Incomplete
183                                 )
184                                 sourceEvent
185                             }
186                             is LoadStateUpdate -> {
187                                 sourceStates.set(sourceEvent.source)
188                                 LoadStateUpdate(
189                                     source = sourceEvent.source,
190                                     mediator = remoteState,
191                                 )
192                             }
193                             is PageEvent.StaticList -> {
194                                 throw IllegalStateException(
195                                     """Paging generated an event to display a static list that
196                                         | originated from a paginated source. If you see this
197                                         | exception, it is most likely a bug in the library.
198                                         | Please file a bug so we can fix it at:
199                                         | $BUGANIZER_URL"""
200                                         .trimMargin()
201                                 )
202                             }
203                         }
204                     } else {
205                         LoadStateUpdate(
206                             source = sourceStates.snapshot(),
207                             mediator = remoteState,
208                         )
209                     }
210                 }
211                 .collect { send(it) }
212         }
213     }
214 
215     private suspend fun generateNewPagingSource(
216         previousPagingSource: PagingSource<Key, Value>?
217     ): PagingSource<Key, Value> {
218         val pagingSource = pagingSourceFactory()
219         if (pagingSource is CompatLegacyPagingSource) {
220             pagingSource.setPageSize(config.pageSize)
221         }
222         // Ensure pagingSourceFactory produces a new instance of PagingSource.
223         check(pagingSource !== previousPagingSource) {
224             """
225             An instance of PagingSource was re-used when Pager expected to create a new
226             instance. Ensure that the pagingSourceFactory passed to Pager always returns a
227             new instance of PagingSource.
228             """
229                 .trimIndent()
230         }
231 
232         // Hook up refresh signals from PagingSource.
233         pagingSource.registerInvalidatedCallback(::invalidate)
234         previousPagingSource?.unregisterInvalidatedCallback(::invalidate)
235         previousPagingSource?.invalidate() // Note: Invalidate is idempotent.
236         log(DEBUG) { "Generated new PagingSource $pagingSource" }
237 
238         return pagingSource
239     }
240 
241     inner class PagerUiReceiver(private val retryEventBus: ConflatedEventBus<Unit>) : UiReceiver {
242         override fun retry() {
243             retryEventBus.send(Unit)
244         }
245 
246         override fun refresh() = this@PageFetcher.refresh()
247     }
248 
249     inner class PagerHintReceiver<Key : Any, Value : Any>
250     constructor(
251         @get:VisibleForTesting internal val pageFetcherSnapshot: PageFetcherSnapshot<Key, Value>,
252     ) : HintReceiver {
253 
254         override fun accessHint(viewportHint: ViewportHint) {
255             pageFetcherSnapshot.accessHint(viewportHint)
256         }
257     }
258 
259     private class GenerationInfo<Key : Any, Value : Any>(
260         val snapshot: PageFetcherSnapshot<Key, Value>,
261         val state: PagingState<Key, Value>?,
262         val job: Job,
263     )
264 }
265