/*
 * Copyright 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package androidx.paging
18 
19 import androidx.kruth.assertThat
20 import androidx.paging.CombineSource.INITIAL
21 import androidx.paging.CombineSource.OTHER
22 import androidx.paging.CombineSource.RECEIVER
23 import kotlin.random.Random
24 import kotlin.test.Ignore
25 import kotlin.test.Test
26 import kotlinx.coroutines.ExperimentalCoroutinesApi
27 import kotlinx.coroutines.channels.Channel
28 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
29 import kotlinx.coroutines.delay
30 import kotlinx.coroutines.flow.combine
31 import kotlinx.coroutines.flow.consumeAsFlow
32 import kotlinx.coroutines.flow.emptyFlow
33 import kotlinx.coroutines.flow.first
34 import kotlinx.coroutines.flow.flow
35 import kotlinx.coroutines.flow.flowOf
36 import kotlinx.coroutines.flow.onEach
37 import kotlinx.coroutines.flow.toList
38 import kotlinx.coroutines.launch
39 import kotlinx.coroutines.test.TestScope
40 import kotlinx.coroutines.test.advanceUntilIdle
41 import kotlinx.coroutines.test.runTest
42 import kotlinx.coroutines.yield
43 
44 @OptIn(ExperimentalCoroutinesApi::class)
45 class FlowExtTest {
46     val testScope = TestScope()
47 
48     @Test
49     fun scan_basic() =
50         testScope.runTest {
51             val arguments = mutableListOf<Pair<Int, Int>>()
52             assertThat(
53                     flowOf(1, 2, 3)
54                         .simpleScan(0) { acc, value ->
55                             arguments.add(acc to value)
56                             value + acc
57                         }
58                         .toList()
59                 )
60                 .containsExactly(0, 1, 3, 6)
61                 .inOrder()
62             assertThat(arguments).containsExactly(0 to 1, 1 to 2, 3 to 3).inOrder()
63         }
64 
65     @Test
66     fun scan_initialValue() =
67         testScope.runTest {
68             assertThat(emptyFlow<Int>().simpleScan("x") { _, value -> "$value" }.toList())
69                 .containsExactly("x")
70         }
71 
72     @Test
73     fun runningReduce_basic() =
74         testScope.runTest {
75             assertThat(
76                     flowOf(1, 2, 3, 4).simpleRunningReduce { acc, value -> acc + value }.toList()
77                 )
78                 .containsExactly(1, 3, 6, 10)
79         }
80 
81     @Test
82     fun runningReduce_empty() =
83         testScope.runTest {
84             assertThat(emptyFlow<Int>().simpleRunningReduce { acc, value -> acc + value }.toList())
85                 .isEmpty()
86         }
87 
88     @Test
89     fun mapLatest() =
90         testScope.runTest {
91             assertThat(
92                     flowOf(1, 2, 3, 4)
93                         .onEach { delay(1) }
94                         .simpleMapLatest { value ->
95                             delay(value.toLong())
96                             "$value-$value"
97                         }
98                         .toList()
99                 )
100                 .containsExactly("1-1", "4-4")
101                 .inOrder()
102         }
103 
104     @Test
105     fun mapLatest_empty() =
106         testScope.runTest {
107             assertThat(emptyFlow<Int>().simpleMapLatest { value -> "$value-$value" }.toList())
108                 .isEmpty()
109         }
110 
111     @Test
112     fun flatMapLatest() =
113         testScope.runTest {
114             assertThat(
115                     flowOf(1, 2, 3, 4)
116                         .onEach { delay(1) }
117                         .simpleFlatMapLatest { value -> flow { repeat(value) { emit(value) } } }
118                         .toList()
119                 )
120                 .containsExactly(1, 2, 2, 3, 3, 3, 4, 4, 4, 4)
121                 .inOrder()
122         }
123 
124     @Test
125     fun flatMapLatest_empty() =
126         testScope.runTest {
127             assertThat(emptyFlow<Int>().simpleFlatMapLatest { flowOf(it) }.toList()).isEmpty()
128         }
129 
130     @Test
131     fun combineWithoutBatching_buffersEmissions() =
132         testScope.runTest {
133             val flow1 = Channel<Int>(BUFFERED)
134             val flow2 = Channel<String>(BUFFERED)
135 
136             val result = mutableListOf<String>()
137             launch {
138                 flow1
139                     .consumeAsFlow()
140                     .combineWithoutBatching(flow2.consumeAsFlow()) { first, second, _ ->
141                         "$first$second"
142                     }
143                     .collect(result::add)
144             }
145 
146             flow1.send(1)
147             advanceUntilIdle()
148             assertThat(result).isEmpty()
149 
150             flow1.send(2)
151             advanceUntilIdle()
152             assertThat(result).isEmpty()
153 
154             flow2.send("A")
155             advanceUntilIdle()
156             assertThat(result).containsExactly("1A", "2A")
157 
158             // This should automatically propagate cancellation to the launched collector.
159             flow1.close()
160             flow2.close()
161         }
162 
163     @Test
164     fun combineWithoutBatching_doesNotBatchOnSlowTransform() =
165         testScope.runTest {
166             val flow1 = flowOf(1, 2, 3)
167             val flow2 = flowOf("A", "B", "C")
168             val slowTransform: suspend (Int, String) -> String = { num: Int, letter: String ->
169                 delay(10)
170                 "$num$letter"
171             }
172 
173             val batchedCombine = flow1.combine(flow2, slowTransform).toList()
174             advanceUntilIdle()
175             assertThat(batchedCombine).containsExactly("1A", "3B", "3C")
176 
177             val unbatchedCombine =
178                 flow1
179                     .combineWithoutBatching(flow2) { num, letter, _ -> slowTransform(num, letter) }
180                     .toList()
181             advanceUntilIdle()
182             assertThat(unbatchedCombine).containsExactly("1A", "2A", "2B", "3B", "3C")
183         }
184 
185     @Test
186     fun combineWithoutBatching_updateFrom() =
187         testScope.runTest {
188             val flow1 = Channel<Int>(BUFFERED)
189             val flow2 = Channel<Int>(BUFFERED)
190 
191             val result = mutableListOf<CombineSource>()
192             launch {
193                 flow1
194                     .consumeAsFlow()
195                     .combineWithoutBatching(flow2.consumeAsFlow()) { _, _, updateFrom ->
196                         result.add(updateFrom)
197                     }
198                     .collect {}
199             }
200 
201             flow1.send(1)
202             advanceUntilIdle()
203             assertThat(result).isEmpty()
204 
205             flow1.send(1)
206             advanceUntilIdle()
207             assertThat(result).isEmpty()
208 
209             flow2.send(2)
210             advanceUntilIdle()
211             assertThat(result).containsExactly(INITIAL, RECEIVER)
212 
213             flow1.send(1)
214             flow2.send(2)
215             advanceUntilIdle()
216             assertThat(result).containsExactly(INITIAL, RECEIVER, RECEIVER, OTHER)
217 
218             // This should automatically propagate cancellation to the launched collector.
219             flow1.close()
220             flow2.close()
221         }
222 
223     @Test
224     fun combineWithoutBatching_collectorCancellationPropagates() =
225         testScope.runTest {
226             val flow1Emissions = mutableListOf<Int>()
227             val flow1 = flowOf(1, 2, 3).onEach(flow1Emissions::add)
228             val flow2Emissions = mutableListOf<String>()
229             val flow2 = flowOf("A", "B", "C").onEach(flow2Emissions::add)
230             val result = mutableListOf<Unit>()
231 
232             flow1.combineWithoutBatching(flow2) { _, _, _ -> result.add(Unit) }.first()
233 
234             advanceUntilIdle()
235 
236             // We can't guarantee whether cancellation will propagate before or after the second
237             // item
238             // is emitted, but we should never get the third.
239             assertThat(flow1Emissions.size).isIn(1..2)
240             assertThat(flow2Emissions.size).isIn(1..2)
241             assertThat(result.size).isIn(1..2)
242         }
243 
244     @Ignore // b/329157121
245     @Test
246     fun combineWithoutBatching_stressTest() =
247         testScope.runTest {
248             val flow1 = flow {
249                 repeat(1000) {
250                     if (Random.nextBoolean()) {
251                         delay(1)
252                     }
253                     emit(it)
254                 }
255             }
256             val flow2 = flow {
257                 repeat(1000) {
258                     if (Random.nextBoolean()) {
259                         delay(1)
260                     }
261                     emit(it)
262                 }
263             }
264 
265             repeat(10) {
266                 val result =
267                     flow1
268                         .combineWithoutBatching(flow2) { first, second, _ -> first to second }
269                         .toList()
270 
271                 // Never emit the same values twice.
272                 assertThat(result).containsNoDuplicates()
273 
274                 // Assert order of emissions
275                 result.scan(0 to 0) { acc, next ->
276                     assertThat(next.first).isAtLeast(acc.first)
277                     assertThat(next.second).isAtLeast(acc.second)
278                     next
279                 }
280 
281                 // Check we don't miss any emissions
282                 assertThat(result).hasSize(1999)
283             }
284         }
285 
286     class UnbatchedFlowCombinerTest {
287         private data class SendResult<T1, T2>(
288             val receiverValue: T1,
289             val otherValue: T2,
290             val updateFrom: CombineSource,
291         )
292 
293         @Test
294         fun onNext_receiverBuffers() = runTest {
295             val result = mutableListOf<SendResult<Int, Int>>()
296             val combiner =
297                 UnbatchedFlowCombiner<Int, Int> { a, b, c -> result.add(SendResult(a, b, c)) }
298 
299             combiner.onNext(index = 0, value = 0)
300             val job = launch {
301                 repeat(9) { receiverValue -> combiner.onNext(index = 0, value = receiverValue + 1) }
302             }
303 
304             // Ensure subsequent calls to onNext from receiver suspends forever until onNext
305             // is called for the other Flow.
306             advanceUntilIdle()
307             assertThat(job.isCompleted).isFalse()
308             // No events should be received until we receive an event from the other Flow.
309             assertThat(result).isEmpty()
310 
311             combiner.onNext(index = 1, value = 0)
312 
313             advanceUntilIdle()
314             assertThat(job.isCompleted).isTrue()
315             assertThat(result)
316                 .containsExactly(
317                     SendResult(0, 0, INITIAL),
318                     SendResult(1, 0, RECEIVER),
319                     SendResult(2, 0, RECEIVER),
320                     SendResult(3, 0, RECEIVER),
321                     SendResult(4, 0, RECEIVER),
322                     SendResult(5, 0, RECEIVER),
323                     SendResult(6, 0, RECEIVER),
324                     SendResult(7, 0, RECEIVER),
325                     SendResult(8, 0, RECEIVER),
326                     SendResult(9, 0, RECEIVER),
327                 )
328         }
329 
330         @Test
331         fun onNext_otherBuffers() = runTest {
332             val result = mutableListOf<SendResult<Int, Int>>()
333             val combiner =
334                 UnbatchedFlowCombiner<Int, Int> { a, b, c -> result.add(SendResult(a, b, c)) }
335 
336             combiner.onNext(index = 1, value = 0)
337             val job = launch {
338                 repeat(9) { receiverValue -> combiner.onNext(index = 1, value = receiverValue + 1) }
339             }
340 
341             // Ensure subsequent calls to onNext from receiver suspends forever until onNext
342             // is called for the other Flow.
343             advanceUntilIdle()
344             assertThat(job.isCompleted).isFalse()
345             // No events should be received until we receive an event from the other Flow.
346             assertThat(result).isEmpty()
347 
348             combiner.onNext(index = 0, value = 0)
349 
350             advanceUntilIdle()
351             assertThat(job.isCompleted).isTrue()
352             assertThat(result)
353                 .containsExactly(
354                     SendResult(0, 0, INITIAL),
355                     SendResult(0, 1, OTHER),
356                     SendResult(0, 2, OTHER),
357                     SendResult(0, 3, OTHER),
358                     SendResult(0, 4, OTHER),
359                     SendResult(0, 5, OTHER),
360                     SendResult(0, 6, OTHER),
361                     SendResult(0, 7, OTHER),
362                     SendResult(0, 8, OTHER),
363                     SendResult(0, 9, OTHER),
364                 )
365         }
366 
367         @Test
368         fun onNext_initialDispatchesFirst() = runTest {
369             val result = mutableListOf<SendResult<Int, Int>>()
370             val combiner =
371                 UnbatchedFlowCombiner<Int, Int> { a, b, c ->
372                     // Give a chance for other calls to onNext to run.
373                     yield()
374                     result.add(SendResult(a, b, c))
375                 }
376 
377             launch { repeat(1000) { value -> combiner.onNext(index = 0, value = value) } }
378 
379             repeat(1) { value -> launch { combiner.onNext(index = 1, value = value) } }
380 
381             advanceUntilIdle()
382             assertThat(result.first())
383                 .isEqualTo(
384                     SendResult(0, 0, INITIAL),
385                 )
386         }
387     }
388 }
389