• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST")
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.flow.internal.*
13 import kotlin.jvm.*
14 
15 // ------------------ WARNING ------------------
16 //   These emitting operators must use safe flow builder, because they allow
17 //   user code to directly emit to the underlying FlowCollector.
18 
/**
 * Runs [transform] for every value emitted by this flow and returns the flow of whatever
 * [transform] chooses to emit.
 *
 * Because [transform] is invoked with a [FlowCollector] receiver, it is free to forward the
 * incoming element as is, replace it with something else, drop it, or emit several elements for it.
 *
 * This operator is a generalization of [filter] and [map] and is a convenient building block
 * for other operators, for example:
 *
 * ```
 * fun Flow<Int>.skipOddAndDuplicateEven(): Flow<Int> = transform { value ->
 *     if (value % 2 == 0) { // Emit only even values, but twice
 *         emit(value)
 *         emit(value)
 *     } // Do nothing if odd
 * }
 * ```
 */
public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    // Note: the safe flow builder is required here, because the collector is handed
    // to the user-supplied transform on every upstream element.
    return flow {
        collect { upstreamValue ->
            // kludge: the explicit labeled return is needed, otherwise Unit is returned
            // and TCE won't kick in, KT-28938
            return@collect transform(upstreamValue)
        }
    }
}
45 
// Internal counterpart of `transform`, intended for operator implementations only.
// It builds the result with `unsafeFlow` instead of the safe `flow` builder.
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    // Note: the unsafe flow builder is acceptable here, because unsafeTransform is only for internal use
    return unsafeFlow {
        collect { upstreamValue ->
            // kludge: the explicit labeled return is needed, otherwise Unit is returned
            // and TCE won't kick in, KT-28938
            return@collect transform(upstreamValue)
        }
    }
}
56 
/**
 * Returns a flow that runs the given [action] **before** this flow starts to be collected.
 *
 * The [action] runs before the upstream flow is started. When the upstream is a [SharedFlow],
 * this means there is **no guarantee** that values emitted by the upstream while the `onStart`
 * action is running, or immediately after it, will be collected
 * (see [onSubscription] for an alternative operator on shared flows).
 *
 * The [action] has a [FlowCollector] receiver, so `onStart` may emit additional elements
 * ahead of the upstream ones. For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onStart { emit("Begin") }
 *     .collect { println(it) } // prints Begin, a, b, c
 * ```
 */
public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    val upstream = this
    // Note: unsafe flow is used here, but the start action itself is invoked on a safe collector
    return unsafeFlow {
        val downstream = this
        val startCollector = SafeCollector<T>(downstream, currentCoroutineContext())
        try {
            startCollector.action()
        } finally {
            // Always release the safe collector, whether or not the action failed
            startCollector.releaseIntercepted()
        }
        // The start action is done: hand the downstream collector straight to the upstream
        upstream.collect(downstream)
    }
}
85 
/**
 * Returns a flow that runs the given [action] **after** the flow is completed or cancelled.
 * The cancellation exception or failure, if any, is passed to [action] as its cause parameter.
 *
 * Conceptually, `onCompletion` plays the role of a `finally` block around the flow collection.
 * The following imperative snippet:
 *
 * ```
 * try {
 *     myFlow.collect { value ->
 *         println(value)
 *     }
 * } finally {
 *     println("Done")
 * }
 * ```
 *
 * can be written declaratively with `onCompletion`:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { println("Done") }
 *     .collect()
 * ```
 *
 * Unlike [catch], this operator reports exceptions that occur both upstream and downstream,
 * and it also observes exceptions that are thrown to cancel the flow. The cause is `null`
 * if and only if the flow has completed successfully. Conceptually, the following code:
 *
 * ```
 * myFlow.collect { value ->
 *     println(value)
 * }
 * println("Completed successfully")
 * ```
 *
 * can be replaced with:
 *
 * ```
 * myFlow
 *     .onEach { println(it) }
 *     .onCompletion { if (it == null) println("Completed successfully") }
 *     .collect()
 * ```
 *
 * The [action] has a [FlowCollector] receiver, so this operator can be used to emit additional
 * elements at the end **if the flow completed successfully**. For example:
 *
 * ```
 * flowOf("a", "b", "c")
 *     .onCompletion { emit("Done") }
 *     .collect { println(it) } // prints a, b, c, Done
 * ```
 *
 * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
 * Use [catch] if you need to suppress the failure and replace it with emission of elements.
 */
public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> {
    val upstream = this
    // Note: unsafe flow is used here, but the completion action is invoked on a safe collector
    return unsafeFlow {
        val downstream = this
        try {
            upstream.collect(downstream)
        } catch (failure: Throwable) {
            /*
             * The action is run against a throwing collector so that the completion
             * sequence cannot emit anything once the flow has failed; otherwise it could
             * lead to a non-sequential behaviour impossible with `finally`
             */
            ThrowingCollector(failure).invokeSafely(action, failure)
            throw failure
        }
        // Reached only on normal completion: the action may emit via a safe collector
        val completionCollector = SafeCollector(downstream, currentCoroutineContext())
        try {
            completionCollector.action(null)
        } finally {
            completionCollector.releaseIntercepted()
        }
    }
}
166 
/**
 * Runs the given [action] when this flow completes without having emitted a single element.
 * The [action] has a [FlowCollector] receiver, so `onEmpty` may emit elements of its own.
 * For example:
 *
 * ```
 * emptyFlow<Int>().onEmpty {
 *     emit(1)
 *     emit(2)
 * }.collect { println(it) } // prints 1, 2
 * ```
 */
public fun <T> Flow<T>.onEmpty(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    val upstream = this
    return unsafeFlow {
        val downstream = this
        // Flipped to true as soon as the upstream delivers its first element
        var sawElement = false
        upstream.collect { value ->
            sawElement = true
            downstream.emit(value)
        }
        if (!sawElement) {
            // Upstream finished without elements: run the action on a safe collector
            val emptyCollector = SafeCollector(downstream, currentCoroutineContext())
            try {
                emptyCollector.action()
            } finally {
                emptyCollector.releaseIntercepted()
            }
        }
    }
}
196 
/*
 * Fail-fast check used by the 'emitAll' methods before they start collecting
 * their sources (which may not produce any elements for a long time).
 * If the receiver is a ThrowingCollector, its stored exception is thrown right away;
 * for any other collector this is a no-op.
 */
internal fun FlowCollector<*>.ensureActive() {
    (this as? ThrowingCollector)?.let { throwing -> throw throwing.e }
}
204 
/*
 * Collector that rejects every emission: each call to emit throws the
 * exception [e] this collector was created with.
 */
internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> {
    override suspend fun emit(value: Any?): Unit = throw e
}
210 
/*
 * Invokes action on this collector, passing cause through to it.
 * If the action throws, the thrown exception is rethrown; when cause is non-null and is
 * not that very same exception instance, cause is first attached to it as suppressed.
 */
private suspend fun <T> FlowCollector<T>.invokeSafely(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit,
    cause: Throwable?
) {
    try {
        this.action(cause)
    } catch (actionFailure: Throwable) {
        cause?.let { original ->
            // Identity comparison: do not attach an exception to itself
            if (original !== actionFailure) actionFailure.addSuppressedThrowable(original)
        }
        throw actionFailure
    }
}
222