• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:JvmMultifileClass
2 @file:JvmName("FlowKt")
3 
4 package kotlinx.coroutines.flow
5 
6 import kotlinx.coroutines.*
7 import kotlinx.coroutines.channels.*
8 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
9 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
10 import kotlinx.coroutines.flow.internal.*
11 import kotlin.coroutines.*
12 import kotlin.jvm.*
13 
14 /**
15  * Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.
16  *
17  * Normally, [flows][Flow] are _sequential_. It means that the code of all operators is executed in the
18  * same coroutine. For example, consider the following code using [onEach] and [collect] operators:
19  *
20  * ```
21  * flowOf("A", "B", "C")
22  *     .onEach  { println("1$it") }
23  *     .collect { println("2$it") }
24  * ```
25  *
26  * It is going to be executed in the following order by the coroutine `Q` that calls this code:
27  *
28  * ```
29  * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
30  * ```
31  *
32  * So if the operator's code takes considerable time to execute, then the total execution time is going to be
33  * the sum of execution times for all operators.
34  *
35  * The `buffer` operator creates a separate coroutine during execution for the flow it applies to.
36  * Consider the following code:
37  *
38  * ```
39  * flowOf("A", "B", "C")
40  *     .onEach  { println("1$it") }
41  *     .buffer()  // <--------------- buffer between onEach and collect
42  *     .collect { println("2$it") }
43  * ```
44  *
45  * It will use two coroutines for execution of the code. A coroutine `Q` that calls this code is
46  * going to execute `collect`, and the code before `buffer` will be executed in a separate
47  * new coroutine `P` concurrently with `Q`:
48  *
49  * ```
50  * P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
51  *
52  *                       |
53  *                       | channel               // buffer()
54  *                       V
55  *
56  * Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
57  * ```
58  *
59  * When the operator's code takes some time to execute, this decreases the total execution time of the flow.
60  * A [channel][Channel] is used between the coroutines to send elements emitted by the coroutine `P` to
61  * the coroutine `Q`. If the code before `buffer` operator (in the coroutine `P`) is faster than the code after
62  * `buffer` operator (in the coroutine `Q`), then this channel will become full at some point and will suspend
63  * the producer coroutine `P` until the consumer coroutine `Q` catches up.
64  * The [capacity] parameter defines the size of this buffer.
65  *
66  * ### Buffer overflow
67  *
68  * By default, the emitter is suspended when the buffer overflows, to let collector catch up. This strategy can be
69  * overridden with an optional [onBufferOverflow] parameter so that the emitter is never suspended. In this
70  * case, on buffer overflow either the oldest value in the buffer is dropped with the [DROP_OLDEST][BufferOverflow.DROP_OLDEST]
71  * strategy and the latest emitted value is added to the buffer,
72  * or the latest value that is being emitted is dropped with the [DROP_LATEST][BufferOverflow.DROP_LATEST] strategy,
73  * keeping the buffer intact.
74  * To implement either of the custom strategies, a buffer of at least one element is used.
75  *
76  * ### Operator fusion
77  *
78  * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
79  * always fused so that only one properly configured channel is used for execution.
80  *
81  * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
82  * which effectively requests a buffer of any size. Multiple requests with a specified buffer
83  * size produce a buffer with the sum of the requested buffer sizes.
84  *
85  * A `buffer` call with a non-default value of the [onBufferOverflow] parameter overrides all immediately preceding
86  * buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
87  *
88  * ### Conceptual implementation
89  *
90  * The actual implementation of `buffer` is not trivial due to the fusing, but conceptually its basic
91  * implementation is equivalent to the following code that can be written using [produce]
92  * coroutine builder to produce a channel and [consumeEach][ReceiveChannel.consumeEach] extension to consume it:
93  *
94  * ```
95  * fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
96  *     coroutineScope { // limit the scope of concurrent producer coroutine
97  *         val channel = produce(capacity = capacity) {
98  *             collect { send(it) } // send all to channel
99  *         }
100  *         // emit all received values
101  *         channel.consumeEach { emit(it) }
102  *     }
103  * }
104  * ```
105  *
106  * ### Conflation
107  *
108  * Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is a shortcut to
109  * `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`, and is available via
110  * a separate [conflate] operator. See its documentation for details.
111  *
112  * @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in `Channel(...)`
113  *   factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
114  *   [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
115  *   an explicitly requested size.
116  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
117  *   [SUSPEND][BufferOverflow.SUSPEND], supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
118  *   implicitly creates a channel with at least one buffered element).
119  */
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    // Validate the requested configuration before touching the upstream flow.
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // CONFLATED is only a shorthand: it is rewritten to a zero-capacity buffer with DROP_OLDEST overflow.
    val conflated = capacity == CONFLATED
    val effectiveCapacity = if (conflated) 0 else capacity
    val effectiveOverflow = if (conflated) BufferOverflow.DROP_OLDEST else onBufferOverflow
    // Fuse with an adjacent fusible operator when possible, otherwise wrap the upstream in a new channel-backed flow.
    return if (this is FusibleFlow) {
        fuse(capacity = effectiveCapacity, onBufferOverflow = effectiveOverflow)
    } else {
        ChannelFlowOperatorImpl(this, capacity = effectiveCapacity, onBufferOverflow = effectiveOverflow)
    }
}
141 
@Deprecated(message = "Since 1.4.0, binary compatibility with earlier versions", level = DeprecationLevel.HIDDEN)
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> {
    // Kept only for binary compatibility: forwards to the two-parameter overload with its default overflow strategy.
    return buffer(capacity, BufferOverflow.SUSPEND)
}
144 
145 /**
146  * Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
147  * The effect of this is that emitter is never suspended due to a slow collector, but collector
148  * always gets the most recent value emitted.
149  *
150  * For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:
151  *
152  * ```
153  * val flow = flow {
154  *     for (i in 1..30) {
155  *         delay(100)
156  *         emit(i)
157  *     }
158  * }
159  * ```
160  *
161  * Applying `conflate()` operator to it allows a collector that delays 1 second on each element to get
162  * integers 1, 10, 20, 30:
163  *
164  * ```
165  * val result = flow.conflate().onEach { delay(1000) }.toList()
166  * assertEquals(listOf(1, 10, 20, 30), result)
167  * ```
168  *
169  * Note that `conflate` operator is a shortcut for [buffer] with `capacity` of [Channel.CONFLATED][Channel.CONFLATED],
170  * which is, in turn, a shortcut to a buffer that only keeps the latest element as
171  * created by `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`.
172  *
173  * ### Operator fusion
174  *
175  * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
176  * always fused so that only one properly configured channel is used for execution.
177  * **Conflation takes precedence over `buffer()` calls with any other capacity.**
178  *
179  * Note that any instance of [StateFlow] already behaves as if `conflate` operator is
180  * applied to it, so applying `conflate` to a `StateFlow` has no effect.
181  * See [StateFlow] documentation on Operator Fusion.
182  */
public fun <T> Flow<T>.conflate(): Flow<T> {
    // Conflation is expressed as a buffer request with the CONFLATED capacity marker.
    return buffer(capacity = CONFLATED)
}
184 
185 /**
186  * Changes the context where this flow is executed to the given [context].
 * This operator is composable and affects only preceding operators that do not have their own context.
188  * This operator is context preserving: [context] **does not** leak into the downstream flow.
189  *
190  * For example:
191  *
192  * ```
193  * withContext(Dispatchers.Main) {
194  *     val singleValue = intFlow // will be executed on IO if context wasn't specified before
195  *         .map { ... } // Will be executed in IO
196  *         .flowOn(Dispatchers.IO)
197  *         .filter { ... } // Will be executed in Default
198  *         .flowOn(Dispatchers.Default)
199  *         .single() // Will be executed in the Main
200  * }
201  * ```
202  *
203  * For more explanation of context preservation please refer to [Flow] documentation.
204  *
205  * This operator retains a _sequential_ nature of flow if changing the context does not call for changing
206  * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
 * flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutine
208  * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
209  * between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
210  * before or after `flowOn`, which requests buffering behavior and specifies channel size.
211  *
212  * Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
213  * In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
214  * was already emitted by the upstream flow.
215  *
216  * ### Operator fusion
217  *
218  * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
219  * always fused so that only one properly configured channel is used for execution.
220  *
221  * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
222  * the first `flowOn` operator naturally take precedence over the elements of the second `flowOn` operator
223  * when they have the same context keys, for example:
224  *
225  * ```
226  * flow.map { ... } // Will be executed in IO
227  *     .flowOn(Dispatchers.IO) // This one takes precedence
228  *     .flowOn(Dispatchers.Default)
229  * ```
230  *
231  * Note that an instance of [SharedFlow] does not have an execution context by itself,
 * so applying `flowOn` to a `SharedFlow` has no effect. See the [SharedFlow] documentation on Operator Fusion.
233  *
234  * @throws [IllegalArgumentException] if provided context contains [Job] instance.
235  */
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // Reject contexts that carry a Job before doing anything else.
    checkFlowContext(context)
    // An empty context changes nothing, so the receiver is returned untouched.
    if (context == EmptyCoroutineContext) return this
    // Fuse with an adjacent fusible operator when possible, otherwise wrap the upstream in a new channel-backed flow.
    return if (this is FusibleFlow) {
        fuse(context = context)
    } else {
        ChannelFlowOperatorImpl(this, context = context)
    }
}
244 
245 /**
246  * Returns a flow which checks cancellation status on each emission and throws
247  * the corresponding cancellation cause if flow collector was cancelled.
248  * Note that [flow] builder and all implementations of [SharedFlow] are [cancellable] by default.
249  *
250  * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
251  * See [ensureActive][CoroutineContext.ensureActive] for details.
252  */
public fun <T> Flow<T>.cancellable(): Flow<T> =
    // Flows already marked as cancellable are returned as-is; everything else gets wrapped.
    if (this is CancellableFlow<*>) this else CancellableFlowImpl(this)
258 
/**
 * Internal marker for flows that are [cancellable].
 *
 * The interface declares no members of its own. The [cancellable] operator checks for it
 * and returns a flow that already implements it unchanged instead of wrapping it again.
 */
internal interface CancellableFlow<out T> : Flow<T>
263 
/**
 * Named implementation class for the flow returned by the [cancellable] function.
 *
 * It collects the wrapped flow and, before forwarding each value to the downstream collector,
 * calls `ensureActive()` on the current coroutine context.
 */
private class CancellableFlowImpl<T>(private val upstream: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        upstream.collect { value ->
            // Check cancellation first so that a cancelled collector never receives the value.
            currentCoroutineContext().ensureActive()
            collector.emit(value)
        }
    }
}
275 
/**
 * Verifies that [context] is usable as a flow context, i.e. that it holds no [Job] element.
 *
 * @throws IllegalArgumentException if [context] contains a [Job].
 */
private fun checkFlowContext(context: CoroutineContext) {
    val containsJob = context[Job] != null
    require(!containsJob) { "Flow context cannot contain job in it. Had $context" }
}
281