/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*

/**
 * An asynchronous data stream that sequentially emits values and completes normally or with an exception.
 *
 * _Intermediate operators_ on the flow such as [map], [filter], [take], [zip], etc. are functions that are
 * applied to the _upstream_ flow or flows and return a _downstream_ flow to which further operators can be applied.
 * Intermediate operations do not execute any code in the flow and are not suspending functions themselves.
 * They only set up a chain of operations for future execution and quickly return.
 * This is known as a _cold flow_ property.
 *
 * _Terminal operators_ on the flow are either suspending functions such as [collect], [single], [reduce], [toList], etc.
 * or the [launchIn] operator that starts collection of the flow in the given scope.
 * They are applied to the upstream flow and trigger execution of all operations.
 * Execution of the flow is also called _collecting the flow_ and is always performed in a suspending manner
 * without actual blocking. Terminal operators complete normally or exceptionally depending on successful or failed
 * execution of all the flow operations in the upstream. The most basic terminal operator is [collect], for example:
 *
 * ```
 * try {
 *     flow.collect { value ->
 *         println("Received $value")
 *     }
 * } catch (e: Exception) {
 *     println("The flow has thrown an exception: $e")
 * }
 * ```
 *
 * By default, flows are _sequential_ and all flow operations are executed sequentially in the same coroutine,
 * with an exception for a few operations specifically designed to introduce concurrency into flow
 * execution such as [buffer] and [flatMapMerge]. See their documentation for details.
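 *
 * As a minimal sketch of the laziness and sequential execution described above, the following records the order
 * in which an intermediate and a terminal operator run. The `traceSequentialFlow` function and its `trace` list
 * are illustrative assumptions and are not part of the library:
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.*
 * import kotlinx.coroutines.runBlocking
 *
 * // Hypothetical helper (not a library function): records when each operator actually runs
 * suspend fun traceSequentialFlow(): List<String> {
 *     val trace = mutableListOf<String>()
 *     val doubled = flowOf(1, 2, 3)
 *         .map { trace += "map $it"; it * 2 } // Only sets up the chain; nothing runs yet
 *     trace += "before collect"
 *     doubled.collect { trace += "collect $it" } // Triggers execution of the whole chain
 *     return trace
 * }
 *
 * fun main() = runBlocking {
 *     traceSequentialFlow().forEach(::println)
 * }
 * ```
 *
 * In this sketch `before collect` is recorded first, and each value then passes through `map` and `collect`
 * before the next value is processed.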
 *
 * The `Flow` interface does not carry information about whether a flow is a _cold_ stream that can be collected
 * repeatedly and triggers execution of the same code every time it is collected, or if it is a _hot_ stream that
 * emits different values from the same running source on each collection. Usually flows represent _cold_ streams, but
 * there is a [SharedFlow] subtype that represents _hot_ streams. In addition to that, any flow can be turned
 * into a _hot_ one by the [stateIn] and [shareIn] operators, or by converting the flow into a hot channel
 * via the [produceIn] operator.
 *
 * ### Flow builders
 *
 * There are the following basic ways to create a flow:
 *
 * * [flowOf(...)][flowOf] functions to create a flow from a fixed set of values.
 * * [asFlow()][asFlow] extension functions on various types to convert them into flows.
 * * [flow { ... }][flow] builder function to construct arbitrary flows from
 *   sequential calls to the [emit][FlowCollector.emit] function.
 * * [channelFlow { ... }][channelFlow] builder function to construct arbitrary flows from
 *   potentially concurrent calls to the [send][kotlinx.coroutines.channels.SendChannel.send] function.
 * * [MutableStateFlow] and [MutableSharedFlow] define the corresponding constructor functions to create
 *   a _hot_ flow that can be directly updated.
 *
 * ### Flow constraints
 *
 * All implementations of the `Flow` interface must adhere to two key properties described in detail below:
 *
 * * Context preservation.
 * * Exception transparency.
 *
 * These properties ensure the ability to perform local reasoning about the code with flows and modularize the code
 * in such a way that upstream flow emitters can be developed separately from downstream flow collectors.
 * A user of a flow does not need to be aware of implementation details of the upstream flows it uses.
 *
 * ### Context preservation
 *
 * The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks
 * it downstream, thus making reasoning about the execution context of particular transformations or terminal
 * operations trivial.
 *
 * There is only one way to change the context of a flow: the [flowOn][Flow.flowOn] operator
 * that changes the upstream context ("everything above the `flowOn` operator").
 * For additional information refer to its documentation.
 *
 * This reasoning can be demonstrated in practice:
 *
 * ```
 * val flowA = flowOf(1, 2, 3)
 *     .map { it + 1 } // Will be executed in ctxA
 *     .flowOn(ctxA) // Changes the upstream context: flowOf and map
 *
 * // Now we have a context-preserving flow: it is executed somewhere but this information is encapsulated in the flow itself
 *
 * val filtered = flowA // ctxA is encapsulated in flowA
 *     .filter { it == 3 } // Pure operator without a context yet
 *
 * withContext(Dispatchers.Main) {
 *     // All non-encapsulated operators will be executed in Main: filter and single
 *     val result = filtered.single()
 *     myUi.text = result
 * }
 * ```
 *
 * From the implementation point of view, it means that all flow implementations should
 * only emit from the same coroutine.
 * This constraint is efficiently enforced by the default [flow] builder.
 * The [flow] builder should be used if the flow implementation does not start any coroutines.
 * Its implementation prevents most of the development mistakes:
 *
 * ```
 * val myFlow = flow {
 *     // GlobalScope.launch { // is prohibited
 *     // launch(Dispatchers.IO) { // is prohibited
 *     // withContext(CoroutineName("myFlow")) { // is prohibited
 *     emit(1) // OK
 *     coroutineScope {
 *         emit(2) // OK -- still the same coroutine
 *     }
 * }
 * ```
 *
 * Use [channelFlow] if the collection and emission of a flow are to be separated into multiple coroutines.
 * It encapsulates all the context preservation work and allows you to focus on your
 * domain-specific problem, rather than invariant implementation details.
 * It is possible to use any combination of coroutine builders from within [channelFlow].
 *
 * If you are looking for performance and are sure that no concurrent emits and context jumps will happen,
 * the [flow] builder can be used alongside a [coroutineScope] or [supervisorScope] instead:
 *  - A scoped primitive should be used to provide a [CoroutineScope].
 *  - Changing the context of emission is prohibited, no matter whether it is `withContext(ctx)` or
 *    a builder argument (e.g. `launch(ctx)`).
 *  - Collecting another flow from a separate context is allowed, but it has the same effect as
 *    applying the [flowOn] operator to that flow, which is more efficient.
 *
 * ### Exception transparency
 *
 * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception.
 * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation,
 * suppressing the original exception as discussed below.
 * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator.
 *
 * The [catch][Flow.catch] operator only catches upstream exceptions, but passes
 * all downstream exceptions.
 * Similarly, terminal operators like [collect][Flow.collect]
 * throw any unhandled exceptions that occur in their code or in upstream flows, for example:
 *
 * ```
 * flow { emitData() }
 *     .map { computeOne(it) }
 *     .catch { ... } // catches exceptions in emitData and computeOne
 *     .map { computeTwo(it) }
 *     .collect { process(it) } // throws exceptions from process and computeTwo
 * ```
 * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
 *
 * All exception-handling Flow operators follow the principle of exception suppression:
 *
 * If the upstream flow throws an exception during its completion when the downstream exception has been thrown,
 * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic
 * equivalent of throwing from a `finally` block. However, this doesn't affect the operation of the exception-handling operators,
 * which consider the downstream exception to be the root cause and behave as if the upstream didn't throw anything.
 *
 * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
 * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
 * by an upstream flow, limiting the ability of local reasoning about the code.
 *
 * Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value
 * if an exception has been thrown on a previous attempt.
 *
 * ### Reactive streams
 *
 * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant; you can safely interoperate with
 * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from the `kotlinx-coroutines-reactive` module.
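 *
 * A minimal round-trip sketch of this interop, assuming the `kotlinx-coroutines-reactive` module and the
 * Reactive Streams API are on the classpath. The `roundTrip` function is an illustrative name and is not part
 * of either library:
 *
 * ```kotlin
 * import kotlinx.coroutines.flow.*
 * import kotlinx.coroutines.reactive.asFlow
 * import kotlinx.coroutines.reactive.asPublisher
 * import kotlinx.coroutines.runBlocking
 * import org.reactivestreams.Publisher
 *
 * // Hypothetical helper (not a library function): converts a flow to a Publisher and back, then collects it
 * suspend fun roundTrip(): List<Int> {
 *     val publisher: Publisher<Int> = flowOf(1, 2, 3).asPublisher()
 *     return publisher.asFlow().toList()
 * }
 *
 * fun main() = runBlocking {
 *     println(roundTrip())
 * }
 * ```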
 *
 * ### Not stable for inheritance
 *
 * **The `Flow` interface is not stable for inheritance in third-party libraries**, as new methods
 * might be added to this interface in the future, but is stable for use.
 *
 * Use the `flow { ... }` builder function to create an implementation, or extend [AbstractFlow].
 * These implementations ensure that the context preservation property is not violated, and prevent most
 * of the developer mistakes related to concurrency, inconsistent flow dispatchers, and cancellation.
 */
public interface Flow<out T> {

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     *
     * This method can be used along with SAM-conversion of [FlowCollector]:
     * ```
     * myFlow.collect { value -> println("Collected $value") }
     * ```
     *
     * ### Method inheritance
     *
     * To ensure the context preservation property, implementing this method directly is not recommended.
     * Instead, [AbstractFlow] can be used as the base type to properly ensure flow's properties.
     *
     * All default flow implementations ensure context preservation and exception transparency properties on a best-effort basis
     * and throw [IllegalStateException] if a violation was detected.
     */
    public suspend fun collect(collector: FlowCollector<T>)
}

/**
 * Base class for stateful implementations of `Flow`.
 * It tracks all the properties required for context preservation and throws an [IllegalStateException]
 * if any of the properties are violated.
 *
 * Example of the implementation:
 *
 * ```
 * // list.asFlow() + collect counter
 * class CountingListFlow(private val values: List<Int>) : AbstractFlow<Int>() {
 *     private val collectedCounter = AtomicInteger(0)
 *
 *     override suspend fun collectSafely(collector: FlowCollector<Int>) {
 *         collectedCounter.incrementAndGet() // Increment collected counter
 *         values.forEach { // Emit all the values
 *             collector.emit(it)
 *         }
 *     }
 *
 *     fun toDiagnosticString(): String = "Flow with values $values was collected ${collectedCounter.get()} times"
 * }
 * ```
 */
@ExperimentalCoroutinesApi
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    /**
     * Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
     *
     * A valid implementation of this method has the following constraints:
     * 1) It should not change the coroutine context (e.g. with `withContext(Dispatchers.IO)`) when emitting values.
     *    The emission should happen in the context of the [collect] call.
     *    Please refer to the top-level [Flow] documentation for more details.
     * 2) It should serialize calls to [emit][FlowCollector.emit] as [FlowCollector] implementations are not
     *    thread-safe by default.
     *    To automatically serialize emissions, the [channelFlow] builder can be used instead of [flow].
     *
     * @throws IllegalStateException if any of the invariants are violated.
     */
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}