<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4
5 package kotlinx.coroutines.flow
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.jvm.*
10 import kotlin.reflect.*
11 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
12 import kotlinx.coroutines.flow.unsafeTransform as transform
13
14 /**
15 * Returns a flow containing only values of the original flow that match the given [predicate].
16 */
17 public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
18 if (predicate(value)) return@transform emit(value)
19 }
20
21 /**
22 * Returns a flow containing only values of the original flow that do not match the given [predicate].
23 */
filterNotnull24 public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
25 if (!predicate(value)) return@transform emit(value)
26 }
27
28 /**
29 * Returns a flow containing only values that are instances of specified type [R].
30 */
31 @Suppress("UNCHECKED_CAST")
<lambda>null32 public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
33
34 /**
35 * Returns a flow containing only values that are instances of the given [klass].
36 */
<lambda>null37 public fun <R : Any> Flow<*>.filterIsInstance(klass: KClass<R>): Flow<R> = filter { klass.isInstance(it) } as Flow<R>
38
39 /**
40 * Returns a flow containing only values of the original flow that are not null.
41 */
filterNotNullnull42 public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { value ->
43 if (value != null) return@transform emit(value)
44 }
45
46 /**
47 * Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
48 */
mapnull49 public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { value ->
50 return@transform emit(transform(value))
51 }
52
53 /**
54 * Returns a flow that contains only non-null results of applying the given [transform] function to each value of the original flow.
55 */
mapNotNullnull56 public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { value ->
57 val transformed = transform(value) ?: return@transform
58 return@transform emit(transformed)
59 }
60
61 /**
62 * Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
63 */
<lambda>null64 public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
65 var index = 0
66 collect { value ->
67 emit(IndexedValue(checkIndexOverflow(index++), value))
68 }
69 }
70
71 /**
72 * Returns a flow that invokes the given [action] **before** each value of the upstream flow is emitted downstream.
73 */
onEachnull74 public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
75 action(value)
76 return@transform emit(value)
77 }
78
79 /**
80 * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
81 * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
82 * For example:
83 * ```
84 * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
85 * ```
86 * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
87 *
88 * This function is an alias to [runningFold] operator.
89 */
scannull90 public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = runningFold(initial, operation)
91
92 /**
93 * Folds the given flow with [operation], emitting every intermediate result, including [initial] value.
94 * Note that initial value should be immutable (or should not be mutated) as it is shared between different collectors.
95 * For example:
96 * ```
97 * flowOf(1, 2, 3).runningFold(emptyList<Int>()) { acc, value -> acc + value }.toList()
98 * ```
99 * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
100 */
101 public fun <T, R> Flow<T>.runningFold(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
102 var accumulator: R = initial
103 emit(accumulator)
104 collect { value ->
105 accumulator = operation(accumulator, value)
106 emit(accumulator)
107 }
108 }
109
110 /**
111 * Reduces the given flow with [operation], emitting every intermediate result, including initial value.
112 * The first element is taken as initial value for operation accumulator.
113 * This operator has a sibling with initial value -- [scan].
114 *
115 * For example:
116 * ```
117 * flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()
118 * ```
119 * will produce `[1, 3, 6, 10]`
120 */
<lambda>null121 public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
122 var accumulator: Any? = NULL
123 collect { value ->
124 accumulator = if (accumulator === NULL) {
125 value
126 } else {
127 operation(accumulator as T, value)
128 }
129 emit(accumulator as T)
130 }
131 }
132
133 /**
134 * Splits the given flow into a flow of non-overlapping lists each not exceeding the given [size] but never empty.
135 * The last emitted list may have fewer elements than the given size.
136 *
137 * Example of usage:
138 * ```
139 * flowOf("a", "b", "c", "d", "e")
140 * .chunked(2) // ["a", "b"], ["c", "d"], ["e"]
141 * .map { it.joinToString(separator = "") }
142 * .collect {
143 * println(it) // Prints "ab", "cd", e"
144 * }
145 * ```
146 *
147 * @throws IllegalArgumentException if [size] is not positive.
148 */
149 @ExperimentalCoroutinesApi
chunkednull150 public fun <T> Flow<T>.chunked(size: Int): Flow<List<T>> {
151 require(size >= 1) { "Expected positive chunk size, but got $size" }
152 return flow {
153 var result: ArrayList<T>? = null // Do not preallocate anything
154 collect { value ->
155 // Allocate if needed
156 val acc = result ?: ArrayList<T>(size).also { result = it }
157 acc.add(value)
158 if (acc.size == size) {
159 emit(acc)
160 // Cleanup, but don't allocate -- it might've been the case this is the last element
161 result = null
162 }
163 }
164 result?.let { emit(it) }
165 }
166 }
167