/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package androidx.paging

import androidx.kruth.assertThat
import kotlin.test.Test
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

/**
 * Tests for [CachedPageEventFlow].
 *
 * Every test feeds events into the subject through an unlimited [Channel] and observes what
 * one or more [PageCollector]s subscribed to `downstreamFlow` end up receiving.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class CachedPageEventFlowTest {
    private val testScope = TestScope(UnconfinedTestDispatcher())

    /** Builds a [TransformablePage] holding the given [items]. */
    private fun pageOf(vararg items: String) = TransformablePage(items.toList())

    /** Wraps [source] in a [PageCollector] and immediately starts collecting in [testScope]. */
    private fun <T : Any> startCollector(source: Flow<T>): PageCollector<T> =
        PageCollector(source).also { it.collectIn(testScope) }

    @Test
    fun slowFastCollectors_CloseUpstream() = slowFastCollectors(TerminationType.CloseUpstream)

    @Test
    fun slowFastCollectors_CloseCachedEventFlow() =
        slowFastCollectors(TerminationType.CloseCachedEventFlow)

    /**
     * Runs a collector that consumes events immediately next to collectors that delay on every
     * event, then terminates the stream as requested by [terminationType] and checks what each
     * collector has received and whether it is still active.
     */
    private fun slowFastCollectors(terminationType: TerminationType) =
        testScope.runTest {
            val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
            val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)
            val eagerCollector = startCollector(subject.downstreamFlow)
            val laggingCollector = startCollector(subject.downstreamFlow.onEach { delay(1_000) })

            val initialRefresh = localRefresh(listOf(pageOf("a", "b", "c")))
            upstream.send(initialRefresh)
            runCurrent()
            assertThat(eagerCollector.items()).containsExactly(initialRefresh)
            assertThat(laggingCollector.items()).isEmpty()

            val firstAppend = localAppend(listOf(pageOf("d", "e")))
            upstream.send(firstAppend)
            runCurrent()
            assertThat(eagerCollector.items()).containsExactly(initialRefresh, firstAppend)
            assertThat(laggingCollector.items()).isEmpty()
            // Once virtual time moves forward the delayed collector has both events.
            advanceTimeBy(3_000)
            assertThat(laggingCollector.items()).containsExactly(initialRefresh, firstAppend)

            val bulkAppends = List(100) { localAppend(listOf(pageOf("f", "g"))) }
            for (event in bulkAppends) {
                upstream.send(event)
            }
            // This collector subscribes only after all of the events above have been sent.
            val lateLaggingCollector =
                startCollector(subject.downstreamFlow.onEach { delay(1_000) })
            val lastAppend = localAppend(listOf(pageOf("d", "e")))
            upstream.send(lastAppend)
            when (terminationType) {
                TerminationType.CloseUpstream -> upstream.close()
                TerminationType.CloseCachedEventFlow -> subject.close()
            }
            val eventsBeforeLateJoin = listOf(initialRefresh, firstAppend) + bulkAppends
            val everyEvent = eventsBeforeLateJoin + lastAppend
            runCurrent()
            assertThat(eagerCollector.items()).containsExactlyElementsIn(everyEvent).inOrder()
            assertThat(eagerCollector.isActive()).isFalse()
            assertThat(laggingCollector.isActive()).isTrue()
            assertThat(lateLaggingCollector.isActive()).isTrue()
            advanceUntilIdle()
            assertThat(laggingCollector.items()).containsExactlyElementsIn(everyEvent).inOrder()
            assertThat(laggingCollector.isActive()).isFalse()

            // The late collector is expected to see a single refresh carrying every page sent
            // before it subscribed, followed by the one event sent afterwards.
            val lateCollectorState =
                localRefresh(
                    pages = eventsBeforeLateJoin.flatMap { it.pages },
                )
            assertThat(lateLaggingCollector.items())
                .containsExactly(lateCollectorState, lastAppend)
                .inOrder()
            assertThat(lateLaggingCollector.isActive()).isFalse()

            upstream.close()
        }

    @Test
    fun ensureSharing_CloseUpstream() = ensureSharing(TerminationType.CloseUpstream)

    @Test
    fun ensureSharing_CloseCachedEventFlow() = ensureSharing(TerminationType.CloseCachedEventFlow)

    /**
     * Subscribes collectors at different points in the event stream and checks that each one
     * first receives a refresh describing everything sent so far and then the subsequent
     * events, both before and after the stream is terminated per [terminationType].
     */
    private fun ensureSharing(terminationType: TerminationType) =
        testScope.runTest {
            val refreshEvent = localRefresh(listOf(pageOf("a", "b", "c")))
            val appendEvent = localAppend(listOf(pageOf("d", "e")))
            val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
            val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)

            // Both events are sent before the first collector starts collecting.
            val firstCollector = PageCollector(subject.downstreamFlow)
            upstream.send(refreshEvent)
            upstream.send(appendEvent)
            firstCollector.collectIn(testScope)
            runCurrent()
            assertThat(firstCollector.items()).isEqualTo(listOf(refreshEvent, appendEvent))

            val secondCollector = startCollector(subject.downstreamFlow)
            runCurrent()
            val snapshotAfterAppend =
                localRefresh(
                    listOf(
                        pageOf("a", "b", "c"),
                        pageOf("d", "e"),
                    ),
                )
            assertThat(secondCollector.items()).containsExactly(snapshotAfterAppend)

            val prependEvent =
                localPrepend(
                    listOf(
                        pageOf("a0", "a1"),
                        pageOf("a2", "a3"),
                    ),
                )
            upstream.send(prependEvent)
            assertThat(firstCollector.items())
                .isEqualTo(listOf(refreshEvent, appendEvent, prependEvent))
            assertThat(secondCollector.items())
                .isEqualTo(listOf(snapshotAfterAppend, prependEvent))

            val thirdCollector = startCollector(subject.downstreamFlow)
            val finalState =
                localRefresh(
                    listOf(
                        pageOf("a0", "a1"),
                        pageOf("a2", "a3"),
                        pageOf("a", "b", "c"),
                        pageOf("d", "e"),
                    ),
                )
            assertThat(thirdCollector.items()).containsExactly(finalState)
            assertThat(firstCollector.isActive()).isTrue()
            assertThat(secondCollector.isActive()).isTrue()
            assertThat(thirdCollector.isActive()).isTrue()

            when (terminationType) {
                TerminationType.CloseUpstream -> upstream.close()
                TerminationType.CloseCachedEventFlow -> subject.close()
            }
            runCurrent()
            assertThat(firstCollector.isActive()).isFalse()
            assertThat(secondCollector.isActive()).isFalse()
            assertThat(thirdCollector.isActive()).isFalse()

            val fourthCollector = startCollector(subject.downstreamFlow)
            runCurrent()
            // The stream has already been terminated, so a new collector should receive the
            // final state and then complete.
            assertThat(fourthCollector.isActive()).isFalse()
            assertThat(fourthCollector.items()).containsExactly(finalState)
        }

    @Test
    fun emptyPage_singlelocalLoadStateUpdate() =
        testScope.runTest {
            val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
            val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)

            // Create two collectors and start collecting right away to assert that all
            // collectors receive the same events.
            val firstCollector = startCollector(subject.downstreamFlow)
            val secondCollector = startCollector(subject.downstreamFlow)

            runCurrent()

            // Until upstream sends events, collectors shouldn't receive any events.
            assertThat(firstCollector.items()).isEmpty()
            assertThat(secondCollector.items()).isEmpty()

            // Now send the refresh event.
            val refreshEvent = localRefresh(listOf(pageOf("a", "b", "c")))
            upstream.send(refreshEvent)
            runCurrent()

            assertThat(firstCollector.items()).containsExactly(refreshEvent)
            assertThat(secondCollector.items()).containsExactly(refreshEvent)

            upstream.close()
        }

    @Test
    fun idleStateUpdate_collectedBySingleCollector() =
        testScope.runTest {
            val upstream = Channel<PageEvent<String>>(Channel.UNLIMITED)
            val subject = CachedPageEventFlow(src = upstream.consumeAsFlow(), scope = testScope)

            val refreshEvent = localRefresh(listOf(pageOf("a", "b", "c")))
            upstream.send(refreshEvent)
            runCurrent()

            val firstCollector = startCollector(subject.downstreamFlow)

            runCurrent()

            // The collector shouldn't receive any idle events before the refresh.
            assertThat(firstCollector.items()).containsExactly(refreshEvent)

            val delayedCollector = startCollector(subject.downstreamFlow)

            // The delayed collector shouldn't receive any idle events either, since the
            // refresh has already been sent.
            assertThat(delayedCollector.items()).containsExactly(refreshEvent)

            upstream.close()
        }

    /**
     * Records every value emitted by [src] once [collectIn] has been called.
     *
     * @param src the flow whose emissions are recorded
     */
    private class PageCollector<T : Any>(val src: Flow<T>) {
        private val received = mutableListOf<T>()
        private var job: Job? = null

        /** Launches a coroutine in [scope] that appends each value from [src] to the record. */
        fun collectIn(scope: CoroutineScope) {
            job = scope.launch { src.collect { event -> received += event } }
        }

        /** Returns `true` only if collection has been started and its job is still active. */
        fun isActive() = job?.isActive == true

        /** Returns a copy of the values recorded so far. */
        fun items() = received.toList()
    }

    /** The two ways these tests end the event stream. */
    enum class TerminationType {
        /** Close the upstream channel feeding the [CachedPageEventFlow]. */
        CloseUpstream,

        /** Call `close()` on the [CachedPageEventFlow] itself. */
        CloseCachedEventFlow,
    }
}