<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4
5 package kotlinx.coroutines.flow
6
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.jvm.*
10
11 // ------------------ WARNING ------------------
// These emitting operators must use the safe flow builder, because they allow
// user code to emit directly to the underlying FlowCollector.
14
/**
 * Produces a flow in which every value of the given flow is passed through the [transform] function.
 *
 * Because `transform` runs with a [FlowCollector] receiver, it is a flexible function: it may
 * emit a transformed element, skip the element altogether, or emit it multiple times.
 *
 * This operator is a generalization of the [filter] and [map] operators and
 * can serve as a building block for other operators, for example:
 *
 * ```
 * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
 *     if (value % 2 == 0) { // Emit only even values, but twice
 *         emit(value)
 *         emit(value)
 *     } // Do nothing if odd
 * }
 * ```
 */
public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow { // Note: the safe flow builder is required, since the collector is handed to transform on every value
    collect { upstreamValue ->
        // Workaround for KT-28938: without the explicit labeled return, Unit is returned
        // and tail-call elimination does not kick in.
        return@collect transform(upstreamValue)
    }
}
41
// Counterpart of `transform` intended only for implementing internal operators
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { // Note: the unsafe flow builder is acceptable here, because unsafeTransform is internal-use only
    collect { upstreamValue ->
        // Workaround for KT-28938: without the explicit labeled return, Unit is returned
        // and tail-call elimination does not kick in.
        return@collect transform(upstreamValue)
    }
}
52
/**
 * Returns a flow that runs the given [action] **before** collection of this flow begins.
 *
 * Since the [action] is invoked before the upstream flow is started, using it with a [SharedFlow]
 * gives **no guarantee** that emissions from the upstream flow that happen inside this `onStart`
 * action, or immediately after it, will be collected
 * (for shared flows, see [onSubscription] for an alternative operator).
 *
 * The [action] has [FlowCollector] as its receiver, which lets `onStart` emit additional elements.
 * For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onStart { emit("Begin") }
 *     .collect { println(it) } // prints Begin, a, b, c
 * ```
 */
public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: the builder is unsafe, yet the start action is invoked on a safe collector
    val startCollector = SafeCollector<T>(this, currentCoroutineContext())
    try {
        startCollector.action()
    } finally {
        // Released whether the action completed or threw
        startCollector.releaseIntercepted()
    }
    collect(this) // delegate the upstream directly to the downstream collector
}
81
/**
 * Returns a flow that runs the given [action] **after** the flow has completed or been cancelled.
 * The cancellation exception or failure is handed to the [action] as its cause parameter.
 *
 * Conceptually, `onCompletion` resembles wrapping the flow collection into a `finally` block.
 * For example, this imperative snippet:
 *
 * ```
 * try {
 *     myFlow.collect { value ->
 *         println(value)
 *     }
 * } finally {
 *     println("Done")
 * }
 * ```
 *
 * can be rewritten declaratively with `onCompletion`:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { println("Done") }
 *     .collect()
 * ```
 *
 * Unlike [catch], this operator reports exceptions that occur both upstream and downstream,
 * and it observes exceptions that are thrown to cancel the flow. The cause is `null` if and only if
 * the flow has fully completed successfully. Conceptually, the following code:
 *
 * ```
 * myFlow.collect { value ->
 *     println(value)
 * }
 * println("Completed successfully")
 * ```
 *
 * can be rewritten as:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { if (it == null) println("Completed successfully") }
 *     .collect()
 * ```
 *
 * The [action] has [FlowCollector] as its receiver, so this operator can emit additional
 * elements at the end **if the flow completed successfully**. For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onCompletion { emit("Done") }
 *     .collect { println(it) } // prints a, b, c, Done
 * ```
 *
 * After a failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
 * Use [catch] when the failure has to be suppressed and replaced with emission of elements.
 */
public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: the builder is unsafe, yet the completion action is invoked on a safe collector
    try {
        collect(this)
    } catch (failure: Throwable) {
        /*
         * The action receives a throwing collector, so nothing can be emitted from the
         * completion sequence once downstream has failed. Permitting emissions here could
         * lead to non-sequential behaviour that is impossible with `finally`.
         */
        ThrowingCollector(failure).invokeSafely(action, failure)
        throw failure
    }
    // Reached only when collection finished without throwing
    val completionCollector = SafeCollector(this, currentCoroutineContext())
    try {
        completionCollector.action(null)
    } finally {
        completionCollector.releaseIntercepted()
    }
}
162
/**
 * Runs the given [action] when this flow completes without having emitted a single element.
 * The [action] has [FlowCollector] as its receiver, which lets `onEmpty` emit additional elements.
 * For example:
 *
 * ```
 * emptyFlow<Int>().onEmpty {
 *     emit(1)
 *     emit(2)
 * }.collect { println(it) } // prints 1, 2
 * ```
 */
public fun <T> Flow<T>.onEmpty(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
    // Set as soon as the upstream delivers its first element
    var sawElement = false
    collect { element ->
        sawElement = true
        emit(element)
    }
    if (!sawElement) {
        // The upstream finished without elements: run the action on a safe collector
        val emptyCollector = SafeCollector(this, currentCoroutineContext())
        try {
            emptyCollector.action()
        } finally {
            emptyCollector.releaseIntercepted()
        }
    }
}
192
/*
 * Fail-fast check for the 'emitAll' methods: they call it before starting to collect
 * their sources (which may not have any elements for a long time).
 */
internal fun FlowCollector<*>.ensureActive() {
    // Any collector other than a ThrowingCollector passes the check
    val throwing = this as? ThrowingCollector ?: return
    // Rethrow the exception stored in the throwing collector
    throw throwing.e
}
200
/**
 * A [FlowCollector] whose [emit] never accepts a value: every call throws the exception [e]
 * that the collector was created with.
 */
internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> {
    // The emitted value is ignored; the stored exception is thrown unconditionally
    override suspend fun emit(value: Any?): Unit = throw e
}
206
/**
 * Invokes [action] on this collector, passing [cause] to it.
 *
 * If the action throws, the thrown exception is rethrown. Before that, a non-null [cause]
 * that is not the very same exception instance is attached to it as a suppressed exception.
 */
private suspend fun <T> FlowCollector<T>.invokeSafely(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
    cause: Throwable?
) {
    try {
        action(cause)
    } catch (actionFailure: Throwable) {
        // Keep the original cause attached, but never add an exception as suppressed to itself
        if (cause !== null && cause !== actionFailure) actionFailure.addSuppressed(cause)
        throw actionFailure
    }
}
218