• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.jvm.*
10 import kotlin.reflect.*
11 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
12 import kotlinx.coroutines.flow.unsafeTransform as transform
13 
14 /**
15  * Returns a flow containing only values of the original flow that match the given [predicate].
16  */
17 public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
18     if (predicate(value)) return@transform emit(value)
19 }
20 
21 /**
22  * Returns a flow containing only values of the original flow that do not match the given [predicate].
23  */
filterNotnull24 public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
25     if (!predicate(value)) return@transform emit(value)
26 }
27 
28 /**
29  * Returns a flow containing only values that are instances of specified type [R].
30  */
31 @Suppress("UNCHECKED_CAST")
<lambda>null32 public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
33 
34 /**
35  * Returns a flow containing only values that are instances of the given [klass].
36  */
<lambda>null37 public fun <R : Any> Flow<*>.filterIsInstance(klass: KClass<R>): Flow<R> = filter { klass.isInstance(it) } as Flow<R>
38 
39 /**
40  * Returns a flow containing only values of the original flow that are not null.
41  */
filterNotNullnull42 public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
43     if (value != null) return@transform emit(value)
44 }
45 
46 /**
47  * Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
48  */
mapnull49 public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
50     return@transform emit(transform(value))
51 }
52 
53 /**
54  * Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
55  */
mapNotNullnull56 public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
57     val transformed = transform(value) ?: return@transform
58     return@transform emit(transformed)
59 }
60 
61 /**
62  * Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
63  */
<lambda>null64 public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
65     var index = 0
66     collect { value ->
67         emit(IndexedValue(checkIndexOverflow(index++), value))
68     }
69 }
70 
71 /**
72  * Returns a flow that invokes the given [action] **before** each value of the upstream flow is emitted downstream.
73  */
onEachnull74 public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
75     action(value)
76     return@transform emit(value)
77 }
78 
79 /**
80  * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
81  * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
82  * For example:
83  * ```
84  * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
85  * ```
86  * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
87  *
88  * This function is an alias to [runningFold] operator.
89  */
scannull90 public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation)
91 
92 /**
93  * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
94  * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
95  * For example:
96  * ```
97  * flowOf(1, 2, 3).runningFold(emptyList<Int>()) { acc, value -> acc + value }.toList()
98  * ```
99  * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
100  */
101 public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
102     var accumulator: R = initial
103     emit(accumulator)
104     collect { value ->
105         accumulator = operation(accumulator, value)
106         emit(accumulator)
107     }
108 }
109 
110 /**
111  * Reduces the given flow with [operation], emitting every intermediate result, including initial value.
112  * The first element is taken as initial value for operation accumulator.
113  * This operator has a sibling with initial value -- [scan].
114  *
115  * For example:
116  * ```
117  * flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()
118  * ```
119  * will produce `[1, 3, 6, 10]`
120  */
<lambda>null121 public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
122     var accumulator: Any? = NULL
123     collect { value ->
124         accumulator = if (accumulator === NULL) {
125             value
126         } else {
127             operation(accumulator as T, value)
128         }
129         emit(accumulator as T)
130     }
131 }
132 
133 /**
134  * Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
135  * The last emitted list may have fewer elements than the given size.
136  *
137  * Example of usage:
138  * ```
139  * flowOf("a", "b", "c", "d", "e")
140  *     .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
141  *     .map { it.joinToString(separator = "") }
142  *     .collect {
143  *         println(it) // Prints "ab", "cd", e"
144  *     }
145  * ```
146  *
147  * @throws IllegalArgumentException if [size] is not positive.
148  */
149 @ExperimentalCoroutinesApi
chunkednull150 public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
151     require(size >= 1) { "Expected positive chunk size, but got $size" }
152     return flow {
153         var result: ArrayList<T>? = null // Do not preallocate anything
154         collect { value ->
155             // Allocate if needed
156             val acc = result ?: ArrayList<T>(size).also { result = it }
157             acc.add(value)
158             if (acc.size == size) {
159                 emit(acc)
160                 // Cleanup, but don't allocate -- it might've been the case this is the last element
161                 result = null
162             }
163         }
164         result?.let { emit(it) }
165     }
166 }
167