1 /*
<lambda>null2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.annotation.CheckResult
20 import androidx.annotation.VisibleForTesting
21 import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
22 import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
23 import kotlinx.coroutines.CoroutineScope
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.SharingStarted
26 import kotlinx.coroutines.flow.map
27 import kotlinx.coroutines.flow.onCompletion
28 import kotlinx.coroutines.flow.onStart
29 import kotlinx.coroutines.flow.shareIn
30 
/**
 * Wraps a [PagingData] so that it can be shared "efficiently" between multiple downstreams.
 *
 * All previously loaded pages are flattened, so a new subscriber receives them at once and does
 * not have to deal with dropped pages, intermediate loading state changes, etc.
 */
private class MulticastedPagingData<T : Any>(
    val scope: CoroutineScope,
    val parent: PagingData<T>,
    // used in tests
    val tracker: ActiveFlowTracker? = null
) {
    // Page event cache built from the parent's event flow and tied to [scope].
    private val accumulated = CachedPageEventFlow(src = parent.flow, scope = scope)

    init {
        // Report the freshly created cache to the (test-only) tracker, if there is one.
        tracker?.onNewCachedEventFlow(accumulated)
    }

    /**
     * Builds a [PagingData] backed by the cached event flow.
     *
     * The receivers are forwarded from [parent], and collection start / completion of the event
     * flow is reported to [tracker] when one is present.
     */
    fun asPagingData(): PagingData<T> {
        val trackedEvents =
            accumulated.downstreamFlow
                .onStart { tracker?.onStart(PAGE_EVENT_FLOW) }
                .onCompletion { tracker?.onComplete(PAGE_EVENT_FLOW) }

        return PagingData(
            flow = trackedEvents,
            uiReceiver = parent.uiReceiver,
            hintReceiver = parent.hintReceiver,
            cachedPageEvent = { accumulated.getCachedEvent() }
        )
    }

    /** Closes the underlying cached page event flow. */
    suspend fun close() = accumulated.close()
}
60 
/**
 * Caches the [PagingData] so that every downstream collection from this flow shares the same
 * [PagingData].
 *
 * The flow stays active for as long as the given [scope] is active. To avoid leaks, pass a [scope]
 * that is already managed (such as a ViewModel scope), or cancel it manually once paging is no
 * longer needed.
 *
 * A common use case is caching [PagingData] in a ViewModel. This ensures that, upon a
 * configuration change (e.g. rotation), the new Activity receives the existing data immediately
 * rather than fetching it from scratch.
 *
 * Calling [cachedIn] is required to allow calling
 * [submitData][androidx.paging.AsyncPagingDataAdapter] on the same instance of [PagingData] emitted
 * by [Pager] or any of its transformed derivatives, because reloading data from scratch on the
 * same generation of [PagingData] is an unsupported operation.
 *
 * Note that this does not turn the `Flow<PagingData>` into a hot stream. It won't execute any
 * unnecessary code unless it is being collected.
 *
 * @sample androidx.paging.samples.cachedInSample
 * @param scope The coroutine scope where this page cache will be kept alive.
 */
@CheckResult
public fun <T : Any> Flow<PagingData<T>>.cachedIn(scope: CoroutineScope): Flow<PagingData<T>> =
    cachedIn(scope = scope, tracker = null)
87 
/**
 * Implementation of [cachedIn] that additionally accepts an optional [tracker].
 *
 * Each upstream [PagingData] is wrapped in a [MulticastedPagingData]; whenever a newer one
 * arrives, the previous wrapper is closed. The resulting flow is shared in [scope], started
 * lazily, and replays the latest value.
 *
 * @param scope The coroutine scope used for the caches and for sharing the resulting flow.
 * @param tracker Optional tracker that is notified about flow starts and completions.
 */
internal fun <T : Any> Flow<PagingData<T>>.cachedIn(
    scope: CoroutineScope,
    // used in tests
    tracker: ActiveFlowTracker? = null
): Flow<PagingData<T>> {
    // Wrap every emitted PagingData into its own multicaster.
    val multicasted = simpleMapLatest { pagingData ->
        MulticastedPagingData(scope = scope, parent = pagingData, tracker = tracker)
    }

    // Only the most recent multicaster stays open; the one it replaces is closed first.
    val latestOnly = multicasted.simpleRunningReduce { previous, current ->
        previous.close()
        current
    }

    val tracked =
        latestOnly
            .map { multicaster -> multicaster.asPagingData() }
            .onStart { tracker?.onStart(PAGED_DATA_FLOW) }
            .onCompletion { tracker?.onComplete(PAGED_DATA_FLOW) }

    return tracked.shareIn(
        scope = scope,
        started = SharingStarted.Lazily,
        // replay latest multicasted paging data since it is re-connectable.
        replay = 1
    )
}
110 
/**
 * Test-only hook that observes the flows created by `cachedIn`, so tests can verify that no
 * resources are leaked.
 */
@VisibleForTesting
internal interface ActiveFlowTracker {
    /** Invoked with each newly created [CachedPageEventFlow]. */
    fun onNewCachedEventFlow(cachedPageEventFlow: CachedPageEventFlow<*>)

    /** Invoked from the `onStart` operator of a tracked flow of the given [flowType]. */
    suspend fun onStart(flowType: FlowType)

    /** Invoked from the `onCompletion` operator of a tracked flow of the given [flowType]. */
    suspend fun onComplete(flowType: FlowType)

    /** Identifies which kind of tracked flow a callback refers to. */
    enum class FlowType {
        /** The outer flow of `PagingData` produced by `cachedIn`. */
        PAGED_DATA_FLOW,

        /** The page event flow backing a single cached `PagingData`. */
        PAGE_EVENT_FLOW
    }
}
125