• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST")
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.jvm.*
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15 import kotlinx.coroutines.flow.unsafeTransform as transform
16 
/**
 * Returns a flow containing only values of the original flow that match the given [predicate].
 *
 * Values for which [predicate] returns `false` are dropped and never reach the downstream collector.
 */
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> =
    transform { element ->
        // Guard clause: skip anything the predicate rejects.
        if (!predicate(element)) return@transform
        return@transform emit(element)
    }
23 
/**
 * Returns a flow containing only values of the original flow that do not match the given [predicate].
 *
 * Values for which [predicate] returns `true` are dropped and never reach the downstream collector.
 */
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T> =
    transform { element ->
        // Guard clause: skip anything the predicate accepts.
        if (predicate(element)) return@transform
        return@transform emit(element)
    }
30 
/**
 * Returns a flow containing only values that are instances of the specified type [R].
 *
 * Every other value of the original flow is dropped.
 */
@Suppress("UNCHECKED_CAST")
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> {
    // Only instances of R pass the filter, so the resulting flow is viewed as Flow<R>.
    val matching = filter { candidate -> candidate is R }
    return matching as Flow<R>
}
36 
/**
 * Returns a flow containing only values of the original flow that are not null.
 *
 * The element type of the resulting flow is the non-nullable [T].
 */
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T> = transform<T?, T> { element ->
    // Guard clause: nulls are dropped; past this point `element` is smart-cast to T.
    if (element == null) return@transform
    return@transform emit(element)
}
43 
/**
 * Returns a flow containing the results of applying the given [transform] function to each value of the original flow.
 *
 * Exactly one value is emitted downstream for every value of the original flow.
 */
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> =
    transform { upstream ->
        // Inside this lambda, `transform(...)` is the caller-supplied mapping function.
        val mapped = transform(upstream)
        return@transform emit(mapped)
    }
50 
/**
 * Returns a flow that contains only non-null results of applying the given [transform] function
 * to each value of the original flow.
 *
 * Values for which [transform] returns `null` produce no downstream emission.
 */
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R> =
    transform { upstream ->
        val mapped = transform(upstream)
        // A null result is skipped; otherwise `mapped` is smart-cast to R and emitted.
        if (mapped != null) return@transform emit(mapped)
    }
58 
/**
 * Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
 *
 * The counter is created inside the flow body, so each collection numbers its elements from zero.
 */
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
    var nextIndex = 0
    collect { element ->
        // The current index is passed through checkIndexOverflow; the counter advances as part of the same expression.
        val position = checkIndexOverflow(nextIndex++)
        emit(IndexedValue(position, element))
    }
}
68 
/**
 * Returns a flow that invokes the given [action] **before** each value of the upstream flow is emitted downstream.
 *
 * The values themselves are passed through unchanged.
 */
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> =
    transform { element ->
        // Run the side effect first, then forward the untouched element.
        action(element)
        return@transform emit(element)
    }
76 
/**
 * Folds the given flow with [operation], emitting every intermediate result, including the [initial] value.
 * Note that the initial value should be immutable (or should not be mutated) as it is shared between different collectors.
 * For example:
 * ```
 * flowOf(1, 2, 3).scan(emptyList<Int>()) { acc, value -> acc + value }.toList()
 * ```
 * will produce `[[], [1], [1, 2], [1, 2, 3]]`.
 *
 * The [initial] value is emitted before the upstream flow is collected, so an empty upstream
 * still yields a single-element flow.
 *
 * @param initial the first value emitted downstream and the starting value of the accumulator.
 * @param operation computes the next accumulator from the current accumulator and an upstream value.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R> = flow {
    var accumulator: R = initial
    // Emit the seed first so that downstream observes it even when upstream is empty.
    emit(accumulator)
    collect { value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}
95 
/**
 * Reduces the given flow with [operation], emitting every intermediate result, including the initial value.
 * The first element is taken as the initial value for the operation accumulator.
 * This operator has a sibling with an initial value -- [scan].
 *
 * For example:
 * ```
 * flowOf(1, 2, 3, 4).runningReduce { acc, value -> acc + value }.toList()
 * ```
 * will produce `[1, 3, 6, 10]`
 *
 * If the upstream flow is empty, the resulting flow is empty as well: values are emitted only
 * in response to upstream values.
 *
 * @param operation computes the next accumulator from the current accumulator and an upstream value;
 * it is not invoked for the first upstream value.
 */
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T> = flow {
    // NULL is a sentinel meaning "no element seen yet"; plain `null` cannot be used for that
    // because T is unconstrained and `null` may be a legitimate upstream value.
    var accumulator: Any? = NULL
    collect { value ->
        accumulator = if (accumulator === NULL) {
            // First element: it becomes the accumulator as-is.
            value
        } else {
            operation(accumulator as T, value)
        }
        emit(accumulator as T)
    }
}
119