1 /*
2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package androidx.paging
18 
19 import androidx.kruth.assertThat
20 import kotlin.test.Test
21 import kotlinx.coroutines.CoroutineScope
22 import kotlinx.coroutines.ExperimentalCoroutinesApi
23 import kotlinx.coroutines.Job
24 import kotlinx.coroutines.channels.Channel
25 import kotlinx.coroutines.delay
26 import kotlinx.coroutines.flow.Flow
27 import kotlinx.coroutines.flow.consumeAsFlow
28 import kotlinx.coroutines.flow.onEach
29 import kotlinx.coroutines.launch
30 import kotlinx.coroutines.test.TestScope
31 import kotlinx.coroutines.test.UnconfinedTestDispatcher
32 import kotlinx.coroutines.test.advanceTimeBy
33 import kotlinx.coroutines.test.advanceUntilIdle
34 import kotlinx.coroutines.test.runCurrent
35 import kotlinx.coroutines.test.runTest
36 
37 @OptIn(ExperimentalCoroutinesApi::class)
38 class CachedPageEventFlowTest {
39     private val testScope = TestScope(UnconfinedTestDispatcher())
40 
slowFastCollectors_CloseUpstreamnull41     @Test fun slowFastCollectors_CloseUpstream() = slowFastCollectors(TerminationType.CloseUpstream)
42 
43     @Test
44     fun slowFastCollectors_CloseCachedEventFlow() =
45         slowFastCollectors(TerminationType.CloseCachedEventFlow)
46 
47     private fun slowFastCollectors(terminationType: TerminationType) =
48         testScope.runTest {
49             val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
50             val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)
51             val fastCollector = PageCollector(subject.downstreamFlow)
52             fastCollector.collectIn(testScope)
53             val slowCollector = PageCollector(subject.downstreamFlow.onEach { delay(1_000) })
54             slowCollector.collectIn(testScope)
55             val refreshEvent =
56                 localRefresh(
57                     listOf(TransformablePage(listOf("a", "b", "c"))),
58                 )
59             upstream.send(refreshEvent)
60             runCurrent()
61             assertThat(fastCollector.items()).containsExactly(refreshEvent)
62             assertThat(slowCollector.items()).isEmpty()
63 
64             val appendEvent =
65                 localAppend(
66                     listOf(TransformablePage(listOf("d", "e"))),
67                 )
68             upstream.send(appendEvent)
69             runCurrent()
70             assertThat(fastCollector.items()).containsExactly(refreshEvent, appendEvent)
71             assertThat(slowCollector.items()).isEmpty()
72             advanceTimeBy(3_000)
73             assertThat(slowCollector.items()).containsExactly(refreshEvent, appendEvent)
74             val manyNewAppendEvents =
75                 (0 until 100).map {
76                     localAppend(
77                         listOf(TransformablePage(listOf("f", "g"))),
78                     )
79                 }
80             manyNewAppendEvents.forEach { upstream.send(it) }
81             val lateSlowCollector = PageCollector(subject.downstreamFlow.onEach { delay(1_000) })
82             lateSlowCollector.collectIn(testScope)
83             val finalAppendEvent =
84                 localAppend(
85                     listOf(TransformablePage(listOf("d", "e"))),
86                 )
87             upstream.send(finalAppendEvent)
88             when (terminationType) {
89                 TerminationType.CloseUpstream -> upstream.close()
90                 TerminationType.CloseCachedEventFlow -> subject.close()
91             }
92             val fullList =
93                 listOf(refreshEvent, appendEvent) + manyNewAppendEvents + finalAppendEvent
94             runCurrent()
95             assertThat(fastCollector.items()).containsExactlyElementsIn(fullList).inOrder()
96             assertThat(fastCollector.isActive()).isFalse()
97             assertThat(slowCollector.isActive()).isTrue()
98             assertThat(lateSlowCollector.isActive()).isTrue()
99             advanceUntilIdle()
100             assertThat(slowCollector.items()).containsExactlyElementsIn(fullList).inOrder()
101             assertThat(slowCollector.isActive()).isFalse()
102 
103             val lateCollectorState =
104                 localRefresh(
105                     pages =
106                         (listOf(refreshEvent, appendEvent) + manyNewAppendEvents).flatMap {
107                             it.pages
108                         },
109                 )
110             assertThat(lateSlowCollector.items())
111                 .containsExactly(lateCollectorState, finalAppendEvent)
112                 .inOrder()
113             assertThat(lateSlowCollector.isActive()).isFalse()
114 
115             upstream.close()
116         }
117 
ensureSharing_CloseUpstreamnull118     @Test fun ensureSharing_CloseUpstream() = ensureSharing(TerminationType.CloseUpstream)
119 
120     @Test
121     fun ensureSharing_CloseCachedEventFlow() = ensureSharing(TerminationType.CloseCachedEventFlow)
122 
123     private fun ensureSharing(terminationType: TerminationType) =
124         testScope.runTest {
125             val refreshEvent =
126                 localRefresh(
127                     listOf(TransformablePage(listOf("a", "b", "c"))),
128                 )
129             val appendEvent =
130                 localAppend(
131                     listOf(TransformablePage(listOf("d", "e"))),
132                 )
133             val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
134             val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)
135 
136             val collector1 = PageCollector(subject.downstreamFlow)
137             upstream.send(refreshEvent)
138             upstream.send(appendEvent)
139             collector1.collectIn(testScope)
140             runCurrent()
141             assertThat(collector1.items()).isEqualTo(listOf(refreshEvent, appendEvent))
142             val collector2 = PageCollector(subject.downstreamFlow)
143             collector2.collectIn(testScope)
144             runCurrent()
145             val firstSnapshotRefreshEvent =
146                 localRefresh(
147                     listOf(
148                         TransformablePage(listOf("a", "b", "c")),
149                         TransformablePage(listOf("d", "e"))
150                     ),
151                 )
152             assertThat(collector2.items()).containsExactly(firstSnapshotRefreshEvent)
153             val prependEvent =
154                 localPrepend(
155                     listOf(
156                         TransformablePage(listOf("a0", "a1")),
157                         TransformablePage(listOf("a2", "a3"))
158                     ),
159                 )
160             upstream.send(prependEvent)
161             assertThat(collector1.items())
162                 .isEqualTo(listOf(refreshEvent, appendEvent, prependEvent))
163             assertThat(collector2.items())
164                 .isEqualTo(listOf(firstSnapshotRefreshEvent, prependEvent))
165             val collector3 = PageCollector(subject.downstreamFlow)
166             collector3.collectIn(testScope)
167             val finalState =
168                 localRefresh(
169                     listOf(
170                         TransformablePage(listOf("a0", "a1")),
171                         TransformablePage(listOf("a2", "a3")),
172                         TransformablePage(listOf("a", "b", "c")),
173                         TransformablePage(listOf("d", "e"))
174                     ),
175                 )
176             assertThat(collector3.items()).containsExactly(finalState)
177             assertThat(collector1.isActive()).isTrue()
178             assertThat(collector2.isActive()).isTrue()
179             assertThat(collector3.isActive()).isTrue()
180             when (terminationType) {
181                 TerminationType.CloseUpstream -> upstream.close()
182                 TerminationType.CloseCachedEventFlow -> subject.close()
183             }
184             runCurrent()
185             assertThat(collector1.isActive()).isFalse()
186             assertThat(collector2.isActive()).isFalse()
187             assertThat(collector3.isActive()).isFalse()
188             val collector4 = PageCollector(subject.downstreamFlow).also { it.collectIn(testScope) }
189             runCurrent()
190             // since upstream is closed, this should just close
191             assertThat(collector4.isActive()).isFalse()
192             assertThat(collector4.items()).containsExactly(finalState)
193         }
194 
195     @Test
emptyPage_singlelocalLoadStateUpdatenull196     fun emptyPage_singlelocalLoadStateUpdate() =
197         testScope.runTest {
198             val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
199             val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)
200 
201             // creating two collectors and collecting right away to assert that all collectors
202             val collector = PageCollector(subject.downstreamFlow)
203             collector.collectIn(testScope)
204 
205             val collector2 = PageCollector(subject.downstreamFlow)
206             collector2.collectIn(testScope)
207 
208             runCurrent()
209 
210             // until upstream sends events, collectors shouldn't receive any events
211             assertThat(collector.items()).isEmpty()
212             assertThat(collector2.items()).isEmpty()
213 
214             // now send refresh event
215             val refreshEvent =
216                 localRefresh(
217                     listOf(TransformablePage(listOf("a", "b", "c"))),
218                 )
219             upstream.send(refreshEvent)
220             runCurrent()
221 
222             assertThat(collector.items()).containsExactly(refreshEvent)
223 
224             assertThat(collector2.items()).containsExactly(refreshEvent)
225 
226             upstream.close()
227         }
228 
229     @Test
idleStateUpdate_collectedBySingleCollectornull230     fun idleStateUpdate_collectedBySingleCollector() =
231         testScope.runTest {
232             val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
233             val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)
234 
235             val refreshEvent =
236                 localRefresh(
237                     listOf(TransformablePage(listOf("a", "b", "c"))),
238                 )
239             upstream.send(refreshEvent)
240             runCurrent()
241 
242             val collector = PageCollector(subject.downstreamFlow)
243             collector.collectIn(testScope)
244 
245             runCurrent()
246 
247             // collector shouldn't receive any idle events before the refresh
248             assertThat(collector.items()).containsExactly(refreshEvent)
249 
250             val delayedCollector = PageCollector(subject.downstreamFlow)
251             delayedCollector.collectIn(testScope)
252 
253             // delayed collector shouldn't receive any idle events since we already have refresh
254             assertThat(delayedCollector.items()).containsExactly(refreshEvent)
255 
256             upstream.close()
257         }
258 
259     private class PageCollector<T : Any>(val src: Flow<T>) {
260         private val items = mutableListOf<T>()
261         private var job: Job? = null
262 
collectInnull263         fun collectIn(scope: CoroutineScope) {
264             job = scope.launch { src.collect { items.add(it) } }
265         }
266 
isActivenull267         fun isActive() = job?.isActive ?: false
268 
269         fun items() = items.toList()
270     }
271 
272     enum class TerminationType {
273         CloseUpstream,
274         CloseCachedEventFlow,
275     }
276 }
277