1 /*
<lambda>null2 * Copyright 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package androidx.paging
18
19 import androidx.annotation.CheckResult
20 import androidx.annotation.VisibleForTesting
21 import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
22 import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
23 import kotlinx.coroutines.CoroutineScope
24 import kotlinx.coroutines.flow.Flow
25 import kotlinx.coroutines.flow.SharingStarted
26 import kotlinx.coroutines.flow.map
27 import kotlinx.coroutines.flow.onCompletion
28 import kotlinx.coroutines.flow.onStart
29 import kotlinx.coroutines.flow.shareIn
30
31 /**
32 * A PagingData wrapper that makes it "efficiently" share-able between multiple downstreams. It
33 * flattens all previous pages such that a new subscriber will get all of them at once (and also not
34 * deal with dropped pages, intermediate loading state changes etc).
35 */
36 private class MulticastedPagingData<T : Any>(
37 val scope: CoroutineScope,
38 val parent: PagingData<T>,
39 // used in tests
40 val tracker: ActiveFlowTracker? = null
41 ) {
42 private val accumulated =
43 CachedPageEventFlow(src = parent.flow, scope = scope).also {
44 tracker?.onNewCachedEventFlow(it)
45 }
46
47 fun asPagingData() =
48 PagingData(
49 flow =
50 accumulated.downstreamFlow
51 .onStart { tracker?.onStart(PAGE_EVENT_FLOW) }
52 .onCompletion { tracker?.onComplete(PAGE_EVENT_FLOW) },
53 uiReceiver = parent.uiReceiver,
54 hintReceiver = parent.hintReceiver,
55 cachedPageEvent = { accumulated.getCachedEvent() }
56 )
57
58 suspend fun close() = accumulated.close()
59 }
60
61 /**
62 * Caches the [PagingData] such that any downstream collection from this flow will share the same
63 * [PagingData].
64 *
65 * The flow is kept active as long as the given [scope] is active. To avoid leaks, make sure to use
66 * a [scope] that is already managed (like a ViewModel scope) or manually cancel it when you don't
67 * need paging anymore.
68 *
69 * A common use case for this caching is to cache [PagingData] in a ViewModel. This can ensure that,
70 * upon configuration change (e.g. rotation), then new Activity will receive the existing data
71 * immediately rather than fetching it from scratch.
72 *
73 * Calling [cachedIn] is required to allow calling
74 * [submitData][androidx.paging.AsyncPagingDataAdapter] on the same instance of [PagingData] emitted
75 * by [Pager] or any of its transformed derivatives, as reloading data from scratch on the same
76 * generation of [PagingData] is an unsupported operation.
77 *
78 * Note that this does not turn the `Flow<PagingData>` into a hot stream. It won't execute any
79 * unnecessary code unless it is being collected.
80 *
81 * @sample androidx.paging.samples.cachedInSample
82 * @param scope The coroutine scope where this page cache will be kept alive.
83 */
84 @CheckResult
cachedInnull85 public fun <T : Any> Flow<PagingData<T>>.cachedIn(scope: CoroutineScope): Flow<PagingData<T>> =
86 cachedIn(scope, null)
87
88 internal fun <T : Any> Flow<PagingData<T>>.cachedIn(
89 scope: CoroutineScope,
90 // used in tests
91 tracker: ActiveFlowTracker? = null
92 ): Flow<PagingData<T>> {
93 return this.simpleMapLatest {
94 MulticastedPagingData(scope = scope, parent = it, tracker = tracker)
95 }
96 .simpleRunningReduce { prev, next ->
97 prev.close()
98 next
99 }
100 .map { it.asPagingData() }
101 .onStart { tracker?.onStart(PAGED_DATA_FLOW) }
102 .onCompletion { tracker?.onComplete(PAGED_DATA_FLOW) }
103 .shareIn(
104 scope = scope,
105 started = SharingStarted.Lazily,
106 // replay latest multicasted paging data since it is re-connectable.
107 replay = 1
108 )
109 }
110
111 /** This is only used for testing to ensure we don't leak resources */
112 @VisibleForTesting
113 internal interface ActiveFlowTracker {
onNewCachedEventFlownull114 fun onNewCachedEventFlow(cachedPageEventFlow: CachedPageEventFlow<*>)
115
116 suspend fun onStart(flowType: FlowType)
117
118 suspend fun onComplete(flowType: FlowType)
119
120 enum class FlowType {
121 PAGED_DATA_FLOW,
122 PAGE_EVENT_FLOW
123 }
124 }
125