• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 
8 package kotlinx.coroutines.flow
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
13 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
14 import kotlinx.coroutines.flow.internal.*
15 import kotlin.coroutines.*
16 import kotlin.jvm.*
17 
18 /**
19  * Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.
20  *
21  * Normally, [flows][Flow] are _sequential_. It means that the code of all operators is executed in the
22  * same coroutine. For example, consider the following code using [onEach] and [collect] operators:
23  *
24  * ```
25  * flowOf("A", "B", "C")
26  *     .onEach  { println("1$it") }
27  *     .collect { println("2$it") }
28  * ```
29  *
30  * It is going to be executed in the following order by the coroutine `Q` that calls this code:
31  *
32  * ```
33  * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
34  * ```
35  *
36  * So if the operator's code takes considerable time to execute, then the total execution time is going to be
37  * the sum of execution times for all operators.
38  *
39  * The `buffer` operator creates a separate coroutine during execution for the flow it applies to.
40  * Consider the following code:
41  *
42  * ```
43  * flowOf("A", "B", "C")
44  *     .onEach  { println("1$it") }
45  *     .buffer()  // <--------------- buffer between onEach and collect
46  *     .collect { println("2$it") }
47  * ```
48  *
49  * It will use two coroutines for execution of the code. A coroutine `Q` that calls this code is
50  * going to execute `collect`, and the code before `buffer` will be executed in a separate
51  * new coroutine `P` concurrently with `Q`:
52  *
53  * ```
54  * P : -->-- [1A] -- [1B] -- [1C] ---------->--  // flowOf(...).onEach { ... }
55  *
56  *                       |
57  *                       | channel               // buffer()
58  *                       V
59  *
60  * Q : -->---------- [2A] -- [2B] -- [2C] -->--  // collect
61  * ```
62  *
63  * When the operator's code takes some time to execute, this decreases the total execution time of the flow.
64  * A [channel][Channel] is used between the coroutines to send elements emitted by the coroutine `P` to
65  * the coroutine `Q`. If the code before `buffer` operator (in the coroutine `P`) is faster than the code after
66  * `buffer` operator (in the coroutine `Q`), then this channel will become full at some point and will suspend
67  * the producer coroutine `P` until the consumer coroutine `Q` catches up.
68  * The [capacity] parameter defines the size of this buffer.
69  *
70  * ### Buffer overflow
71  *
72  * By default, the emitter is suspended when the buffer overflows, to let collector catch up. This strategy can be
73  * overridden with an optional [onBufferOverflow] parameter so that the emitter is never suspended. In this
74  * case, on buffer overflow either the oldest value in the buffer is dropped with the [DROP_OLDEST][BufferOverflow.DROP_OLDEST]
75  * strategy and the latest emitted value is added to the buffer,
76  * or the latest value that is being emitted is dropped with the [DROP_LATEST][BufferOverflow.DROP_LATEST] strategy,
77  * keeping the buffer intact.
78  * To implement either of the custom strategies, a buffer of at least one element is used.
79  *
80  * ### Operator fusion
81  *
82  * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
83  * always fused so that only one properly configured channel is used for execution.
84  *
85  * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
86  * which effectively requests a buffer of any size. Multiple requests with a specified buffer
87  * size produce a buffer with the sum of the requested buffer sizes.
88  *
89  * A `buffer` call with a non-default value of the [onBufferOverflow] parameter overrides all immediately preceding
90  * buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
91  *
92  * ### Conceptual implementation
93  *
94  * The actual implementation of `buffer` is not trivial due to the fusing, but conceptually its basic
95  * implementation is equivalent to the following code that can be written using [produce]
96  * coroutine builder to produce a channel and [consumeEach][ReceiveChannel.consumeEach] extension to consume it:
97  *
98  * ```
99  * fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
100  *     coroutineScope { // limit the scope of concurrent producer coroutine
101  *         val channel = produce(capacity = capacity) {
102  *             collect { send(it) } // send all to channel
103  *         }
104  *         // emit all received values
105  *         channel.consumeEach { emit(it) }
106  *     }
107  * }
108  * ```
109  *
110  * ### Conflation
111  *
112  * Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is a shortcut to
113  * `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`, and is available via
114  * a separate [conflate] operator. See its documentation for details.
115  *
116  * @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in `Channel(...)`
117  *   factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
118  *   [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
119  *   an explicitly requested size.
120  * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
121  *   [SUSPEND][BufferOverflow.SUSPEND], supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
122  *   implicitly creates a channel with at least one buffered element).
123  */
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    // Validate the requested capacity before anything else.
    require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
        "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
    }
    // CONFLATED already implies an overflow strategy, so it cannot be combined with an explicit one.
    require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    // CONFLATED capacity is sugar for the pair (capacity = 0, onBufferOverflow = DROP_OLDEST).
    val conflated = capacity == CONFLATED
    val effectiveCapacity = if (conflated) 0 else capacity
    val effectiveOverflow = if (conflated) BufferOverflow.DROP_OLDEST else onBufferOverflow
    // Fuse with an adjacent fusible operator when possible, otherwise wrap this flow in a new channel operator.
    return if (this is FusibleFlow) {
        fuse(capacity = effectiveCapacity, onBufferOverflow = effectiveOverflow)
    } else {
        ChannelFlowOperatorImpl(this, capacity = effectiveCapacity, onBufferOverflow = effectiveOverflow)
    }
}
145 
// Binary-compatibility overload: delegates to the two-parameter `buffer` with the default overflow strategy.
@Deprecated(message = "Since 1.4.0, binary compatibility with earlier versions", level = DeprecationLevel.HIDDEN)
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> =
    buffer(capacity = capacity, onBufferOverflow = BufferOverflow.SUSPEND)
