1 /*
<lambda>null2  * Copyright 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package androidx.paging
17 
18 import androidx.kruth.assertThat
19 import androidx.paging.ActiveFlowTracker.FlowType
20 import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
21 import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
22 import androidx.paging.internal.AtomicInt
23 import kotlin.test.Test
24 import kotlinx.coroutines.CoroutineScope
25 import kotlinx.coroutines.Dispatchers
26 import kotlinx.coroutines.ExperimentalCoroutinesApi
27 import kotlinx.coroutines.Job
28 import kotlinx.coroutines.SupervisorJob
29 import kotlinx.coroutines.cancelAndJoin
30 import kotlinx.coroutines.flow.Flow
31 import kotlinx.coroutines.flow.SharedFlow
32 import kotlinx.coroutines.flow.catch
33 import kotlinx.coroutines.flow.filterIsInstance
34 import kotlinx.coroutines.flow.first
35 import kotlinx.coroutines.flow.map
36 import kotlinx.coroutines.flow.mapLatest
37 import kotlinx.coroutines.flow.onEach
38 import kotlinx.coroutines.flow.toList
39 import kotlinx.coroutines.launch
40 import kotlinx.coroutines.test.TestScope
41 import kotlinx.coroutines.test.UnconfinedTestDispatcher
42 import kotlinx.coroutines.test.runCurrent
43 import kotlinx.coroutines.test.runTest
44 import kotlinx.coroutines.yield
45 
46 @OptIn(ExperimentalCoroutinesApi::class)
47 class CachingTest {
48     private val tracker = ActiveFlowTrackerImpl()
49 
50     private val testScope = TestScope(UnconfinedTestDispatcher())
51 
52     @Test
53     fun noSharing() =
54         testScope.runTest {
55             val pageFlow = buildPageFlow()
56             val firstCollect = pageFlow.collectItemsUntilSize(6)
57             val secondCollect = pageFlow.collectItemsUntilSize(9)
58             assertThat(firstCollect)
59                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))
60 
61             assertThat(secondCollect)
62                 .isEqualTo(buildItems(version = 1, generation = 0, start = 0, size = 9))
63             assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
64         }
65 
66     @Test
67     fun cached() =
68         testScope.runTest {
69             val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
70             val firstCollect = pageFlow.collectItemsUntilSize(6)
71             val secondCollect = pageFlow.collectItemsUntilSize(9)
72             assertThat(firstCollect)
73                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))
74 
75             assertThat(secondCollect)
76                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 9))
77             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
78         }
79 
80     @Test
81     fun cachedData() =
82         testScope.runTest {
83             val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
84             assertThat(pageFlow).isInstanceOf<SharedFlow<PagingData<Item>>>()
85             assertThat((pageFlow as SharedFlow<PagingData<Item>>).replayCache).isEmpty()
86 
87             pageFlow.collectItemsUntilSize(6)
88             val firstCachedData = pageFlow.cachedData()
89             assertThat(firstCachedData)
90                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))
91 
92             pageFlow.collectItemsUntilSize(9)
93             val secondCachedEvent = pageFlow.cachedData()
94             assertThat(secondCachedEvent)
95                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 9))
96             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
97         }
98 
99     @Test
100     fun cached_afterMapping() =
101         testScope.runTest {
102             var mappingCnt = 0
103             val pageFlow =
104                 buildPageFlow()
105                     .map { pagingData ->
106                         val mappingIndex = mappingCnt++
107                         pagingData.map { it.copy(metadata = mappingIndex.toString()) }
108                     }
109                     .cachedIn(backgroundScope, tracker)
110             val firstCollect = pageFlow.collectItemsUntilSize(6)
111             val secondCollect = pageFlow.collectItemsUntilSize(9)
112             assertThat(firstCollect)
113                 .isEqualTo(
114                     buildItems(version = 0, generation = 0, start = 0, size = 6) {
115                         it.copy(metadata = "0")
116                     }
117                 )
118 
119             assertThat(secondCollect)
120                 .isEqualTo(
121                     buildItems(version = 0, generation = 0, start = 0, size = 9) {
122                         it.copy(metadata = "0")
123                     }
124                 )
125             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
126         }
127 
128     @Test
129     fun cachedData_afterMapping() =
130         testScope.runTest {
131             var mappingCnt = 0
132             val pageFlow =
133                 buildPageFlow()
134                     .map { pagingData ->
135                         val mappingIndex = mappingCnt++
136                         pagingData.map { it.copy(metadata = mappingIndex.toString()) }
137                     }
138                     .cachedIn(backgroundScope, tracker)
139 
140             pageFlow.collectItemsUntilSize(6)
141             val firstCachedData = pageFlow.cachedData()
142             assertThat(firstCachedData)
143                 .isEqualTo(
144                     buildItems(version = 0, generation = 0, start = 0, size = 6) {
145                         it.copy(metadata = "0")
146                     }
147                 )
148 
149             pageFlow.collectItemsUntilSize(9)
150             val secondCachedData = pageFlow.cachedData()
151             assertThat(secondCachedData)
152                 .isEqualTo(
153                     buildItems(version = 0, generation = 0, start = 0, size = 9) {
154                         it.copy(metadata = "0")
155                     }
156                 )
157             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
158         }
159 
160     @Test
161     fun cached_beforeMapping() =
162         testScope.runTest {
163             var mappingCnt = 0
164             val pageFlow =
165                 buildPageFlow().cachedIn(backgroundScope, tracker).map { pagingData ->
166                     val mappingIndex = mappingCnt++
167                     pagingData.map { it.copy(metadata = mappingIndex.toString()) }
168                 }
169             val firstCollect = pageFlow.collectItemsUntilSize(6)
170             val secondCollect = pageFlow.collectItemsUntilSize(9)
171             assertThat(firstCollect)
172                 .isEqualTo(
173                     buildItems(version = 0, generation = 0, start = 0, size = 6) {
174                         it.copy(metadata = "0")
175                     }
176                 )
177 
178             assertThat(secondCollect)
179                 .isEqualTo(
180                     buildItems(version = 0, generation = 0, start = 0, size = 9) {
181                         it.copy(metadata = "1")
182                     }
183                 )
184             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
185         }
186 
187     @Test
188     fun cachedData_beforeMapping() =
189         testScope.runTest {
190             var mappingCnt = 0
191             val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
192             val mappedFlow =
193                 pageFlow.map { pagingData ->
194                     val mappingIndex = mappingCnt++
195                     pagingData.map { it.copy(metadata = mappingIndex.toString()) }
196                 }
197             // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
198             // replayCache. You can still access latest cachedData directly from pre-mapped flow.
199             mappedFlow.collectItemsUntilSize(6)
200             val firstCachedData = pageFlow.cachedData()
201             assertThat(firstCachedData)
202                 .isEqualTo(
203                     buildItems(
204                         version = 0,
205                         generation = 0,
206                         start = 0,
207                         size = 6,
208                         modifier = null // before mapping
209                     )
210                 )
211 
212             mappedFlow.collectItemsUntilSize(9)
213             val secondCachedEvent = pageFlow.cachedData()
214             assertThat(secondCachedEvent)
215                 .isEqualTo(
216                     buildItems(
217                         version = 0,
218                         generation = 0,
219                         start = 0,
220                         size = 9,
221                         modifier = null // before mapping
222                     )
223                 )
224             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
225         }
226 
227     @Test
228     fun cached_afterMapping_withMoreMappingAfterwards() =
229         testScope.runTest {
230             var mappingCnt = 0
231             val pageFlow =
232                 buildPageFlow()
233                     .map { pagingData ->
234                         val mappingIndex = mappingCnt++
235                         pagingData.map { it.copy(metadata = mappingIndex.toString()) }
236                     }
237                     .cachedIn(backgroundScope, tracker)
238                     .map { pagingData ->
239                         val mappingIndex = mappingCnt++
240                         pagingData.map { it.copy(metadata = "${it.metadata}_$mappingIndex") }
241                     }
242             val firstCollect = pageFlow.collectItemsUntilSize(6)
243             val secondCollect = pageFlow.collectItemsUntilSize(9)
244             assertThat(firstCollect)
245                 .isEqualTo(
246                     buildItems(version = 0, generation = 0, start = 0, size = 6) {
247                         it.copy(metadata = "0_1")
248                     }
249                 )
250 
251             assertThat(secondCollect)
252                 .isEqualTo(
253                     buildItems(version = 0, generation = 0, start = 0, size = 9) {
254                         it.copy(metadata = "0_2")
255                     }
256                 )
257             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
258         }
259 
260     @Test
261     fun cachedData_afterMapping_withMoreMappingAfterwards() =
262         testScope.runTest {
263             var mappingCnt = 0
264             val pageFlow =
265                 buildPageFlow()
266                     .map { pagingData ->
267                         val mappingIndex = mappingCnt++
268                         pagingData.map { it.copy(metadata = mappingIndex.toString()) }
269                     }
270                     .cachedIn(backgroundScope, tracker)
271             val mappedFlow =
272                 pageFlow.map { pagingData ->
273                     val mappingIndex = mappingCnt++
274                     pagingData.map { it.copy(metadata = "${it.metadata}_$mappingIndex") }
275                 }
276             // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
277             // replayCache. You can still access latest cachedData directly from pre-mapped flow.
278             mappedFlow.collectItemsUntilSize(6)
279             val firstCachedData = pageFlow.cachedData()
280             assertThat(firstCachedData)
281                 .isEqualTo(
282                     buildItems(version = 0, generation = 0, start = 0, size = 6) {
283                         it.copy(metadata = "0") // with mapping before cache
284                     }
285                 )
286 
287             mappedFlow.collectItemsUntilSize(9)
288             val secondCachedEvent = pageFlow.cachedData()
289             assertThat(secondCachedEvent)
290                 .isEqualTo(
291                     buildItems(version = 0, generation = 0, start = 0, size = 9) {
292                         it.copy(metadata = "0") // with mapping before cache
293                     }
294                 )
295             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
296         }
297 
298     @Test
299     fun pagesAreClosedProperty() =
300         testScope.runTest {
301             val job = SupervisorJob()
302             val subScope = CoroutineScope(job + Dispatchers.Default)
303             val pageFlow = buildPageFlow().cachedIn(subScope, tracker)
304             assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
305             assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
306             val items = pageFlow.collectItemsUntilSize(9)
307             val firstList = buildItems(version = 0, generation = 0, start = 0, size = 9)
308             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
309             val items2 = pageFlow.collectItemsUntilSize(21)
310             assertThat(items2)
311                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 21))
312             assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
313             assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
314             assertThat(items).isEqualTo(firstList)
315             job.cancelAndJoin()
316             assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
317             assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
318         }
319 
320     @Test
321     fun cachedWithPassiveCollector() =
322         testScope.runTest {
323             val flow = buildPageFlow().cachedIn(backgroundScope, tracker)
324             val passive = ItemCollector(flow)
325             passive.collectPassivelyIn(backgroundScope)
326             testScope.runCurrent()
327             // collecting on the paged source will trigger initial page
328             assertThat(passive.items())
329                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
330             val firstList = buildItems(version = 0, generation = 0, start = 0, size = 9)
331             // another collector is causing more items to be loaded, they should be reflected in the
332             // passive one
333             assertThat(flow.collectItemsUntilSize(9)).isEqualTo(firstList)
334             assertThat(passive.items()).isEqualTo(firstList)
335             val passive2 = ItemCollector(flow)
336             passive2.collectPassivelyIn(backgroundScope)
337             testScope.runCurrent()
338             // a new passive one should receive all existing items immediately
339             assertThat(passive2.items()).isEqualTo(firstList)
340 
341             // now we get another collector that'll fetch more pages, it should reflect in passives
342             val secondList = buildItems(version = 0, generation = 0, start = 0, size = 12)
343             // another collector is causing more items to be loaded, they should be reflected in the
344             // passive one
345             assertThat(flow.collectItemsUntilSize(12)).isEqualTo(secondList)
346             assertThat(passive.items()).isEqualTo(secondList)
347             assertThat(passive2.items()).isEqualTo(secondList)
348         }
349 
350     /**
351      * Test that, when cache is active but there is no active downstream collectors, intermediate
352      * invalidations create new PagingData BUT a new collector only sees the latest one.
353      */
354     @Test
355     public fun unusedPagingDataIsNeverCollectedByNewDownstream() =
356         testScope.runTest {
357             val factory = StringPagingSource.VersionedFactory()
358             val flow = buildPageFlow(factory).cachedIn(backgroundScope, tracker)
359             val collector = ItemCollector(flow)
360             val job = SupervisorJob()
361             val subScope = CoroutineScope(coroutineContext + job)
362             collector.collectPassivelyIn(subScope)
363             testScope.runCurrent()
364             assertThat(collector.items())
365                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
366             // finish that collector
367             job.cancelAndJoin()
368             assertThat(factory.nextVersion).isEqualTo(1)
369             repeat(10) {
370                 factory.invalidateLatest()
371                 testScope.runCurrent()
372             }
373             runCurrent()
374             // next version is 11, the last paged data we've created has version 10
375             assertThat(factory.nextVersion).isEqualTo(11)
376 
377             // create another collector from shared, should only receive 1 paging data and that
378             // should be the latest because previous PagingData is invalidated
379             val collector2 = ItemCollector(flow)
380             collector2.collectPassivelyIn(backgroundScope)
381             testScope.runCurrent()
382             assertThat(collector2.items())
383                 .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 3))
384             assertThat(collector2.receivedPagingDataCount).isEqualTo(1)
385             testScope.runCurrent()
386             assertThat(factory.nextVersion).isEqualTo(11)
387             val activeCollection = flow.collectItemsUntilSize(9)
388             assertThat(activeCollection)
389                 .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
390             testScope.runCurrent()
391             // make sure passive collector received those items as well
392             assertThat(collector2.items())
393                 .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
394         }
395 
396     @Test
397     public fun unusedPagingDataIsNeverCached() =
398         testScope.runTest {
399             val factory = StringPagingSource.VersionedFactory()
400             val flow = buildPageFlow(factory).cachedIn(backgroundScope, tracker)
401             val collector = ItemCollector(flow)
402             val job = SupervisorJob()
403             val subScope = CoroutineScope(coroutineContext + job)
404             collector.collectPassivelyIn(subScope)
405             testScope.runCurrent()
406             // check that cachedData contains data from passive collector
407             assertThat(flow.cachedData())
408                 .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
409             // finish that collector
410             job.cancelAndJoin()
411             assertThat(factory.nextVersion).isEqualTo(1)
412             repeat(10) {
413                 factory.invalidateLatest()
414                 testScope.runCurrent()
415             }
416             runCurrent()
417             // next version is 11, the last paged data we've created has version 10
418             assertThat(factory.nextVersion).isEqualTo(11)
419 
420             // the replayCache has paged data version 10 but no collection on this pagingData yet
421             // so it has no cachedEvent.
422             val cachedPagingData = (flow as SharedFlow<PagingData<Item>>).replayCache.first()
423             assertThat(cachedPagingData.cachedEvent()).isNull()
424 
425             // create another collector from shared, should only receive 1 paging data and that
426             // should be the latest because previous PagingData is invalidated
427             val collector2 = ItemCollector(flow)
428             collector2.collectPassivelyIn(backgroundScope)
429             testScope.runCurrent()
430             // now this PagingData has cachedEvents from version 10
431             assertThat(flow.cachedData())
432                 .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 3))
433             assertThat(factory.nextVersion).isEqualTo(11)
434             // collect some more and ensure cachedData is still up-to-date
435             flow.collectItemsUntilSize(9)
436             assertThat(flow.cachedData())
437                 .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
438         }
439 
440     private fun buildPageFlow(
441         factory: StringPagingSource.VersionedFactory = StringPagingSource.VersionedFactory()
442     ): Flow<PagingData<Item>> {
443         return Pager(
444                 pagingSourceFactory = factory::create,
445                 config =
446                     PagingConfig(
447                         pageSize = 3,
448                         prefetchDistance = 1,
449                         enablePlaceholders = false,
450                         initialLoadSize = 3,
451                         maxSize = 1000
452                     )
453             )
454             .flow
455     }
456 
457     /**
458      * Used for assertions internally to ensure we don't get some data with wrong generation during
459      * collection. This shouldn't happen but happened during development so it is best to add
460      * assertions for it.
461      */
462     private val PagingData<Item>.version
463         get(): Int {
464             return ((hintReceiver as PageFetcher<*, *>.PagerHintReceiver<*, *>)
465                     .pageFetcherSnapshot
466                     .pagingSource as StringPagingSource)
467                 .version
468         }
469 
470     private suspend fun Flow<PagingData<Item>>.collectItemsUntilSize(
471         expectedSize: Int,
472     ): List<Item> {
473         return this.mapLatest { pagingData ->
474                 val expectedVersion = pagingData.version
475                 val items = mutableListOf<Item>()
476                 yield() // this yield helps w/ cancellation wrt mapLatest
477                 val receiver = pagingData.hintReceiver
478                 var loadedPageCount = 0
479                 pagingData.flow
480                     .filterIsInstance<PageEvent.Insert<Item>>()
481                     .onEach {
482                         items.addAll(
483                             it.pages.flatMap {
484                                 assertThat(it.data.map { it.pagingSourceId }.toSet())
485                                     .containsExactly(expectedVersion)
486                                 it.data
487                             }
488                         )
489                         loadedPageCount += it.pages.size
490                         if (items.size < expectedSize) {
491                             receiver.accessHint(
492                                 ViewportHint.Access(
493                                     pageOffset = loadedPageCount - 1,
494                                     indexInPage = it.pages.last().data.size - 1,
495                                     presentedItemsBefore = it.pages.sumOf { it.data.size } - 1,
496                                     presentedItemsAfter = 0,
497                                     originalPageOffsetFirst =
498                                         it.pages.first().originalPageOffsets.minOrNull()!!,
499                                     originalPageOffsetLast =
500                                         it.pages.last().originalPageOffsets.maxOrNull()!!
501                                 )
502                             )
503                         } else {
504                             throw AbortCollectionException()
505                         }
506                     }
507                     .catch { ex ->
508                         if (ex !is AbortCollectionException) {
509                             throw ex
510                         }
511                     }
512                     .toList()
513                 items
514             }
515             .first()
516     }
517 
518     private fun Flow<PagingData<Item>>.cachedData(): List<Item> {
519         assertThat(this).isInstanceOf<SharedFlow<PagingData<Item>>>()
520         val flow = this as SharedFlow<PagingData<Item>>
521         assertThat(flow.replayCache).isNotEmpty()
522 
523         val pagingData = flow.replayCache.firstOrNull()
524         assertThat(pagingData).isNotNull()
525 
526         val event = pagingData!!.cachedEvent()
527         assertThat(event).isInstanceOf<PageEvent.Insert<Item>>()
528 
529         return (event as PageEvent.Insert<Item>).pages.flatMap { it.data }
530     }
531 
532     /** Paged list collector that does not call any hints but always collects */
533     private class ItemCollector(val source: Flow<PagingData<Item>>) {
534         private var items: List<Item> = emptyList()
535         private var job: Job? = null
536         var receivedPagingDataCount = 0
537             private set
538 
539         /**
540          * Collect w/o calling any UI hints so it more like observing the stream w/o affecting it.
541          */
542         fun collectPassivelyIn(scope: CoroutineScope) {
543             check(job == null) { "don't call collect twice" }
544             job = scope.launch { collectPassively() }
545         }
546 
547         private suspend fun collectPassively() {
548             source.collect {
549                 receivedPagingDataCount++
550                 // clear to latest
551                 val list = mutableListOf<Item>()
552                 items = list
553                 it.flow.filterIsInstance<PageEvent.Insert<Item>>().collect {
554                     it.pages.forEach { list.addAll(it.data) }
555                 }
556             }
557         }
558 
559         fun items() = items.toList()
560     }
561 
562     private class StringPagingSource(val version: Int) : PagingSource<Int, Item>() {
563         private var generation = -1
564 
565         override val keyReuseSupported: Boolean
566             get() = true
567 
568         override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Item> {
569             when (params) {
570                 is LoadParams.Refresh -> {
571                     generation++
572                     return doLoad(position = params.key ?: 0, size = params.loadSize)
573                 }
574                 is LoadParams.Prepend -> {
575                     val loadSize = minOf(params.key, params.loadSize)
576                     return doLoad(position = params.key - params.loadSize, size = loadSize)
577                 }
578                 is LoadParams.Append -> {
579                     return doLoad(position = params.key, size = params.loadSize)
580                 }
581             }
582         }
583 
584         override fun getRefreshKey(state: PagingState<Int, Item>): Int? = null
585 
586         private fun doLoad(position: Int, size: Int): LoadResult<Int, Item> {
587             return LoadResult.Page(
588                 data =
589                     buildItems(
590                         version = version,
591                         generation = generation,
592                         start = position,
593                         size = size
594                     ),
595                 prevKey = if (position == 0) null else position,
596                 nextKey = position + size
597             )
598         }
599 
600         class VersionedFactory {
601             var nextVersion = 0
602                 private set
603 
604             private var latestSource: StringPagingSource? = null
605 
606             fun create() = StringPagingSource(nextVersion++).also { latestSource = it }
607 
608             fun invalidateLatest() = latestSource?.invalidate()
609         }
610     }
611 
612     companion object {
613         private fun buildItems(
614             version: Int,
615             generation: Int,
616             start: Int,
617             size: Int,
618             modifier: ((Item) -> Item)? = null
619         ): List<Item> {
620             return (start until start + size).map { id ->
621                 Item(pagingSourceId = version, generation = generation, value = id).let {
622                     modifier?.invoke(it) ?: it
623                 }
624             }
625         }
626     }
627 
628     private data class Item(
629         /** which paged source generated this item */
630         val pagingSourceId: Int,
631         /** # of refresh counts in the paged source */
632         val generation: Int,
633         /** Item unique identifier */
634         val value: Int,
635 
636         /** Any additional data by transformations etc */
637         val metadata: String? = null
638     )
639 
640     private class ActiveFlowTrackerImpl : ActiveFlowTracker {
641         private val counters =
642             mapOf(PAGED_DATA_FLOW to AtomicInt(0), PAGE_EVENT_FLOW to AtomicInt(0))
643 
644         override fun onNewCachedEventFlow(cachedPageEventFlow: CachedPageEventFlow<*>) {}
645 
646         override suspend fun onStart(flowType: FlowType) {
647             (counters[flowType] ?: error("invalid type $flowType")).incrementAndGet()
648         }
649 
650         override suspend fun onComplete(flowType: FlowType) {
651             (counters[flowType] ?: error("invalid type $flowType")).decrementAndGet()
652         }
653 
654         fun pageDataFlowCount() = (counters[PAGED_DATA_FLOW] ?: error("unexpected")).get()
655 
656         fun pageEventFlowCount() = (counters[PAGE_EVENT_FLOW] ?: error("unexpected")).get()
657     }
658 
659     private class AbortCollectionException : Throwable()
660 }
661