1 /*
2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 @file:JvmName("PagingRx")
18 @file:JvmMultifileClass
19 
20 package androidx.paging.rxjava2
21 
22 import androidx.paging.Pager
23 import androidx.paging.PagingData
24 import androidx.paging.cachedIn
25 import io.reactivex.BackpressureStrategy
26 import io.reactivex.Flowable
27 import io.reactivex.Observable
28 import kotlinx.coroutines.CoroutineScope
29 import kotlinx.coroutines.ExperimentalCoroutinesApi
30 import kotlinx.coroutines.flow.conflate
31 import kotlinx.coroutines.reactive.asFlow
32 import kotlinx.coroutines.rx2.asFlowable
33 import kotlinx.coroutines.rx2.asObservable
34 
/**
 * Exposes the [PagingData] stream of this [Pager] as an RxJava [Observable]. Emissions mirror the
 * stream provided by [Pager.flow].
 *
 * The underlying flow has [conflate] applied before it is converted with [asObservable].
 *
 * NOTE: A [PagingData] instance emitted by this [Observable] is not re-usable and cannot be
 * submitted more than once. This matters most for transforms, which would replay the latest value
 * downstream. To hand every downstream observer a new instance of [PagingData], apply the
 * [cachedIn] operator, which multicasts the [Observable] in a way that returns a new instance of
 * [PagingData] with cached data pre-loaded.
 */
val <Key : Any, Value : Any> Pager<Key, Value>.observable: Observable<PagingData<Value>>
    get() {
        // Conflate first, then bridge the coroutine Flow into the Rx world.
        val conflatedFlow = flow.conflate()
        return conflatedFlow.asObservable()
    }
47 
/**
 * Exposes the [PagingData] stream of this [Pager] as an RxJava [Flowable]. Emissions mirror the
 * stream provided by [Pager.flow].
 *
 * The underlying flow has [conflate] applied before it is converted with [asFlowable].
 *
 * NOTE: A [PagingData] instance emitted by this [Flowable] is not re-usable and cannot be
 * submitted more than once. This matters most for transforms, which would replay the latest value
 * downstream. To hand every downstream observer a new instance of [PagingData], apply the
 * [cachedIn] operator, which multicasts the [Flowable] in a way that returns a new instance of
 * [PagingData] with cached data pre-loaded.
 */
val <Key : Any, Value : Any> Pager<Key, Value>.flowable: Flowable<PagingData<Value>>
    get() {
        // Conflate first, then bridge the coroutine Flow into the Rx world.
        val conflatedFlow = flow.conflate()
        return conflatedFlow.asFlowable()
    }
60 
/**
 * Caches this [Observable] of [PagingData] within the supplied [CoroutineScope].
 *
 * [cachedIn] multicasts the pages loaded and transformed by a [PagingData], so several observers of
 * the same [PagingData] instance receive the same events without redundant work. The trade-off is
 * that those pages are buffered in memory.
 *
 * [cachedIn] must be applied before calling
 * [submitData][androidx.paging.AsyncPagingDataAdapter] more than once with the same instance of
 * [PagingData] emitted by [Pager] or any of its transformed derivatives, because reloading data
 * from scratch on the same generation of [PagingData] is an unsupported operation.
 *
 * Implementation: the receiver is converted to a [Flowable] using [BackpressureStrategy.LATEST],
 * viewed as a coroutine flow, cached there, and converted back to an [Observable].
 *
 * @param scope The [CoroutineScope] in which the page cache is kept alive. Typically a managed
 *   scope such as `ViewModel.viewModelScope`, which cancels automatically once the [PagingData]
 *   stream is no longer needed. Any other [CoroutineScope] must be cancelled manually to avoid
 *   memory leaks.
 */
@ExperimentalCoroutinesApi
fun <T : Any> Observable<PagingData<T>>.cachedIn(scope: CoroutineScope): Observable<PagingData<T>> =
    toFlowable(BackpressureStrategy.LATEST)
        .asFlow()
        .cachedIn(scope)
        .asObservable()
82 
/**
 * Caches this [Flowable] of [PagingData] within the supplied [CoroutineScope].
 *
 * [cachedIn] multicasts the pages loaded and transformed by a [PagingData], so several observers of
 * the same [PagingData] instance receive the same events without redundant work. The trade-off is
 * that those pages are buffered in memory.
 *
 * [cachedIn] must be applied before calling
 * [submitData][androidx.paging.AsyncPagingDataAdapter] more than once with the same instance of
 * [PagingData] emitted by [Pager] or any of its transformed derivatives, because reloading data
 * from scratch on the same generation of [PagingData] is an unsupported operation.
 *
 * Implementation: the receiver is viewed as a coroutine flow, cached there, and converted back to
 * a [Flowable].
 *
 * @param scope The [CoroutineScope] in which the page cache is kept alive. Typically a managed
 *   scope such as `ViewModel.viewModelScope`, which cancels automatically once the [PagingData]
 *   stream is no longer needed. Any other [CoroutineScope] must be cancelled manually to avoid
 *   memory leaks.
 */
@ExperimentalCoroutinesApi
fun <T : Any> Flowable<PagingData<T>>.cachedIn(scope: CoroutineScope): Flowable<PagingData<T>> =
    asFlow()
        .cachedIn(scope)
        .asFlowable()
104