1 /* <lambda>null2 * Copyright 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package androidx.paging 18 19 import androidx.kruth.assertThat 20 import androidx.paging.CombineSource.INITIAL 21 import androidx.paging.CombineSource.OTHER 22 import androidx.paging.CombineSource.RECEIVER 23 import kotlin.random.Random 24 import kotlin.test.Ignore 25 import kotlin.test.Test 26 import kotlinx.coroutines.ExperimentalCoroutinesApi 27 import kotlinx.coroutines.channels.Channel 28 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED 29 import kotlinx.coroutines.delay 30 import kotlinx.coroutines.flow.combine 31 import kotlinx.coroutines.flow.consumeAsFlow 32 import kotlinx.coroutines.flow.emptyFlow 33 import kotlinx.coroutines.flow.first 34 import kotlinx.coroutines.flow.flow 35 import kotlinx.coroutines.flow.flowOf 36 import kotlinx.coroutines.flow.onEach 37 import kotlinx.coroutines.flow.toList 38 import kotlinx.coroutines.launch 39 import kotlinx.coroutines.test.TestScope 40 import kotlinx.coroutines.test.advanceUntilIdle 41 import kotlinx.coroutines.test.runTest 42 import kotlinx.coroutines.yield 43 44 @OptIn(ExperimentalCoroutinesApi::class) 45 class FlowExtTest { 46 val testScope = TestScope() 47 48 @Test 49 fun scan_basic() = 50 testScope.runTest { 51 val arguments = mutableListOf<Pair<Int, Int>>() 52 assertThat( 53 flowOf(1, 2, 3) 54 .simpleScan(0) { acc, value -> 55 arguments.add(acc to value) 56 value + acc 57 } 58 .toList() 59 ) 60 .containsExactly(0, 1, 3, 6) 61 .inOrder() 62 assertThat(arguments).containsExactly(0 to 1, 1 to 2, 3 to 3).inOrder() 63 } 64 65 @Test 66 fun scan_initialValue() = 67 testScope.runTest { 68 assertThat(emptyFlow<Int>().simpleScan("x") { _, value -> "$value" }.toList()) 69 .containsExactly("x") 70 } 71 72 @Test 73 fun runningReduce_basic() = 74 testScope.runTest { 75 assertThat( 76 flowOf(1, 2, 3, 4).simpleRunningReduce { acc, value -> acc + value }.toList() 77 ) 78 .containsExactly(1, 3, 6, 10) 79 } 80 81 @Test 82 fun runningReduce_empty() = 83 testScope.runTest { 84 assertThat(emptyFlow<Int>().simpleRunningReduce { acc, value -> acc + value }.toList()) 85 .isEmpty() 86 } 87 88 @Test 89 fun mapLatest() = 90 testScope.runTest { 91 assertThat( 92 flowOf(1, 2, 3, 4) 93 .onEach { delay(1) } 94 .simpleMapLatest { value -> 95 delay(value.toLong()) 96 "$value-$value" 97 } 98 .toList() 99 ) 100 .containsExactly("1-1", "4-4") 101 .inOrder() 102 } 103 104 @Test 105 fun mapLatest_empty() = 106 testScope.runTest { 107 assertThat(emptyFlow<Int>().simpleMapLatest { value -> "$value-$value" }.toList()) 108 .isEmpty() 109 } 110 111 @Test 112 fun flatMapLatest() = 113 testScope.runTest { 114 assertThat( 115 flowOf(1, 2, 3, 4) 116 .onEach { delay(1) } 117 .simpleFlatMapLatest { value -> flow { repeat(value) { emit(value) } } } 118 .toList() 119 ) 120 .containsExactly(1, 2, 2, 3, 3, 3, 4, 4, 4, 4) 121 .inOrder() 122 } 123 124 @Test 125 fun flatMapLatest_empty() = 126 testScope.runTest { 127 assertThat(emptyFlow<Int>().simpleFlatMapLatest { flowOf(it) }.toList()).isEmpty() 128 } 129 130 @Test 131 fun combineWithoutBatching_buffersEmissions() = 132 testScope.runTest { 133 val flow1 = Channel<Int>(BUFFERED) 134 val flow2 = Channel<String>(BUFFERED) 135 136 val result = mutableListOf<String>() 137 launch { 138 flow1 139 .consumeAsFlow() 140 .combineWithoutBatching(flow2.consumeAsFlow()) { first, second, _ -> 141 "$first$second" 142 } 143 .collect(result::add) 144 } 145 146 flow1.send(1) 147 advanceUntilIdle() 148 assertThat(result).isEmpty() 149 150 flow1.send(2) 151 advanceUntilIdle() 152 assertThat(result).isEmpty() 153 154 flow2.send("A") 155 advanceUntilIdle() 156 assertThat(result).containsExactly("1A", "2A") 157 158 // This should automatically propagate cancellation to the launched collector. 159 flow1.close() 160 flow2.close() 161 } 162 163 @Test 164 fun combineWithoutBatching_doesNotBatchOnSlowTransform() = 165 testScope.runTest { 166 val flow1 = flowOf(1, 2, 3) 167 val flow2 = flowOf("A", "B", "C") 168 val slowTransform: suspend (Int, String) -> String = { num: Int, letter: String -> 169 delay(10) 170 "$num$letter" 171 } 172 173 val batchedCombine = flow1.combine(flow2, slowTransform).toList() 174 advanceUntilIdle() 175 assertThat(batchedCombine).containsExactly("1A", "3B", "3C") 176 177 val unbatchedCombine = 178 flow1 179 .combineWithoutBatching(flow2) { num, letter, _ -> slowTransform(num, letter) } 180 .toList() 181 advanceUntilIdle() 182 assertThat(unbatchedCombine).containsExactly("1A", "2A", "2B", "3B", "3C") 183 } 184 185 @Test 186 fun combineWithoutBatching_updateFrom() = 187 testScope.runTest { 188 val flow1 = Channel<Int>(BUFFERED) 189 val flow2 = Channel<Int>(BUFFERED) 190 191 val result = mutableListOf<CombineSource>() 192 launch { 193 flow1 194 .consumeAsFlow() 195 .combineWithoutBatching(flow2.consumeAsFlow()) { _, _, updateFrom -> 196 result.add(updateFrom) 197 } 198 .collect {} 199 } 200 201 flow1.send(1) 202 advanceUntilIdle() 203 assertThat(result).isEmpty() 204 205 flow1.send(1) 206 advanceUntilIdle() 207 assertThat(result).isEmpty() 208 209 flow2.send(2) 210 advanceUntilIdle() 211 assertThat(result).containsExactly(INITIAL, RECEIVER) 212 213 flow1.send(1) 214 flow2.send(2) 215 advanceUntilIdle() 216 assertThat(result).containsExactly(INITIAL, RECEIVER, RECEIVER, OTHER) 217 218 // This should automatically propagate cancellation to the launched collector. 219 flow1.close() 220 flow2.close() 221 } 222 223 @Test 224 fun combineWithoutBatching_collectorCancellationPropagates() = 225 testScope.runTest { 226 val flow1Emissions = mutableListOf<Int>() 227 val flow1 = flowOf(1, 2, 3).onEach(flow1Emissions::add) 228 val flow2Emissions = mutableListOf<String>() 229 val flow2 = flowOf("A", "B", "C").onEach(flow2Emissions::add) 230 val result = mutableListOf<Unit>() 231 232 flow1.combineWithoutBatching(flow2) { _, _, _ -> result.add(Unit) }.first() 233 234 advanceUntilIdle() 235 236 // We can't guarantee whether cancellation will propagate before or after the second 237 // item 238 // is emitted, but we should never get the third. 239 assertThat(flow1Emissions.size).isIn(1..2) 240 assertThat(flow2Emissions.size).isIn(1..2) 241 assertThat(result.size).isIn(1..2) 242 } 243 244 @Ignore // b/329157121 245 @Test 246 fun combineWithoutBatching_stressTest() = 247 testScope.runTest { 248 val flow1 = flow { 249 repeat(1000) { 250 if (Random.nextBoolean()) { 251 delay(1) 252 } 253 emit(it) 254 } 255 } 256 val flow2 = flow { 257 repeat(1000) { 258 if (Random.nextBoolean()) { 259 delay(1) 260 } 261 emit(it) 262 } 263 } 264 265 repeat(10) { 266 val result = 267 flow1 268 .combineWithoutBatching(flow2) { first, second, _ -> first to second } 269 .toList() 270 271 // Never emit the same values twice. 272 assertThat(result).containsNoDuplicates() 273 274 // Assert order of emissions 275 result.scan(0 to 0) { acc, next -> 276 assertThat(next.first).isAtLeast(acc.first) 277 assertThat(next.second).isAtLeast(acc.second) 278 next 279 } 280 281 // Check we don't miss any emissions 282 assertThat(result).hasSize(1999) 283 } 284 } 285 286 class UnbatchedFlowCombinerTest { 287 private data class SendResult<T1, T2>( 288 val receiverValue: T1, 289 val otherValue: T2, 290 val updateFrom: CombineSource, 291 ) 292 293 @Test 294 fun onNext_receiverBuffers() = runTest { 295 val result = mutableListOf<SendResult<Int, Int>>() 296 val combiner = 297 UnbatchedFlowCombiner<Int, Int> { a, b, c -> result.add(SendResult(a, b, c)) } 298 299 combiner.onNext(index = 0, value = 0) 300 val job = launch { 301 repeat(9) { receiverValue -> combiner.onNext(index = 0, value = receiverValue + 1) } 302 } 303 304 // Ensure subsequent calls to onNext from receiver suspends forever until onNext 305 // is called for the other Flow. 306 advanceUntilIdle() 307 assertThat(job.isCompleted).isFalse() 308 // No events should be received until we receive an event from the other Flow. 309 assertThat(result).isEmpty() 310 311 combiner.onNext(index = 1, value = 0) 312 313 advanceUntilIdle() 314 assertThat(job.isCompleted).isTrue() 315 assertThat(result) 316 .containsExactly( 317 SendResult(0, 0, INITIAL), 318 SendResult(1, 0, RECEIVER), 319 SendResult(2, 0, RECEIVER), 320 SendResult(3, 0, RECEIVER), 321 SendResult(4, 0, RECEIVER), 322 SendResult(5, 0, RECEIVER), 323 SendResult(6, 0, RECEIVER), 324 SendResult(7, 0, RECEIVER), 325 SendResult(8, 0, RECEIVER), 326 SendResult(9, 0, RECEIVER), 327 ) 328 } 329 330 @Test 331 fun onNext_otherBuffers() = runTest { 332 val result = mutableListOf<SendResult<Int, Int>>() 333 val combiner = 334 UnbatchedFlowCombiner<Int, Int> { a, b, c -> result.add(SendResult(a, b, c)) } 335 336 combiner.onNext(index = 1, value = 0) 337 val job = launch { 338 repeat(9) { receiverValue -> combiner.onNext(index = 1, value = receiverValue + 1) } 339 } 340 341 // Ensure subsequent calls to onNext from receiver suspends forever until onNext 342 // is called for the other Flow. 343 advanceUntilIdle() 344 assertThat(job.isCompleted).isFalse() 345 // No events should be received until we receive an event from the other Flow. 346 assertThat(result).isEmpty() 347 348 combiner.onNext(index = 0, value = 0) 349 350 advanceUntilIdle() 351 assertThat(job.isCompleted).isTrue() 352 assertThat(result) 353 .containsExactly( 354 SendResult(0, 0, INITIAL), 355 SendResult(0, 1, OTHER), 356 SendResult(0, 2, OTHER), 357 SendResult(0, 3, OTHER), 358 SendResult(0, 4, OTHER), 359 SendResult(0, 5, OTHER), 360 SendResult(0, 6, OTHER), 361 SendResult(0, 7, OTHER), 362 SendResult(0, 8, OTHER), 363 SendResult(0, 9, OTHER), 364 ) 365 } 366 367 @Test 368 fun onNext_initialDispatchesFirst() = runTest { 369 val result = mutableListOf<SendResult<Int, Int>>() 370 val combiner = 371 UnbatchedFlowCombiner<Int, Int> { a, b, c -> 372 // Give a chance for other calls to onNext to run. 373 yield() 374 result.add(SendResult(a, b, c)) 375 } 376 377 launch { repeat(1000) { value -> combiner.onNext(index = 0, value = value) } } 378 379 repeat(1) { value -> launch { combiner.onNext(index = 1, value = value) } } 380 381 advanceUntilIdle() 382 assertThat(result.first()) 383 .isEqualTo( 384 SendResult(0, 0, INITIAL), 385 ) 386 } 387 } 388 } 389