• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download

<lambda>null1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 @file:Suppress("UNCHECKED_CAST")
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.jvm.*
10 
11 // ------------------ WARNING ------------------
12 //   These emitting operators must use safe flow builder, because they allow
13 //   user code to directly emit to the underlying FlowCollector.
14 
15 /**
16  * Applies [transform] function to each value of the given flow.
17  *
18  * The receiver of the `transform` is [FlowCollector] and thus `transform` is a
19  * flexible function that may transform emitted element, skip it or emit it multiple times.
20  *
21  * This operator generalizes [filter] and [map] operators and
22  * can be used as a building block for other operators, for example:
23  *
24  * ```
25  * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
26  *     if (value % 2 == 0) { // Emit only even values, but twice
27  *         emit(value)
28  *         emit(value)
29  *     } // Do nothing if odd
30  * }
31  * ```
32  */
33 public inline fun <T, R> Flow<T>.transform(
34     @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
35 ): Flow<R> = flow { // Note: safe flow is used here, because collector is exposed to transform on each operation
36     collect { value ->
37         // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
38         return@collect transform(value)
39     }
40 }
41 
42 // For internal operator implementation
43 @PublishedApi
unsafeTransformnull44 internal inline fun <T, R> Flow<T>.unsafeTransform(
45     @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
46 ): Flow<R> = unsafeFlow { // Note: unsafe flow is used here, because unsafeTransform is only for internal use
47     collect { value ->
48         // kludge, without it Unit will be returned and TCE won't kick in, KT-28938
49         return@collect transform(value)
50     }
51 }
52 
53 /**
54  * Returns a flow that invokes the given [action] **before** this flow starts to be collected.
55  *
56  * The [action] is called before the upstream flow is started, so if it is used with a [SharedFlow]
57  * there is **no guarantee** that emissions from the upstream flow that happen inside or immediately
58  * after this `onStart` action will be collected
59  * (see [onSubscription] for an alternative operator on shared flows).
60  *
61  * The receiver of the [action] is [FlowCollector], so `onStart` can emit additional elements.
62  * For example:
63  *
64  * ```
65  * flowOf("a", "b", "c")
66  *     .onStart { emit("Begin") }
67  *     .collect { println(it) } // prints Begin, a, b, c
68  * ```
69  */
onStartnull70 public fun <T> Flow<T>.onStart(
71     action: suspend FlowCollector<T>.() -> Unit
72 ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
73     val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
74     try {
75         safeCollector.action()
76     } finally {
77         safeCollector.releaseIntercepted()
78     }
79     collect(this) // directly delegate
80 }
81 
82 /**
83  * Returns a flow that invokes the given [action] **after** the flow is completed or cancelled, passing
84  * the cancellation exception or failure as cause parameter of [action].
85  *
86  * Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block,
87  * for example the following imperative snippet:
88  *
89  * ```
90  * try {
91  *     myFlow.collect { value ->
92  *         println(value)
93  *     }
94  * } finally {
95  *     println("Done")
96  * }
97  * ```
98  *
99  * can be replaced with a declarative one using `onCompletion`:
100  *
101  * ```
102  * myFlow
103  *     .onEach { println(it) }
104  *     .onCompletion { println("Done") }
105  *     .collect()
106  * ```
107  *
108  * Unlike [catch], this operator reports exception that occur both upstream and downstream
109  * and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if
110  * the flow had fully completed successfully. Conceptually, the following code:
111  *
112  * ```
113  * myFlow.collect { value ->
114  *     println(value)
115  * }
116  * println("Completed successfully")
117  * ```
118  *
119  * can be replaced with:
120  *
121  * ```
122  * myFlow
123  *     .onEach { println(it) }
124  *     .onCompletion { if (it == null) println("Completed successfully") }
125  *     .collect()
126  * ```
127  *
128  * The receiver of the [action] is [FlowCollector] and this operator can be used to emit additional
129  * elements at the end **if it completed successfully**. For example:
130  *
131  * ```
132  * flowOf("a", "b", "c")
133  *     .onCompletion { emit("Done") }
134  *     .collect { println(it) } // prints a, b, c, Done
135  * ```
136  *
137  * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
138  * Use [catch] if you need to suppress failure and replace it with emission of elements.
139  */
onCompletionnull140 public fun <T> Flow<T>.onCompletion(
141     action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
142 ): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
143     try {
144         collect(this)
145     } catch (e: Throwable) {
146         /*
147          * Use throwing collector to prevent any emissions from the
148          * completion sequence when downstream has failed, otherwise it may
149          * lead to a non-sequential behaviour impossible with `finally`
150          */
151         ThrowingCollector(e).invokeSafely(action, e)
152         throw e
153     }
154     // Normal completion
155     val sc = SafeCollector(this, currentCoroutineContext())
156     try {
157         sc.action(null)
158     } finally {
159         sc.releaseIntercepted()
160     }
161 }
162 
163 /**
164  * Invokes the given [action] when this flow completes without emitting any elements.
165  * The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements.
166  * For example:
167  *
168  * ```
169  * emptyFlow<Int>().onEmpty {
170  *     emit(1)
171  *     emit(2)
172  * }.collect { println(it) } // prints 1, 2
173  * ```
174  */
onEmptynull175 public fun <T> Flow<T>.onEmpty(
176     action: suspend FlowCollector<T>.() -> Unit
177 ): Flow<T> = unsafeFlow {
178     var isEmpty = true
179     collect {
180         isEmpty = false
181         emit(it)
182     }
183     if (isEmpty) {
184         val collector = SafeCollector(this, currentCoroutineContext())
185         try {
186             collector.action()
187         } finally {
188             collector.releaseIntercepted()
189         }
190     }
191 }
192 
193 /*
194  * 'emitAll' methods call this to fail-fast before starting to collect
195  * their sources (that may not have any elements for a long time).
196  */
ensureActivenull197 internal fun FlowCollector<*>.ensureActive() {
198     if (this is ThrowingCollector) throw e
199 }
200 
201 internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> {
emitnull202     override suspend fun emit(value: Any?) {
203         throw e
204     }
205 }
206 
invokeSafelynull207 private suspend fun <T> FlowCollector<T>.invokeSafely(
208     action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
209     cause: Throwable?
210 ) {
211     try {
212         action(cause)
213     } catch (e: Throwable) {
214         if (cause !== null && cause !== e) e.addSuppressed(cause)
215         throw e
216     }
217 }
218