1 /*
<lambda>null2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7
8 package kotlinx.coroutines.flow
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
13 import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
14 import kotlinx.coroutines.flow.internal.*
15 import kotlin.coroutines.*
16 import kotlin.jvm.*
17
18 /**
19 * Buffers flow emissions via channel of a specified capacity and runs collector in a separate coroutine.
20 *
21 * Normally, [flows][Flow] are _sequential_. It means that the code of all operators is executed in the
22 * same coroutine. For example, consider the following code using [onEach] and [collect] operators:
23 *
24 * ```
25 * flowOf("A", "B", "C")
26 * .onEach { println("1$it") }
27 * .collect { println("2$it") }
28 * ```
29 *
30 * It is going to be executed in the following order by the coroutine `Q` that calls this code:
31 *
32 * ```
33 * Q : -->-- [1A] -- [2A] -- [1B] -- [2B] -- [1C] -- [2C] -->--
34 * ```
35 *
36 * So if the operator's code takes considerable time to execute, then the total execution time is going to be
37 * the sum of execution times for all operators.
38 *
39 * The `buffer` operator creates a separate coroutine during execution for the flow it applies to.
40 * Consider the following code:
41 *
42 * ```
43 * flowOf("A", "B", "C")
44 * .onEach { println("1$it") }
45 * .buffer() // <--------------- buffer between onEach and collect
46 * .collect { println("2$it") }
47 * ```
48 *
49 * It will use two coroutines for execution of the code. A coroutine `Q` that calls this code is
50 * going to execute `collect`, and the code before `buffer` will be executed in a separate
51 * new coroutine `P` concurrently with `Q`:
52 *
53 * ```
54 * P : -->-- [1A] -- [1B] -- [1C] ---------->-- // flowOf(...).onEach { ... }
55 *
56 * |
57 * | channel // buffer()
58 * V
59 *
60 * Q : -->---------- [2A] -- [2B] -- [2C] -->-- // collect
61 * ```
62 *
63 * When the operator's code takes some time to execute, this decreases the total execution time of the flow.
64 * A [channel][Channel] is used between the coroutines to send elements emitted by the coroutine `P` to
65 * the coroutine `Q`. If the code before `buffer` operator (in the coroutine `P`) is faster than the code after
66 * `buffer` operator (in the coroutine `Q`), then this channel will become full at some point and will suspend
67 * the producer coroutine `P` until the consumer coroutine `Q` catches up.
68 * The [capacity] parameter defines the size of this buffer.
69 *
70 * ### Buffer overflow
71 *
72 * By default, the emitter is suspended when the buffer overflows, to let collector catch up. This strategy can be
73 * overridden with an optional [onBufferOverflow] parameter so that the emitter is never suspended. In this
74 * case, on buffer overflow either the oldest value in the buffer is dropped with the [DROP_OLDEST][BufferOverflow.DROP_OLDEST]
75 * strategy and the latest emitted value is added to the buffer,
76 * or the latest value that is being emitted is dropped with the [DROP_LATEST][BufferOverflow.DROP_LATEST] strategy,
77 * keeping the buffer intact.
78 * To implement either of the custom strategies, a buffer of at least one element is used.
79 *
80 * ### Operator fusion
81 *
82 * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
83 * always fused so that only one properly configured channel is used for execution.
84 *
85 * Explicitly specified buffer capacity takes precedence over `buffer()` or `buffer(Channel.BUFFERED)` calls,
86 * which effectively requests a buffer of any size. Multiple requests with a specified buffer
87 * size produce a buffer with the sum of the requested buffer sizes.
88 *
89 * A `buffer` call with a non-default value of the [onBufferOverflow] parameter overrides all immediately preceding
90 * buffering operators, because it never suspends its upstream, and thus no upstream buffer would ever be used.
91 *
92 * ### Conceptual implementation
93 *
94 * The actual implementation of `buffer` is not trivial due to the fusing, but conceptually its basic
95 * implementation is equivalent to the following code that can be written using [produce]
96 * coroutine builder to produce a channel and [consumeEach][ReceiveChannel.consumeEach] extension to consume it:
97 *
98 * ```
99 * fun <T> Flow<T>.buffer(capacity: Int = DEFAULT): Flow<T> = flow {
100 * coroutineScope { // limit the scope of concurrent producer coroutine
101 * val channel = produce(capacity = capacity) {
102 * collect { send(it) } // send all to channel
103 * }
104 * // emit all received values
105 * channel.consumeEach { emit(it) }
106 * }
107 * }
108 * ```
109 *
110 * ### Conflation
111 *
112 * Usage of this function with [capacity] of [Channel.CONFLATED][Channel.CONFLATED] is a shortcut to
113 * `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`, and is available via
114 * a separate [conflate] operator. See its documentation for details.
115 *
116 * @param capacity type/capacity of the buffer between coroutines. Allowed values are the same as in `Channel(...)`
117 * factory function: [BUFFERED][Channel.BUFFERED] (by default), [CONFLATED][Channel.CONFLATED],
118 * [RENDEZVOUS][Channel.RENDEZVOUS], [UNLIMITED][Channel.UNLIMITED] or a non-negative value indicating
119 * an explicitly requested size.
120 * @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
121 * [SUSPEND][BufferOverflow.SUSPEND], supported only when `capacity >= 0` or `capacity == Channel.BUFFERED`,
122 * implicitly creates a channel with at least one buffered element).
123 */
124 @Suppress("NAME_SHADOWING")
125 public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
126 require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
127 "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
128 }
129 require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
130 "CONFLATED capacity cannot be used with non-default onBufferOverflow"
131 }
132 // desugar CONFLATED capacity to (0, DROP_OLDEST)
133 var capacity = capacity
134 var onBufferOverflow = onBufferOverflow
135 if (capacity == CONFLATED) {
136 capacity = 0
137 onBufferOverflow = BufferOverflow.DROP_OLDEST
138 }
139 // create a flow
140 return when (this) {
141 is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
142 else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
143 }
144 }
145
146 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.0, binary compatibility with earlier versions")
buffernull147 public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T> = buffer(capacity)
148
149 /**
150 * Conflates flow emissions via conflated channel and runs collector in a separate coroutine.
151 * The effect of this is that emitter is never suspended due to a slow collector, but collector
152 * always gets the most recent value emitted.
153 *
154 * For example, consider the flow that emits integers from 1 to 30 with 100 ms delay between them:
155 *
156 * ```
157 * val flow = flow {
158 * for (i in 1..30) {
159 * delay(100)
160 * emit(i)
161 * }
162 * }
163 * ```
164 *
165 * Applying `conflate()` operator to it allows a collector that delays 1 second on each element to get
166 * integers 1, 10, 20, 30:
167 *
168 * ```
169 * val result = flow.conflate().onEach { delay(1000) }.toList()
170 * assertEquals(listOf(1, 10, 20, 30), result)
171 * ```
172 *
173 * Note that `conflate` operator is a shortcut for [buffer] with `capacity` of [Channel.CONFLATED][Channel.CONFLATED],
174 * with is, in turn, a shortcut to a buffer that only keeps the latest element as
175 * created by `buffer(onBufferOverflow = `[`BufferOverflow.DROP_OLDEST`][BufferOverflow.DROP_OLDEST]`)`.
176 *
177 * ### Operator fusion
178 *
179 * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are
180 * always fused so that only one properly configured channel is used for execution.
181 * **Conflation takes precedence over `buffer()` calls with any other capacity.**
182 *
183 * Note that any instance of [StateFlow] already behaves as if `conflate` operator is
184 * applied to it, so applying `conflate` to a `StateFlow` has not effect.
185 * See [StateFlow] documentation on Operator Fusion.
186 */
187 public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
188
189 /**
190 * Changes the context where this flow is executed to the given [context].
191 * This operator is composable and affects only preceding operators that do not have its own context.
192 * This operator is context preserving: [context] **does not** leak into the downstream flow.
193 *
194 * For example:
195 *
196 * ```
197 * withContext(Dispatchers.Main) {
198 * val singleValue = intFlow // will be executed on IO if context wasn't specified before
199 * .map { ... } // Will be executed in IO
200 * .flowOn(Dispatchers.IO)
201 * .filter { ... } // Will be executed in Default
202 * .flowOn(Dispatchers.Default)
203 * .single() // Will be executed in the Main
204 * }
205 * ```
206 *
207 * For more explanation of context preservation please refer to [Flow] documentation.
208 *
209 * This operator retains a _sequential_ nature of flow if changing the context does not call for changing
210 * the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
211 * flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
212 * with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
213 * between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
214 * before or after `flowOn`, which requests buffering behavior and specifies channel size.
215 *
216 * Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
217 * In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
218 * was already emitted by the upstream flow.
219 *
220 * ### Operator fusion
221 *
222 * Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
223 * always fused so that only one properly configured channel is used for execution.
224 *
225 * Multiple `flowOn` operators fuse to a single `flowOn` with a combined context. The elements of the context of
226 * the first `flowOn` operator naturally take precedence over the elements of the second `flowOn` operator
227 * when they have the same context keys, for example:
228 *
229 * ```
230 * flow.map { ... } // Will be executed in IO
231 * .flowOn(Dispatchers.IO) // This one takes precedence
232 * .flowOn(Dispatchers.Default)
233 * ```
234 *
235 * Note that an instance of [SharedFlow] does not have an execution context by itself,
236 * so applying `flowOn` to a `SharedFlow` has not effect. See the [SharedFlow] documentation on Operator Fusion.
237 *
238 * @throws [IllegalArgumentException] if provided context contains [Job] instance.
239 */
240 public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
241 checkFlowContext(context)
242 return when {
243 context == EmptyCoroutineContext -> this
244 this is FusibleFlow -> fuse(context = context)
245 else -> ChannelFlowOperatorImpl(this, context = context)
246 }
247 }
248
249 /**
250 * Returns a flow which checks cancellation status on each emission and throws
251 * the corresponding cancellation cause if flow collector was cancelled.
252 * Note that [flow] builder and all implementations of [SharedFlow] are [cancellable] by default.
253 *
254 * This operator provides a shortcut for `.onEach { currentCoroutineContext().ensureActive() }`.
255 * See [ensureActive][CoroutineContext.ensureActive] for details.
256 */
cancellablenull257 public fun <T> Flow<T>.cancellable(): Flow<T> =
258 when (this) {
259 is CancellableFlow<*> -> this // Fast-path, already cancellable
260 else -> CancellableFlowImpl(this)
261 }
262
263 /**
264 * Internal marker for flows that are [cancellable].
265 */
266 internal interface CancellableFlow<out T> : Flow<T>
267
268 /**
269 * Named implementation class for a flow that is defined by the [cancellable] function.
270 */
271 private class CancellableFlowImpl<T>(private val flow: Flow<T>) : CancellableFlow<T> {
collectnull272 override suspend fun collect(collector: FlowCollector<T>) {
273 flow.collect {
274 currentCoroutineContext().ensureActive()
275 collector.emit(it)
276 }
277 }
278 }
279
280 /**
281 * The operator that changes the context where all transformations applied to the given flow within a [builder] are executed.
282 * This operator is context preserving and does not affect the context of the preceding and subsequent operations.
283 *
284 * Example:
285 *
286 * ```
287 * flow // not affected
288 * .map { ... } // Not affected
289 * .flowWith(Dispatchers.IO) {
290 * map { ... } // in IO
291 * .filter { ... } // in IO
292 * }
293 * .map { ... } // Not affected
294 * ```
295 *
296 * For more explanation of context preservation please refer to [Flow] documentation.
297 *
298 * This operator is deprecated without replacement because it was discovered that it doesn't play well with coroutines
299 * and flow semantics:
300 *
301 * 1) It doesn't prevent context elements from the downstream to leak into its body
302 * ```
303 * flowOf(1).flowWith(EmptyCoroutineContext) {
304 * onEach { println(kotlin.coroutines.coroutineContext[CoroutineName]) } // Will print 42
305 * }.flowOn(CoroutineName(42))
306 * ```
307 * 2) To avoid such leaks, new primitive should be introduced to `kotlinx.coroutines` -- the subtraction of contexts.
308 * And this will become a new concept to learn, maintain and explain.
309 * 3) It defers the execution of declarative [builder] until the moment of [collection][Flow.collect] similarly
310 * to `Observable.defer`. But it is unexpected because nothing in the name `flowWith` reflects this fact.
311 * 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
312 */
313 @FlowPreview
314 @Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4
flowWithnull315 public fun <T, R> Flow<T>.flowWith(
316 flowContext: CoroutineContext,
317 bufferSize: Int = BUFFERED,
318 builder: Flow<T>.() -> Flow<R>
319 ): Flow<R> {
320 checkFlowContext(flowContext)
321 val source = this
322 return unsafeFlow {
323 /**
324 * Here we should remove a Job instance from the context.
325 * All builders are written using scoping and no global coroutines are launched, so it is safe not to provide explicit Job.
326 * It is also necessary not to mess with cancellation if multiple flowWith are used.
327 */
328 val originalContext = currentCoroutineContext().minusKey(Job)
329 val prepared = source.flowOn(originalContext).buffer(bufferSize)
330 builder(prepared).flowOn(flowContext).buffer(bufferSize).collect { value ->
331 return@collect emit(value)
332 }
333 }
334 }
335
checkFlowContextnull336 private fun checkFlowContext(context: CoroutineContext) {
337 require(context[Job] == null) {
338 "Flow context cannot contain job in it. Had $context"
339 }
340 }
341