148 
149 /**
150  * Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
151  * The effect of this is that emitter is never suspended due to a slow collector, but collector
152  * always gets the most recent value emitted.
153  *
154  * For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:
155  *
156  * ```
157  * val flow = flow {
158  *     for (i in 1..30) {
159  *         delay(100)
160  *         emit(i)
161  *     }
162  * }
163  * ```
164  *
165  * Applying `conflate()` operator to it allows a collector that delays 1 second on each element to get
166  * integers 1, 10, 20, 30:
167  *
168  * ```
169  * val result = flow.conflate().onEach { delay(1000) }.toList()
170  * assertEquals(listOf(1, 10, 20, 30), result)
171  * ```
172  *
173  * Note that `conflate` operator is a shortcut for [buffer] with `capacity` of [Channel.CONFLATED][Channel.CONFLATED],
174  * which is, in turn, a shortcut to a buffer that only keeps the latest element as
175  * created by `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`.
176  *
177  * ### Operator fusion
178  *
179  * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn] and [produceIn] are
180  * always fused so that only one properly configured channel is used for execution.
181  * **Conflation takes precedence over `buffer()` calls with any other capacity.**
182  *
183  * Note that any instance of [StateFlow] already behaves as if `conflate` operator is
184  * applied to it, so applying `conflate` to a `StateFlow` has no effect.
185  * See [StateFlow] documentation on Operator Fusion.
186  */
public fun <T> Flow<T>.conflate(): Flow<T> {
    // Conflation is requested from `buffer` through the CONFLATED capacity marker.
    return buffer(capacity = CONFLATED)
}
188 
189 /**
190  * Changes the context where this flow is executed to the given [context].
 * This operator is composable and affects only preceding operators that do not have their own context.
192  * This operator is context preserving: [context] **does not** leak into the downstream flow.
193  *
194  * For example:
195  *
196  * ```
197  * withContext(Dispatchers.Main) {
198  *     val singleValue = intFlow // will be executed on IO if context wasn't specified before
199  *         .map { ... } // Will be executed in IO
200  *         .flowOn(Dispatchers.IO)
201  *         .filter { ... } // Will be executed in Default
202  *         .flowOn(Dispatchers.Default)
203  *         .single() // Will be executed in the Main
204  * }
205  * ```
206  *
207  * For more explanation of context preservation please refer to [Flow] documentation.
208  *
209  * This operator retains a _sequential_ nature of flow if changing the context does not call for changing
210  * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
 * flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutine
212  * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
213  * between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
214  * before or after `flowOn`, which requests buffering behavior and specifies channel size.
215  *
216  * Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
217  * In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
218  * was already emitted by the upstream flow.
219  *
220  * ### Operator fusion
221  *
222  * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
223  * always fused so that only one properly configured channel is used for execution.
224  *
225  * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
226  * the first `flowOn` operator naturally take precedence over the elements of the second `flowOn` operator
227  * when they have the same context keys, for example:
228  *
229  * ```
230  * flow.map { ... } // Will be executed in IO
231  *     .flowOn(Dispatchers.IO) // This one takes precedence
232  *     .flowOn(Dispatchers.Default)
233  * ```
234  *
235  * Note that an instance of [SharedFlow] does not have an execution context by itself,
 * so applying `flowOn` to a `SharedFlow` has no effect. See the [SharedFlow] documentation on Operator Fusion.
237  *
238  * @throws [IllegalArgumentException] if provided context contains [Job] instance.
239  */
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    // Reject contexts that carry a Job before doing anything else.
    checkFlowContext(context)
    // An empty context changes nothing, so the receiver is returned as is.
    if (context == EmptyCoroutineContext) return this
    // Fuse with an adjacent fusible operator when possible, otherwise wrap this flow in a new channel operator.
    return if (this is FusibleFlow) {
        fuse(context = context)
    } else {
        ChannelFlowOperatorImpl(this, context = context)
    }
}
248 
249 /**
250  * Returns a flow which checks cancellation status on each emission and throws
251  * the corresponding cancellation cause if flow collector was cancelled.
252  * Note that [flow] builder and all implementations of [SharedFlow] are [cancellable] by default.
253  *
254  * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
255  * See [ensureActive][CoroutineContext.ensureActive] for details.
256  */
public fun <T> Flow<T>.cancellable(): Flow<T> =
    // Fast-path: a flow that is already marked as cancellable is returned unchanged.
    if (this is CancellableFlow<*>) this else CancellableFlowImpl(this)
262 
/**
 * Internal marker for flows that are [cancellable].
 *
 * The [cancellable] operator returns a flow of this type as is instead of wrapping it again.
 */
internal interface CancellableFlow<out T> : Flow<T>
267 
/**
 * Named implementation class for a flow that is defined by the [cancellable] function.
 *
 * Collects the wrapped [flow] and, for every value, calls `ensureActive()` on the current coroutine context
 * before passing the value on to the downstream collector.
 */
private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        flow.collect { value ->
            // The cancellation check runs first, so the value is forwarded only if the check passes.
            currentCoroutineContext().ensureActive()
            collector.emit(value)
        }
    }
}
279 
// Validates a context passed to a flow operator: it must not contain a Job element.
private fun checkFlowContext(context: CoroutineContext) {
    val hasJob = context[Job] != null
    require(!hasJob) { "Flow context cannot contain job in it. Had $context" }
}
285