/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST")
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.jvm.*
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15 import kotlinx.coroutines.flow.unsafeTransform as transform
16
/**
 * Returns a flow that emits only those values of the original flow for which the given [predicate] returns `true`.
 *
 * The [predicate] is invoked once per upstream value; values it rejects are dropped.
 */
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { element ->
    // Guard clause: a rejected value never reaches the downstream collector.
    if (!predicate(element)) return@transform
    return@transform emit(element)
}
23
/**
 * Returns a flow that emits only those values of the original flow for which the given [predicate] returns `false`.
 *
 * The [predicate] is invoked once per upstream value; values it accepts are dropped.
 */
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { element ->
    // Guard clause: a matching value is skipped, everything else is forwarded.
    if (predicate(element)) return@transform
    return@transform emit(element)
}
30
/**
 * Returns a flow that emits only those values of the original flow that are instances of the specified type [R].
 */
@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> {
    val matching = filter { element -> element is R }
    // Every value that passed the check above is an R, so only the static flow type is narrowed here.
    return matching as Flow<R>
}
36
/**
 * Returns a flow that emits only the non-null values of the original flow.
 */
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { element ->
    // Skip nulls; the elvis operator also narrows the value to the non-null type T.
    val present = element ?: return@transform
    return@transform emit(present)
}
43
/**
 * Returns a flow that emits the result of applying the given [transform] function to each value of the original flow.
 */
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform { element ->
    // Inside the lambda, `transform` refers to the function parameter, not the enclosing operator call.
    val mapped = transform(element)
    return@transform emit(mapped)
}
50
/**
 * Returns a flow that emits only the non-null results of applying the given [transform] function
 * to each value of the original flow.
 *
 * Values for which [transform] returns `null` produce no downstream emission.
 */
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> = transform { element ->
    val mapped = transform(element)
    // A null result is dropped; otherwise `mapped` is smart-cast to R and forwarded.
    if (mapped != null) return@transform emit(mapped)
}
58
/**
 * Returns a flow that wraps each value of the original flow into an [IndexedValue] holding the value
 * together with its index. The index starts at zero and grows by one for every value.
 */
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
    var nextIndex = 0
    collect { element ->
        // Take the current index and advance the counter before validating it.
        val position = nextIndex++
        // NOTE(review): checkIndexOverflow is defined elsewhere; presumably it fails once the Int counter overflows.
        emit(IndexedValue(checkIndexOverflow(position), element))
    }
}
68
/**
 * Returns a flow that invokes the given [action] on each value of the upstream flow **before**
 * that value is emitted downstream.
 *
 * The value itself is forwarded unchanged once [action] has completed.
 */
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> {
    return transform { element ->
        action(element)
        return@transform emit(element)
    }
}
76
/**
 * Folds the given flow with [operation], emitting every intermediate result, including the [initial] value.
 *
 * The [initial] value is emitted first, before the upstream flow is collected. Every upstream value is then
 * combined with the current accumulator via [operation], and each new accumulator is emitted.
 *
 * Note that the initial value should be immutable (or should not be mutated) as it is shared between
 * different collectors.
 *
 * For example:
 * ```
 * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
 * ```
 * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
    var accumulator: R = initial
    // The seed is emitted before any upstream value is requested.
    emit(accumulator)
    collect { value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}
95
/**
 * Reduces the given flow with [operation], emitting every intermediate result, including the initial value.
 * The first element is taken as the initial value for the operation accumulator and is emitted as is;
 * [operation] is first invoked for the second element.
 * This operator has a sibling with an explicit initial value -- [scan].
 *
 * For example:
 * ```
 * flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()
 * ```
 * will produce `[1, 3, 6, 10]`
 */
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
    // NULL is a sentinel for "no value seen yet"; a plain `null` cannot serve because T itself may be nullable.
    var accumulator: Any? = NULL
    collect { value ->
        // The first value seeds the accumulator; later values are combined with it.
        val next = if (accumulator === NULL) value else operation(accumulator as T, value)
        accumulator = next
        // Emit the typed local so the stored accumulator needs no second unchecked cast.
        emit(next)
    }
}
119