/*
 * Copyright 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package androidx.paging

import androidx.kruth.assertThat
import androidx.paging.ActiveFlowTracker.FlowType
import androidx.paging.ActiveFlowTracker.FlowType.PAGED_DATA_FLOW
import androidx.paging.ActiveFlowTracker.FlowType.PAGE_EVENT_FLOW
import androidx.paging.internal.AtomicInt
import kotlin.test.Test
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield

/**
 * Tests for `cachedIn` applied to the [PagingData] flow of a [Pager].
 *
 * Every test builds its flow from a [StringPagingSource.VersionedFactory], so each item records
 * which paging source instance ("version") produced it. Assertions use that version to tell
 * whether a collector observed a shared (cached) [PagingData] or a freshly created one, and use
 * [ActiveFlowTrackerImpl] to count the flows that `cachedIn` reports as active.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class CachingTest {
    private val tracker = ActiveFlowTrackerImpl()

    private val testScope = TestScope(UnconfinedTestDispatcher())

    /** Without `cachedIn`, the second collection gets a new paging source (version 1). */
    @Test
    fun noSharing() =
        testScope.runTest {
            val pageFlow = buildPageFlow()
            val firstCollect = pageFlow.collectItemsUntilSize(6)
            val secondCollect = pageFlow.collectItemsUntilSize(9)
            assertThat(firstCollect)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))

            assertThat(secondCollect)
                .isEqualTo(buildItems(version = 1, generation = 0, start = 0, size = 9))
            assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
        }

    /** With `cachedIn`, both collections observe items from the same paging source (version 0). */
    @Test
    fun cached() =
        testScope.runTest {
            val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
            val firstCollect = pageFlow.collectItemsUntilSize(6)
            val secondCollect = pageFlow.collectItemsUntilSize(9)
            assertThat(firstCollect)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))

            assertThat(secondCollect)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 9))
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /**
     * The cached flow is a [SharedFlow] whose replay cache starts empty; after each collection the
     * cached event contains every item loaded so far.
     */
    @Test
    fun cachedData() =
        testScope.runTest {
            val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
            assertThat(pageFlow).isInstanceOf<SharedFlow<PagingData<Item>>>()
            assertThat((pageFlow as SharedFlow<PagingData<Item>>).replayCache).isEmpty()

            pageFlow.collectItemsUntilSize(6)
            val firstCachedData = pageFlow.cachedData()
            assertThat(firstCachedData)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 6))

            pageFlow.collectItemsUntilSize(9)
            val secondCachedEvent = pageFlow.cachedData()
            assertThat(secondCachedEvent)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 9))
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /** A mapping placed before `cachedIn` is applied once: both collections see metadata "0". */
    @Test
    fun cached_afterMapping() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow =
                buildPageFlow()
                    .map { pagingData ->
                        val mappingIndex = mappingCnt++
                        pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                    }
                    .cachedIn(backgroundScope, tracker)
            val firstCollect = pageFlow.collectItemsUntilSize(6)
            val secondCollect = pageFlow.collectItemsUntilSize(9)
            assertThat(firstCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 6) {
                        it.copy(metadata = "0")
                    }
                )

            assertThat(secondCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 9) {
                        it.copy(metadata = "0")
                    }
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /** The cached event of a flow mapped before `cachedIn` contains the mapped items. */
    @Test
    fun cachedData_afterMapping() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow =
                buildPageFlow()
                    .map { pagingData ->
                        val mappingIndex = mappingCnt++
                        pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                    }
                    .cachedIn(backgroundScope, tracker)

            pageFlow.collectItemsUntilSize(6)
            val firstCachedData = pageFlow.cachedData()
            assertThat(firstCachedData)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 6) {
                        it.copy(metadata = "0")
                    }
                )

            pageFlow.collectItemsUntilSize(9)
            val secondCachedData = pageFlow.cachedData()
            assertThat(secondCachedData)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 9) {
                        it.copy(metadata = "0")
                    }
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /**
     * A mapping placed after `cachedIn` runs again for each collection: the first collection sees
     * metadata "0" and the second sees "1", while the underlying data stays at version 0.
     */
    @Test
    fun cached_beforeMapping() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow =
                buildPageFlow().cachedIn(backgroundScope, tracker).map { pagingData ->
                    val mappingIndex = mappingCnt++
                    pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                }
            val firstCollect = pageFlow.collectItemsUntilSize(6)
            val secondCollect = pageFlow.collectItemsUntilSize(9)
            assertThat(firstCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 6) {
                        it.copy(metadata = "0")
                    }
                )

            assertThat(secondCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 9) {
                        it.copy(metadata = "1")
                    }
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /** The cached event of the pre-mapped flow holds unmapped items (no metadata). */
    @Test
    fun cachedData_beforeMapping() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow = buildPageFlow().cachedIn(backgroundScope, tracker)
            val mappedFlow =
                pageFlow.map { pagingData ->
                    val mappingIndex = mappingCnt++
                    pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                }
            // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
            // replayCache. You can still access latest cachedData directly from pre-mapped flow.
            mappedFlow.collectItemsUntilSize(6)
            val firstCachedData = pageFlow.cachedData()
            assertThat(firstCachedData)
                .isEqualTo(
                    buildItems(
                        version = 0,
                        generation = 0,
                        start = 0,
                        size = 6,
                        modifier = null // before mapping
                    )
                )

            mappedFlow.collectItemsUntilSize(9)
            val secondCachedEvent = pageFlow.cachedData()
            assertThat(secondCachedEvent)
                .isEqualTo(
                    buildItems(
                        version = 0,
                        generation = 0,
                        start = 0,
                        size = 9,
                        modifier = null // before mapping
                    )
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /**
     * With mappings on both sides of `cachedIn`, the upstream mapping is applied once ("0") and
     * the downstream mapping is re-applied per collection ("0_1", then "0_2").
     */
    @Test
    fun cached_afterMapping_withMoreMappingAfterwards() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow =
                buildPageFlow()
                    .map { pagingData ->
                        val mappingIndex = mappingCnt++
                        pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                    }
                    .cachedIn(backgroundScope, tracker)
                    .map { pagingData ->
                        val mappingIndex = mappingCnt++
                        pagingData.map { it.copy(metadata = "${it.metadata}_$mappingIndex") }
                    }
            val firstCollect = pageFlow.collectItemsUntilSize(6)
            val secondCollect = pageFlow.collectItemsUntilSize(9)
            assertThat(firstCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 6) {
                        it.copy(metadata = "0_1")
                    }
                )

            assertThat(secondCollect)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 9) {
                        it.copy(metadata = "0_2")
                    }
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /**
     * With mappings on both sides of `cachedIn`, the cached event only reflects the mapping that
     * was applied before the cache.
     */
    @Test
    fun cachedData_afterMapping_withMoreMappingAfterwards() =
        testScope.runTest {
            var mappingCnt = 0
            val pageFlow =
                buildPageFlow()
                    .map { pagingData ->
                        val mappingIndex = mappingCnt++
                        pagingData.map { it.copy(metadata = mappingIndex.toString()) }
                    }
                    .cachedIn(backgroundScope, tracker)
            val mappedFlow =
                pageFlow.map { pagingData ->
                    val mappingIndex = mappingCnt++
                    pagingData.map { it.copy(metadata = "${it.metadata}_$mappingIndex") }
                }
            // Mapping converts SharedFlow to Flow and thereby blocks access to cachedIn's
            // replayCache. You can still access latest cachedData directly from pre-mapped flow.
            mappedFlow.collectItemsUntilSize(6)
            val firstCachedData = pageFlow.cachedData()
            assertThat(firstCachedData)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 6) {
                        it.copy(metadata = "0") // with mapping before cache
                    }
                )

            mappedFlow.collectItemsUntilSize(9)
            val secondCachedEvent = pageFlow.cachedData()
            assertThat(secondCachedEvent)
                .isEqualTo(
                    buildItems(version = 0, generation = 0, start = 0, size = 9) {
                        it.copy(metadata = "0") // with mapping before cache
                    }
                )
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
        }

    /**
     * The tracker reports one active paged-data flow while the caching scope is alive and none
     * once the scope's job has been cancelled and joined.
     */
    @Test
    fun pagesAreClosedProperty() =
        testScope.runTest {
            val job = SupervisorJob()
            val subScope = CoroutineScope(job + Dispatchers.Default)
            val pageFlow = buildPageFlow().cachedIn(subScope, tracker)
            assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
            assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
            val items = pageFlow.collectItemsUntilSize(9)
            val firstList = buildItems(version = 0, generation = 0, start = 0, size = 9)
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
            val items2 = pageFlow.collectItemsUntilSize(21)
            assertThat(items2)
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 21))
            assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
            assertThat(tracker.pageDataFlowCount()).isEqualTo(1)
            assertThat(items).isEqualTo(firstList)
            job.cancelAndJoin()
            assertThat(tracker.pageEventFlowCount()).isEqualTo(0)
            assertThat(tracker.pageDataFlowCount()).isEqualTo(0)
        }

    /**
     * Collectors that never send hints still observe pages loaded on behalf of other collectors,
     * and a late passive collector immediately receives everything loaded so far.
     */
    @Test
    fun cachedWithPassiveCollector() =
        testScope.runTest {
            val flow = buildPageFlow().cachedIn(backgroundScope, tracker)
            val passive = ItemCollector(flow)
            passive.collectPassivelyIn(backgroundScope)
            testScope.runCurrent()
            // collecting on the paged source will trigger initial page
            assertThat(passive.items())
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
            val firstList = buildItems(version = 0, generation = 0, start = 0, size = 9)
            // another collector is causing more items to be loaded, they should be reflected in the
            // passive one
            assertThat(flow.collectItemsUntilSize(9)).isEqualTo(firstList)
            assertThat(passive.items()).isEqualTo(firstList)
            val passive2 = ItemCollector(flow)
            passive2.collectPassivelyIn(backgroundScope)
            testScope.runCurrent()
            // a new passive one should receive all existing items immediately
            assertThat(passive2.items()).isEqualTo(firstList)

            // now we get another collector that'll fetch more pages, it should reflect in passives
            val secondList = buildItems(version = 0, generation = 0, start = 0, size = 12)
            // another collector is causing more items to be loaded, they should be reflected in the
            // passive one
            assertThat(flow.collectItemsUntilSize(12)).isEqualTo(secondList)
            assertThat(passive.items()).isEqualTo(secondList)
            assertThat(passive2.items()).isEqualTo(secondList)
        }

    /**
     * Test that, when cache is active but there is no active downstream collectors, intermediate
     * invalidations create new PagingData BUT a new collector only sees the latest one.
     */
    @Test
    fun unusedPagingDataIsNeverCollectedByNewDownstream() =
        testScope.runTest {
            val factory = StringPagingSource.VersionedFactory()
            val flow = buildPageFlow(factory).cachedIn(backgroundScope, tracker)
            val collector = ItemCollector(flow)
            val job = SupervisorJob()
            val subScope = CoroutineScope(coroutineContext + job)
            collector.collectPassivelyIn(subScope)
            testScope.runCurrent()
            assertThat(collector.items())
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
            // finish that collector
            job.cancelAndJoin()
            assertThat(factory.nextVersion).isEqualTo(1)
            repeat(10) {
                factory.invalidateLatest()
                testScope.runCurrent()
            }
            runCurrent()
            // next version is 11, the last paged data we've created has version 10
            assertThat(factory.nextVersion).isEqualTo(11)

            // create another collector from shared, should only receive 1 paging data and that
            // should be the latest because previous PagingData is invalidated
            val collector2 = ItemCollector(flow)
            collector2.collectPassivelyIn(backgroundScope)
            testScope.runCurrent()
            assertThat(collector2.items())
                .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 3))
            assertThat(collector2.receivedPagingDataCount).isEqualTo(1)
            testScope.runCurrent()
            assertThat(factory.nextVersion).isEqualTo(11)
            val activeCollection = flow.collectItemsUntilSize(9)
            assertThat(activeCollection)
                .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
            testScope.runCurrent()
            // make sure passive collector received those items as well
            assertThat(collector2.items())
                .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
        }

    /**
     * After a run of invalidations with no downstream collector, the replayed [PagingData] has no
     * cached event until something collects it; afterwards the cached data tracks that latest
     * version.
     */
    @Test
    fun unusedPagingDataIsNeverCached() =
        testScope.runTest {
            val factory = StringPagingSource.VersionedFactory()
            val flow = buildPageFlow(factory).cachedIn(backgroundScope, tracker)
            val collector = ItemCollector(flow)
            val job = SupervisorJob()
            val subScope = CoroutineScope(coroutineContext + job)
            collector.collectPassivelyIn(subScope)
            testScope.runCurrent()
            // check that cachedData contains data from passive collector
            assertThat(flow.cachedData())
                .isEqualTo(buildItems(version = 0, generation = 0, start = 0, size = 3))
            // finish that collector
            job.cancelAndJoin()
            assertThat(factory.nextVersion).isEqualTo(1)
            repeat(10) {
                factory.invalidateLatest()
                testScope.runCurrent()
            }
            runCurrent()
            // next version is 11, the last paged data we've created has version 10
            assertThat(factory.nextVersion).isEqualTo(11)

            // the replayCache has paged data version 10 but no collection on this pagingData yet
            // so it has no cachedEvent.
            val cachedPagingData = (flow as SharedFlow<PagingData<Item>>).replayCache.first()
            assertThat(cachedPagingData.cachedEvent()).isNull()

            // create another collector from shared, should only receive 1 paging data and that
            // should be the latest because previous PagingData is invalidated
            val collector2 = ItemCollector(flow)
            collector2.collectPassivelyIn(backgroundScope)
            testScope.runCurrent()
            // now this PagingData has cachedEvents from version 10
            assertThat(flow.cachedData())
                .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 3))
            assertThat(factory.nextVersion).isEqualTo(11)
            // collect some more and ensure cachedData is still up-to-date
            flow.collectItemsUntilSize(9)
            assertThat(flow.cachedData())
                .isEqualTo(buildItems(version = 10, generation = 0, start = 0, size = 9))
        }

    /**
     * Builds an uncached [PagingData] flow backed by [factory], using pages of 3 items, a
     * prefetch distance of 1 and no placeholders.
     */
    private fun buildPageFlow(
        factory: StringPagingSource.VersionedFactory = StringPagingSource.VersionedFactory()
    ): Flow<PagingData<Item>> {
        return Pager(
                pagingSourceFactory = factory::create,
                config =
                    PagingConfig(
                        pageSize = 3,
                        prefetchDistance = 1,
                        enablePlaceholders = false,
                        initialLoadSize = 3,
                        maxSize = 1000
                    )
            )
            .flow
    }

    /**
     * Used for assertions internally to ensure we don't get some data with wrong generation during
     * collection. This shouldn't happen but happened during development so it is best to add
     * assertions for it.
     */
    private val PagingData<Item>.version
        get(): Int {
            return ((hintReceiver as PageFetcher<*, *>.PagerHintReceiver<*, *>)
                    .pageFetcherSnapshot
                    .pagingSource as StringPagingSource)
                .version
        }

    /**
     * Collects the latest [PagingData] of this flow and keeps sending access hints for the last
     * loaded item until at least [expectedSize] items have been received.
     *
     * Every received page is asserted to come from the paging source version that backs the
     * [PagingData] being collected.
     *
     * @return all items received from insert events, in arrival order
     */
    private suspend fun Flow<PagingData<Item>>.collectItemsUntilSize(
        expectedSize: Int,
    ): List<Item> {
        return this.mapLatest { pagingData ->
                val expectedVersion = pagingData.version
                val items = mutableListOf<Item>()
                yield() // this yield helps w/ cancellation wrt mapLatest
                val receiver = pagingData.hintReceiver
                var loadedPageCount = 0
                pagingData.flow
                    .filterIsInstance<PageEvent.Insert<Item>>()
                    .onEach { insert ->
                        items.addAll(
                            insert.pages.flatMap { page ->
                                assertThat(page.data.map { it.pagingSourceId }.toSet())
                                    .containsExactly(expectedVersion)
                                page.data
                            }
                        )
                        loadedPageCount += insert.pages.size
                        if (items.size < expectedSize) {
                            // Not enough items yet: report access to the last loaded item so
                            // that more pages get requested.
                            receiver.accessHint(
                                ViewportHint.Access(
                                    pageOffset = loadedPageCount - 1,
                                    indexInPage = insert.pages.last().data.size - 1,
                                    presentedItemsBefore =
                                        insert.pages.sumOf { page -> page.data.size } - 1,
                                    presentedItemsAfter = 0,
                                    originalPageOffsetFirst =
                                        insert.pages.first().originalPageOffsets.minOrNull()!!,
                                    originalPageOffsetLast =
                                        insert.pages.last().originalPageOffsets.maxOrNull()!!
                                )
                            )
                        } else {
                            // Enough items: stop collecting the event flow; the catch operator
                            // below swallows only this exception type.
                            throw AbortCollectionException()
                        }
                    }
                    .catch { ex ->
                        if (ex !is AbortCollectionException) {
                            throw ex
                        }
                    }
                    .toList()
                items
            }
            .first()
    }

    /**
     * Returns the items held by the cached insert event of the first [PagingData] in this flow's
     * replay cache.
     *
     * Asserts that this flow is a [SharedFlow], that its replay cache is not empty and that the
     * cached event is a [PageEvent.Insert].
     */
    private fun Flow<PagingData<Item>>.cachedData(): List<Item> {
        assertThat(this).isInstanceOf<SharedFlow<PagingData<Item>>>()
        val flow = this as SharedFlow<PagingData<Item>>
        assertThat(flow.replayCache).isNotEmpty()

        val pagingData = flow.replayCache.firstOrNull()
        assertThat(pagingData).isNotNull()

        val event = pagingData!!.cachedEvent()
        assertThat(event).isInstanceOf<PageEvent.Insert<Item>>()

        return (event as PageEvent.Insert<Item>).pages.flatMap { it.data }
    }

    /** Paged list collector that does not call any hints but always collects */
    private class ItemCollector(val source: Flow<PagingData<Item>>) {
        private var items: List<Item> = emptyList()
        private var job: Job? = null
        var receivedPagingDataCount = 0
            private set

        /**
         * Collect w/o calling any UI hints so it more like observing the stream w/o affecting it.
         */
        fun collectPassivelyIn(scope: CoroutineScope) {
            check(job == null) { "don't call collect twice" }
            job = scope.launch { collectPassively() }
        }

        /**
         * Collects [source]; each new [PagingData] starts a fresh item list, which is then filled
         * from that [PagingData]'s insert events.
         */
        private suspend fun collectPassively() {
            source.collect { pagingData ->
                receivedPagingDataCount++
                // clear to latest
                val list = mutableListOf<Item>()
                items = list
                pagingData.flow.filterIsInstance<PageEvent.Insert<Item>>().collect { insert ->
                    insert.pages.forEach { page -> list.addAll(page.data) }
                }
            }
        }

        /** Returns a snapshot copy of the items received for the latest [PagingData]. */
        fun items() = items.toList()
    }

    /**
     * Paging source whose items are consecutive integers starting at 0, tagged with the source's
     * [version] and with the number of refreshes performed so far.
     */
    private class StringPagingSource(val version: Int) : PagingSource<Int, Item>() {
        // Starts at -1 so that the first refresh produces generation 0.
        private var generation = -1

        override val keyReuseSupported: Boolean
            get() = true

        override suspend fun load(params: LoadParams<Int>): LoadResult<Int, Item> {
            return when (params) {
                is LoadParams.Refresh -> {
                    generation++
                    doLoad(position = params.key ?: 0, size = params.loadSize)
                }
                is LoadParams.Prepend -> {
                    // The key is the position of the first already-loaded item, so at most
                    // `key` items exist before it. The start position must be derived from the
                    // clamped size, otherwise it goes negative whenever key < loadSize.
                    val loadSize = minOf(params.key, params.loadSize)
                    doLoad(position = params.key - loadSize, size = loadSize)
                }
                is LoadParams.Append -> doLoad(position = params.key, size = params.loadSize)
            }
        }

        override fun getRefreshKey(state: PagingState<Int, Item>): Int? = null

        /** Builds a page of [size] items starting at [position]. */
        private fun doLoad(position: Int, size: Int): LoadResult<Int, Item> {
            return LoadResult.Page(
                data =
                    buildItems(
                        version = version,
                        generation = generation,
                        start = position,
                        size = size
                    ),
                prevKey = if (position == 0) null else position,
                nextKey = position + size
            )
        }

        /** Creates paging sources with increasing versions and remembers the latest one. */
        class VersionedFactory {
            var nextVersion = 0
                private set

            private var latestSource: StringPagingSource? = null

            fun create() = StringPagingSource(nextVersion++).also { latestSource = it }

            fun invalidateLatest() = latestSource?.invalidate()
        }
    }

    companion object {
        /**
         * Builds the expected items with ids in `start until start + size`, optionally
         * transformed by [modifier].
         */
        private fun buildItems(
            version: Int,
            generation: Int,
            start: Int,
            size: Int,
            modifier: ((Item) -> Item)? = null
        ): List<Item> {
            return (start until start + size).map { id ->
                Item(pagingSourceId = version, generation = generation, value = id).let {
                    modifier?.invoke(it) ?: it
                }
            }
        }
    }

    private data class Item(
        /** which paged source generated this item */
        val pagingSourceId: Int,
        /** # of refresh counts in the paged source */
        val generation: Int,
        /** Item unique identifier */
        val value: Int,

        /** Any additional data by transformations etc */
        val metadata: String? = null
    )

    /** [ActiveFlowTracker] that keeps a started-minus-completed counter per [FlowType]. */
    private class ActiveFlowTrackerImpl : ActiveFlowTracker {
        private val counters =
            mapOf(PAGED_DATA_FLOW to AtomicInt(0), PAGE_EVENT_FLOW to AtomicInt(0))

        override fun onNewCachedEventFlow(cachedPageEventFlow: CachedPageEventFlow<*>) {}

        override suspend fun onStart(flowType: FlowType) {
            (counters[flowType] ?: error("invalid type $flowType")).incrementAndGet()
        }

        override suspend fun onComplete(flowType: FlowType) {
            (counters[flowType] ?: error("invalid type $flowType")).decrementAndGet()
        }

        fun pageDataFlowCount() = (counters[PAGED_DATA_FLOW] ?: error("unexpected")).get()

        fun pageEventFlowCount() = (counters[PAGE_EVENT_FLOW] ?: error("unexpected")).get()
    }

    /** Thrown by `collectItemsUntilSize` to end collection once enough items were received. */
    private class AbortCollectionException : Throwable()
}