/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST")
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.jvm.*
14
15 // ------------------ WARNING ------------------
16 // These emitting operators must use safe flow builder, because they allow
17 // user code to directly emit to the underlying FlowCollector.
18
/**
 * Applies the [transform] function to every value emitted by this flow.
 *
 * The receiver of `transform` is a [FlowCollector], which makes `transform` a flexible
 * function: for each incoming element it may emit a transformed value, skip the element,
 * or emit several times.
 *
 * This operator generalizes the [filter] and [map] operators and can serve as a
 * building block for other operators, for example:
 *
 * ```
 * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
 *     if (value % 2 == 0) { // Emit only even values, but twice
 *         emit(value)
 *         emit(value)
 *     } // Do nothing if odd
 * }
 * ```
 */
public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow {
    // The safe `flow` builder is required here: the collector is handed to user-supplied
    // `transform` code on every upstream element.
    collect { upstreamValue ->
        // Kludge (KT-28938): without the explicit labeled return, Unit would be returned
        // and tail-call elimination would not kick in.
        return@collect transform(upstreamValue)
    }
}
45
/**
 * Variant of [transform] intended for internal operator implementations only.
 *
 * It forwards every upstream value to [transform], whose receiver is the downstream
 * [FlowCollector], but builds the resulting flow with `unsafeFlow` instead of the safe
 * `flow` builder.
 */
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow {
    // The unsafe builder is acceptable here because unsafeTransform is for internal use only.
    collect { upstreamValue ->
        // Kludge (KT-28938): without the explicit labeled return, Unit would be returned
        // and tail-call elimination would not kick in.
        return@collect transform(upstreamValue)
    }
}
56
/**
 * Returns a flow that invokes the given [action] **before** this flow starts to be collected.
 *
 * The [action] runs before the upstream flow is started. Consequently, when it is used with a
 * [SharedFlow], there is **no guarantee** that emissions from the upstream flow that happen
 * inside or immediately after this `onStart` action will be collected
 * (see [onSubscription] for an alternative operator on shared flows).
 *
 * The receiver of the [action] is a [FlowCollector], so `onStart` can emit additional elements.
 * For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onStart { emit("Begin") }
 *     .collect { println(it) } // prints Begin, a, b, c
 * ```
 */
public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
    // The flow itself is built with the unsafe builder, but the user-supplied start action
    // is invoked through a SafeCollector.
    val downstream = this
    SafeCollector<T>(downstream, currentCoroutineContext()).let { guarded ->
        try {
            guarded.action()
        } finally {
            // Always release the safe collector, whether or not the action failed.
            guarded.releaseIntercepted()
        }
    }
    // After the start action, delegate directly to the upstream flow.
    this@onStart.collect(downstream)
}
85
/**
 * Returns a flow that invokes the given [action] **after** the flow is completed or cancelled,
 * passing the cancellation exception or failure as the `cause` parameter of [action].
 *
 * Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block.
 * For example, the following imperative snippet:
 *
 * ```
 * try {
 *     myFlow.collect { value ->
 *         println(value)
 *     }
 * } finally {
 *     println("Done")
 * }
 * ```
 *
 * can be replaced with a declarative one using `onCompletion`:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { println("Done") }
 *     .collect()
 * ```
 *
 * Unlike [catch], this operator reports exceptions that occur both upstream and downstream
 * and observes exceptions that are thrown to cancel the flow. The cause is `null` if and only if
 * the flow has completed successfully. Conceptually, the following code:
 *
 * ```
 * myFlow.collect { value ->
 *     println(value)
 * }
 * println("Completed successfully")
 * ```
 *
 * can be replaced with:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { if (it == null) println("Completed successfully") }
 *     .collect()
 * ```
 *
 * The receiver of the [action] is a [FlowCollector], and this operator can be used to emit
 * additional elements at the end **if the flow completed successfully**. For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onCompletion { emit("Done") }
 *     .collect { println(it) } // prints a, b, c, Done
 * ```
 *
 * In case of failure or cancellation, any attempt to emit additional elements throws the
 * corresponding exception. Use [catch] if you need to suppress the failure and replace it with
 * an emission of elements.
 */
public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow {
    // The flow itself is built with the unsafe builder, but the completion action on the
    // successful path is invoked through a SafeCollector.
    val downstream = this
    try {
        this@onCompletion.collect(downstream)
    } catch (failure: Throwable) {
        /*
         * Run the action against a throwing collector so that the completion sequence
         * cannot emit anything once the flow has failed; otherwise it could produce a
         * non-sequential behaviour that is impossible with a plain `finally`.
         */
        ThrowingCollector(failure).invokeSafely(action, failure)
        throw failure
    }
    // Reaching this point means the upstream completed normally: cause is null.
    val guarded = SafeCollector(downstream, currentCoroutineContext())
    try {
        guarded.action(null)
    } finally {
        guarded.releaseIntercepted()
    }
}
166
/**
 * Invokes the given [action] when this flow completes without emitting any elements.
 * The receiver of the [action] is a [FlowCollector], so `onEmpty` can emit additional elements.
 * For example:
 *
 * ```
 * emptyFlow<Int>().onEmpty {
 *     emit(1)
 *     emit(2)
 * }.collect { println(it) } // prints 1, 2
 * ```
 */
public fun <T> Flow<T>.onEmpty(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow {
    val downstream = this
    // Flipped to true as soon as the upstream delivers its first element.
    var sawElement = false
    this@onEmpty.collect { element ->
        sawElement = true
        downstream.emit(element)
    }
    // The upstream produced at least one value: nothing more to do.
    if (sawElement) return@unsafeFlow

    // The upstream completed empty: run the user action through a SafeCollector.
    val guarded = SafeCollector(downstream, currentCoroutineContext())
    try {
        guarded.action()
    } finally {
        guarded.releaseIntercepted()
    }
}
196
/**
 * A [FlowCollector] that accepts no values: every call to [emit] rethrows the
 * exception [e] this collector was created with.
 */
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
    // The emitted value is ignored; the stored exception is thrown unconditionally.
    override suspend fun emit(value: Any?): Unit = throw e
}
202
/**
 * Invokes [action] on this collector, passing [cause] to it.
 *
 * If the action itself throws, the thrown exception is rethrown; when [cause] is non-null
 * and is not the very same instance as the thrown exception, [cause] is first attached to
 * the thrown exception as a suppressed throwable.
 */
private suspend fun <T> FlowCollector<T>.invokeSafely(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
    cause: Throwable?
) {
    try {
        action(cause)
    } catch (thrown: Throwable) {
        // Identity comparison on purpose: never add an exception as suppressed to itself.
        val distinctCause = cause?.takeIf { it !== thrown }
        if (distinctCause != null) thrown.addSuppressedThrowable(distinctCause)
        throw thrown
    }
}
214