@file:JvmMultifileClass
@file:JvmName("FlowKt")

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
 * Buffers flow emissions via a channel of a specified capacity and runs the collector in a separate coroutine.
 *
 * Normally, [flows][Flow] are _sequential_. This means that the code of all operators is executed in the
 * same coroutine. For example, consider the following code using [onEach] and [collect] operators:
 *
 * ```
 * flowOf("A", "B", "C")
 *     .onEach { println("1$it") }
 *     .collect { println("2$it") }
 * ```
 *
 * It is going to be executed in the following order by the coroutine `Q` that calls this code:
 *
 * ```
 * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
 * ```
 *
 * So if the operator's code takes considerable time to execute, then the total execution time is going to be
 * the sum of execution times for all operators.
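 *
 * As a rough illustration (the 100 ms delays below are made up for this sketch, not taken from the library):
 *
 * ```
 * flowOf("A", "B", "C")
 *     .onEach { delay(100) }  // producer-side work, ~100 ms per element
 *     .collect { delay(100) } // consumer-side work, ~100 ms per element
 * // without a buffer every element pays for both delays: 3 * (100 + 100) ms = ~600 ms in total
 * ```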
 *
 * The `buffer` operator creates a separate coroutine during execution for the flow it applies to.
 * Consider the following code:
 *
 * ```
 * flowOf("A", "B", "C")
 *     .onEach { println("1$it") }
 *     .buffer() // <--------------- buffer between onEach and collect
 *     .collect { println("2$it") }
 * ```
 *
 * It will use two coroutines for execution of the code. A coroutine `Q` that calls this code is
 * going to execute `collect`, and the code before `buffer` will be executed in a separate
 * new coroutine `P` concurrently with `Q`:
 *
 * ```
 * P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
 *
 *                       |
 *                       | channel               // buffer()
 *                       V
 *
 * Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
 * ```
 *
 * When the operator's code takes some time to execute, this decreases the total execution time of the flow.
 * A [channel][Channel] is used between the coroutines to send elements emitted by the coroutine `P` to
 * the coroutine `Q`. If the code before the `buffer` operator (in the coroutine `P`) is faster than the code after
 * the `buffer` operator (in the coroutine `Q`), then this channel will become full at some point and will suspend
 * the producer coroutine `P` until the consumer coroutine `Q` catches up.
 * The [capacity] parameter defines the size of this buffer.
 *
 * ### Buffer overflow
 *
 * By default, the emitter is suspended when the buffer overflows, to let the collector catch up. This strategy can be
 * overridden with an optional [onBufferOverflow] parameter so that the emitter is never suspended. In this
 * case, on buffer overflow either the oldest value in the buffer is dropped with the [DROP_OLDEST][BufferOverflow.DROP_OLDEST]
 * strategy and the latest emitted value is added to the buffer,
 * or the latest value that is being emitted is dropped with the [DROP_LATEST][BufferOverflow.DROP_LATEST] strategy,
 * keeping the buffer intact.
 * To implement either of the custom strategies, a buffer of at least one element is used.
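 *
 * For example, both of the following keep the emitter running at full speed; the capacity value of 10 is
 * illustrative only:
 *
 * ```
 * flow.buffer(10, onBufferOverflow = BufferOverflow.DROP_OLDEST) // keep the 10 most recently emitted values
 * flow.buffer(10, onBufferOverflow = BufferOverflow.DROP_LATEST) // keep the first 10 values not yet consumed
 * ```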
 *
 * ### Operator fusion
 *
 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 *
 * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
 * which effectively request a buffer of any size. Multiple requests with a specified buffer
 * size produce a buffer with the sum of the requested buffer sizes.
 *
 * A `buffer` call with a non-default value of the [onBufferOverflow] parameter overrides all immediately preceding
 * buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
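 *
 * For illustration, the fusion rules above combine as follows (a sketch, the sizes are arbitrary):
 *
 * ```
 * flow.buffer(2).buffer(3) // a single channel with a buffer of 2 + 3 = 5 elements
 * flow.buffer().buffer(3)  // a single channel with a buffer of 3 elements
 * flow.buffer(3).buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST) // overrides the preceding buffer(3)
 * ```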
 *
 * ### Conceptual implementation
 *
 * The actual implementation of `buffer` is not trivial due to the fusing, but conceptually its basic
 * implementation is equivalent to the following code that can be written using the [produce]
 * coroutine builder to produce a channel and the [consumeEach][ReceiveChannel.consumeEach] extension to consume it:
 *
 * ```
 * fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
 *     coroutineScope { // limit the scope of concurrent producer coroutine
 *         val channel = produce(capacity = capacity) {
 *             collect { send(it) } // send all to channel
 *         }
 *         // emit all received values
 *         channel.consumeEach { emit(it) }
 *     }
 * }
 * ```
 *
 * ### Conflation
 *
 * Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is a shortcut to
 * `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`, and is available via
 * a separate [conflate] operator. See its documentation for details.
 *
 * @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in the `Channel(...)`
 * factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
 * [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
 * an explicitly requested size.
 * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
 * [SUSPEND][BufferOverflow.SUSPEND], supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
 * implicitly creates a channel with at least one buffered element).
 */
@Suppress("NAME_SHADOWING")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // desugar CONFLATED capacity to (0, DROP_OLDEST)
    var capacity = capacity
    var onBufferOverflow = onBufferOverflow
    if (capacity == CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capacity)

/**
 * Conflates flow emissions via a conflated channel and runs the collector in a separate coroutine.
 * The effect of this is that the emitter is never suspended due to a slow collector, but the collector
 * always gets the most recent value emitted.
 *
 * For example, consider a flow that emits integers from 1 to 30 with a 100 ms delay between them:
 *
 * ```
 * val flow = flow {
 *     for (i in 1..30) {
 *         delay(100)
 *         emit(i)
 *     }
 * }
 * ```
 *
 * Applying the `conflate()` operator to it allows a collector that delays 1 second on each element to get
 * integers 1, 10, 20, 30:
 *
 * ```
 * val result = flow.conflate().onEach { delay(1000) }.toList()
 * assertEquals(listOf(1, 10, 20, 30), result)
 * ```
 *
 * Note that the `conflate` operator is a shortcut for [buffer] with a `capacity` of [Channel.CONFLATED][Channel.CONFLATED],
 * which is, in turn, a shortcut to a buffer that only keeps the latest element as
 * created by `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`.
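 *
 * In other words, per the descriptions above, the following three forms keep only the latest element
 * and never suspend the emitter:
 *
 * ```
 * flow.conflate()
 * flow.buffer(Channel.CONFLATED)
 * flow.buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
 * ```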
 *
 * ### Operator fusion
 *
 * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 * **Conflation takes precedence over `buffer()` calls with any other capacity.**
 *
 * Note that any instance of [StateFlow] already behaves as if the `conflate` operator is
 * applied to it, so applying `conflate` to a `StateFlow` has no effect.
 * See the [StateFlow] documentation on Operator Fusion.
 */
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

/**
 * Changes the context where this flow is executed to the given [context].
 * This operator is composable and affects only preceding operators that do not have their own context.
 * This operator is context preserving: [context] **does not** leak into the downstream flow.
 *
 * For example:
 *
 * ```
 * withContext(Dispatchers.Main) {
 *     val singleValue = intFlow // will be executed on IO if context wasn't specified before
 *         .map { ... } // Will be executed in IO
 *         .flowOn(Dispatchers.IO)
 *         .filter { ... } // Will be executed in Default
 *         .flowOn(Dispatchers.Default)
 *         .single() // Will be executed in the Main
 * }
 * ```
 *
 * For more explanation of context preservation please refer to [Flow] documentation.
 *
 * This operator retains the _sequential_ nature of the flow if changing the context does not call for changing
 * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing the dispatcher is required, it collects
 * flow emissions in one coroutine that is run using the specified [context] and emits them from another coroutine
 * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
 * between the two coroutines, similarly to the [buffer] operator, unless the [buffer] operator is explicitly called
 * before or after `flowOn`, which requests buffering behavior and specifies channel size.
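 *
 * For example (a sketch; the channel size of 64 is arbitrary):
 *
 * ```
 * flow.map { ... }            // executed on Dispatchers.IO
 *     .buffer(64)             // fuses with flowOn and requests an explicit channel size
 *     .flowOn(Dispatchers.IO)
 * ```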
 *
 * Note that flows operating across different dispatchers might lose some in-flight elements when cancelled.
 * In particular, this operator ensures that the downstream flow does not resume on cancellation even if the element
 * was already emitted by the upstream flow.
 *
 * ### Operator fusion
 *
 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
 * always fused so that only one properly configured channel is used for execution.
 *
 * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
 * the first `flowOn` operator naturally take precedence over the elements of the second `flowOn` operator
 * when they have the same context keys, for example:
 *
 * ```
 * flow.map { ... } // Will be executed in IO
 *     .flowOn(Dispatchers.IO) // This one takes precedence
 *     .flowOn(Dispatchers.Default)
 * ```
 *
 * Note that an instance of [SharedFlow] does not have an execution context by itself,
 * so applying `flowOn` to a `SharedFlow` has no effect. See the [SharedFlow] documentation on Operator Fusion.
 *
 * @throws [IllegalArgumentException] if the provided context contains a [Job] instance.
 */
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

/**
 * Returns a flow which checks cancellation status on each emission and throws
 * the corresponding cancellation cause if the flow collector was cancelled.
 * Note that the [flow] builder and all implementations of [SharedFlow] are [cancellable] by default.
 *
 * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
 * See [ensureActive][CoroutineContext.ensureActive] for details.
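 *
 * For example, a sketch of making a busy flow cooperate with cancellation (whether a particular flow needs
 * this depends on how it is built; flows produced by the [flow] builder already check for cancellation):
 *
 * ```
 * (1..5).asFlow()    // a flow that may not check for cancellation on its own
 *     .cancellable() // the collector's cancellation is now checked before each element is delivered
 *     .collect { println(it) }
 * ```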
 */
public fun <T> Flow<T>.cancellable(): Flow<T> =
    when (this) {
        is CancellableFlow<*> -> this // Fast-path, already cancellable
        else -> CancellableFlowImpl(this)
    }

/**
 * Internal marker for flows that are [cancellable].
 */
internal interface CancellableFlow<out T> : Flow<T>

/**
 * Named implementation class for a flow that is defined by the [cancellable] function.
 */
private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect {
            currentCoroutineContext().ensureActive()
            collector.emit(it)
        }
    }
}

private fun checkFlowContext(context: CoroutineContext) {
    require(context[Job] == null) {
        "Flow context cannot contain job in it. Had $context"
    }
}