• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.flow
6 
7 import kotlinx.coroutines.*
8 import kotlinx.coroutines.flow.internal.*
9 import kotlin.coroutines.*
10 
11 /**
12  * An asynchronous data stream that sequentially emits values and completes normally or with an exception.
13  *
14  * _Intermediate operators_ on the flow such as [map], [filter], [take], [zip], etc are functions that are
15  * applied to the _upstream_ flow or flows and return a _downstream_ flow where further operators can be applied to.
16  * Intermediate operations do not execute any code in the flow and are not suspending functions themselves.
17  * They only set up a chain of operations for future execution and quickly return.
18  * This is known as a _cold flow_ property.
19  *
20  * _Terminal operators_ on the flow are either suspending functions such as [collect], [single], [reduce], [toList], etc.
21  * or [launchIn] operator that starts collection of the flow in the given scope.
22  * They are applied to the upstream flow and trigger execution of all operations.
23  * Execution of the flow is also called _collecting the flow_  and is always performed in a suspending manner
24  * without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
25  * execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
26  *
27  * ```
28  * try {
29  *     flow.collect { value ->
30  *         println("Received $value")
31  *     }
32  * } catch (e: Exception) {
33  *     println("The flow has thrown an exception: $e")
34  * }
35  * ```
36  *
37  * By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
38  * with an exception for a few operations specifically designed to introduce concurrency into flow
39  * execution such as [buffer] and [flatMapMerge]. See their documentation for details.
40  *
41  * The `Flow` interface does not carry information whether a flow is a _cold_ stream that can be collected repeatedly and
42  * triggers execution of the same code every time it is collected, or if it is a _hot_ stream that emits different
43  * values from the same running source on each collection. Usually flows represent _cold_ streams, but
44  * there is a [SharedFlow] subtype that represents _hot_ streams. In addition to that, any flow can be turned
45  * into a _hot_ one by the [stateIn] and [shareIn] operators, or by converting the flow into a hot channel
46  * via the [produceIn] operator.
47  *
48  * ### Flow builders
49  *
50  * There are the following basic ways to create a flow:
51  *
52  * * [flowOf(...)][flowOf] functions to create a flow from a fixed set of values.
53  * * [asFlow()][asFlow] extension functions on various types to convert them into flows.
54  * * [flow { ... }][flow] builder function to construct arbitrary flows from
55  *   sequential calls to [emit][FlowCollector.emit] function.
56  * * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
57  *   potentially concurrent calls to the [send][kotlinx.coroutines.channels.SendChannel.send] function.
58  * * [MutableStateFlow] and [MutableSharedFlow] define the corresponding constructor functions to create
59  *   a _hot_ flow that can be directly updated.
60  *
61  * ### Flow constraints
62  *
63  * All implementations of the `Flow` interface must adhere to two key properties described in detail below:
64  *
65  * * Context preservation.
66  * * Exception transparency.
67  *
68  * These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
69  * in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
70  * A user of a flow does not need to be aware of implementation details of the upstream flows it uses.
71  *
72  * ### Context preservation
73  *
74  * The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks
75  * it downstream, thus making reasoning about the execution context of particular transformations or terminal
76  * operations trivial.
77  *
78  * There is only one way to change the context of a flow: the [flowOn][Flow.flowOn] operator
79  * that changes the upstream context ("everything above the `flowOn` operator").
80  * For additional information refer to its documentation.
81  *
82  * This reasoning can be demonstrated in practice:
83  *
84  * ```
85  * val flowA = flowOf(1, 2, 3)
86  *     .map { it + 1 } // Will be executed in ctxA
87  *     .flowOn(ctxA) // Changes the upstream context: flowOf and map
88  *
89  * // Now we have a context-preserving flow: it is executed somewhere but this information is encapsulated in the flow itself
90  *
91  * val filtered = flowA // ctxA is encapsulated in flowA
92  *    .filter { it == 3 } // Pure operator without a context yet
93  *
94  * withContext(Dispatchers.Main) {
95  *     // All non-encapsulated operators will be executed in Main: filter and single
96  *     val result = filtered.single()
97  *     myUi.text = result
98  * }
99  * ```
100  *
101  * From the implementation point of view, it means that all flow implementations should
102  * only emit from the same coroutine.
103  * This constraint is efficiently enforced by the default [flow] builder.
104  * The [flow] builder should be used if the flow implementation does not start any coroutines.
105  * Its implementation prevents most of the development mistakes:
106  *
107  * ```
108  * val myFlow = flow {
109  *    // GlobalScope.launch { // is prohibited
110  *    // launch(Dispatchers.IO) { // is prohibited
111  *    // withContext(CoroutineName("myFlow")) { // is prohibited
112  *    emit(1) // OK
113  *    coroutineScope {
114  *        emit(2) // OK -- still the same coroutine
115  *    }
116  * }
117  * ```
118  *
119  * Use [channelFlow] if the collection and emission of a flow are to be separated into multiple coroutines.
120  * It encapsulates all the context preservation work and allows you to focus on your
121  * domain-specific problem, rather than invariant implementation details.
122  * It is possible to use any combination of coroutine builders from within [channelFlow].
123  *
124  * If you are looking for performance and are sure that no concurrent emits and context jumps will happen,
125  * the [flow] builder can be used alongside a [coroutineScope] or [supervisorScope] instead:
126  *  - Scoped primitive should be used to provide a [CoroutineScope].
127  *  - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or
128  *    a builder argument (e.g. `launch(ctx)`).
129  *  - Collecting another flow from a separate context is allowed, but it has the same effect as
130  *    applying the [flowOn] operator to that flow, which is more efficient.
131  *
132  * ### Exception transparency
133  *
134  * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception.
135  * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation,
136  * suppressing the original exception as discussed below.
137  * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator.
138  *
139  * The [catch][Flow.catch] operator only catches upstream exceptions, but passes
140  * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
141  * throw any unhandled exceptions that occur in their code or in upstream flows, for example:
142  *
143  * ```
144  * flow { emitData() }
145  *     .map { computeOne(it) }
146  *     .catch { ... } // catches exceptions in emitData and computeOne
147  *     .map { computeTwo(it) }
148  *     .collect { process(it) } // throws exceptions from process and computeTwo
149  * ```
150  * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
151  *
152  * All exception-handling Flow operators follow the principle of exception suppression:
153  *
154  * If the upstream flow throws an exception during its completion when the downstream exception has been thrown,
155  * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic
156  * equivalent of throwing from `finally` block. However, this doesn't affect the operation of the exception-handling operators,
157  * which consider the downstream exception to be the root cause and behave as if the upstream didn't throw anything.
158  *
159  * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
160  * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
161  * by an upstream flow, limiting the ability of local reasoning about the code.
162  *
163  * Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value,
164  * if an exception has been thrown on previous attempt.
165  *
166  * ### Reactive streams
167  *
168  * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
169  * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module.
170  *
171  * ### Not stable for inheritance
172  *
173  * **The `Flow` interface is not stable for inheritance in 3rd party libraries**, as new methods
174  * might be added to this interface in the future, but is stable for use.
175  *
176  * Use the `flow { ... }` builder function to create an implementation, or extend [AbstractFlow].
177  * These implementations ensure that the context preservation property is not violated, and prevent most
178  * of the developer mistakes related to concurrency, inconsistent flow dispatchers, and cancellation.
179  */
180 public interface Flow<out T> {
181 
182     /**
183      * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
184      *
185      * This method can be used along with SAM-conversion of [FlowCollector]:
186      * ```
187      * myFlow.collect { value -> println("Collected $value") }
188      * ```
189      *
190      * ### Method inheritance
191      *
192      * To ensure the context preservation property, it is not recommended implementing this method directly.
193      * Instead, [AbstractFlow] can be used as the base type to properly ensure flow's properties.
194      *
195      * All default flow implementations ensure context preservation and exception transparency properties on a best-effort basis
196      * and throw [IllegalStateException] if a violation was detected.
197      */
collectnull198     public suspend fun collect(collector: FlowCollector<T>)
199 }
200 
201 /**
202  * Base class for stateful implementations of `Flow`.
203  * It tracks all the properties required for context preservation and throws an [IllegalStateException]
204  * if any of the properties are violated.
205  *
206  * Example of the implementation:
207  *
208  * ```
209  * // list.asFlow() + collect counter
210  * class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {
211  *     private val collectedCounter = AtomicInteger(0)
212  *
213  *     override suspend fun collectSafely(collector: FlowCollector<Int>) {
214  *         collectedCounter.incrementAndGet() // Increment collected counter
215  *         values.forEach { // Emit all the values
216  *             collector.emit(it)
217  *         }
218  *     }
219  *
220  *     fun toDiagnosticString(): String = "Flow with values $values was collected ${collectedCounter.value} times"
221  * }
222  * ```
223  */
224 @ExperimentalCoroutinesApi
225 public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
226 
227     public final override suspend fun collect(collector: FlowCollector<T>) {
228         val safeCollector = SafeCollector(collector, coroutineContext)
229         try {
230             collectSafely(safeCollector)
231         } finally {
232             safeCollector.releaseIntercepted()
233         }
234     }
235 
236     /**
237      * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
238      *
239      * A valid implementation of this method has the following constraints:
240      * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
241      *    The emission should happen in the context of the [collect] call.
242      *    Please refer to the top-level [Flow] documentation for more details.
243      * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
244      *    thread-safe by default.
245      *    To automatically serialize emissions [channelFlow] builder can be used instead of [flow]
246      *
247      * @throws IllegalStateException if any of the invariants are violated.
248      */
249     public abstract suspend fun collectSafely(collector: FlowCollector<T>)
250 }
251