1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7
8 package kotlinx.coroutines.flow
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
13 import kotlinx.coroutines.flow.internal.*
14 import kotlin.coroutines.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17
18 /**
19 * Creates a _cold_ flow from the given suspendable [block].
20 * The flow being _cold_ means that the [block] is called every time a terminal operator is applied to the resulting flow.
21 *
22 * Example of usage:
23 *
24 * ```
25 * fun fibonacci(): Flow<BigInteger> = flow {
26 * var x = BigInteger.ZERO
27 * var y = BigInteger.ONE
28 * while (true) {
29 * emit(x)
30 * x = y.also {
31 * y += x
32 * }
33 * }
34 * }
35 *
36 * fibonacci().take(100).collect { println(it) }
37 * ```
38 *
39 * Emissions from [flow] builder are [cancellable] by default — each call to [emit][FlowCollector.emit]
40 * also calls [ensureActive][CoroutineContext.ensureActive].
41 *
42 * `emit` should happen strictly in the dispatchers of the [block] in order to preserve the flow context.
43 * For example, the following code will result in an [IllegalStateException]:
44 *
45 * ```
46 * flow {
47 * emit(1) // Ok
 *     withContext(Dispatchers.IO) {
49 * emit(2) // Will fail with ISE
50 * }
51 * }
52 * ```
53 *
54 * If you want to switch the context of execution of a flow, use the [flowOn] operator.
55 */
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    // Hand the block to the private SafeFlow wrapper declared in this file.
    return SafeFlow(block)
}
57
// A named class standing in for an anonymous AbstractFlow object: it runs [block] with the collector as receiver.
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>): Unit = block(collector)
}
64
/**
 * Wraps this function into a _cold_ flow that emits exactly one value:
 * the result of invoking the function.
 */
public fun <T> (() -> T).asFlow(): Flow<T> {
    val producer = this
    return flow { emit(producer()) }
}
71
/**
 * Wraps this suspending function into a _cold_ flow that emits exactly one value:
 * the result of invoking the function.
 *
 * Example of usage:
 *
 * ```
 * suspend fun remoteCall(): R = ...
 * fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
 * ```
 */
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    val result = this@asFlow.invoke()
    emit(result)
}
85
/**
 * Creates a _cold_ flow that emits every element of this iterable, in iteration order.
 */
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    for (element in this@asFlow) {
        emit(element)
    }
}
94
/**
 * Creates a _cold_ flow that emits the values produced by this iterator,
 * pulling them one at a time via `hasNext`/`next`.
 */
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    val source = this@asFlow
    while (source.hasNext()) {
        emit(source.next())
    }
}
103
/**
 * Creates a _cold_ flow that emits every element of this sequence, in iteration order.
 */
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    for (element in this@asFlow) {
        emit(element)
    }
}
112
/**
 * Creates a flow that emits each of the given `vararg` [elements] in the order they were passed.
 *
 * Example of usage:
 *
 * ```
 * flowOf(1, 2, 3)
 * ```
 */
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    elements.forEach { element -> emit(element) }
}
127
/**
 * Creates a flow that emits the single given [value].
 */
public fun <T> flowOf(value: T): Flow<T> {
    // Implementation note: this is just an "optimized" overload of flowOf(vararg)
    // which significantly reduces the footprint of widespread single-value flows.
    return flow { emit(value) }
}
138
/**
 * Returns a flow without any elements. Every call returns the same shared instance.
 */
public fun <T> emptyFlow(): Flow<T> {
    return EmptyFlow
}
143
// Singleton flow with no elements: collecting it never calls the collector.
private object EmptyFlow : Flow<Nothing> {
    override suspend fun collect(collector: FlowCollector<Nothing>) {
        // Intentionally empty: there is nothing to emit.
    }
}
147
/**
 * Creates a _cold_ flow that emits the components of this array in index order.
 * Being _cold_, the flow reads the array components anew each time a terminal operator
 * is applied to it.
 */
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    for (element in this@asFlow) {
        emit(element)
    }
}
158
/**
 * Creates a _cold_ flow that emits the components of this [IntArray] in index order.
 * Being _cold_, the flow reads the array components anew each time a terminal operator
 * is applied to it.
 */
public fun IntArray.asFlow(): Flow<Int> = flow {
    for (element in this@asFlow) {
        emit(element)
    }
}
169
/**
 * Creates a _cold_ flow that emits the components of this [LongArray] in index order.
 * Being _cold_, the flow reads the array components anew each time a terminal operator
 * is applied to it.
 */
public fun LongArray.asFlow(): Flow<Long> = flow {
    for (element in this@asFlow) {
        emit(element)
    }
}
180
/**
 * Creates a flow that emits every value of this [IntRange], in iteration order.
 */
public fun IntRange.asFlow(): Flow<Int> = flow {
    for (value in this@asFlow) {
        emit(value)
    }
}
189
/**
 * Creates a flow that emits every value of this [LongRange], in iteration order.
 */
public fun LongRange.asFlow(): Flow<Long> = flow {
    for (value in this@asFlow) {
        emit(value)
    }
}
198
199 /**
200 * Creates an instance of a _cold_ [Flow] with elements that are sent to a [SendChannel]
201 * provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
202 * produced by code that is running in a different context or concurrently.
203 * The resulting flow is _cold_, which means that [block] is called every time a terminal operator
204 * is applied to the resulting flow.
205 *
206 * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used
207 * concurrently from different contexts.
 * The resulting flow completes as soon as the code in the [block] and all its children complete.
209 * Use [awaitClose] as the last statement to keep it running.
210 * A more detailed example is provided in the documentation of [callbackFlow].
211 *
212 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
213 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
214 * than consumed, i.e. to control the back-pressure behavior.
215 *
216 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
217 * always fused so that only one properly configured channel is used for execution.
218 *
219 * Examples of usage:
220 *
221 * ```
222 * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
223 * // collect from one coroutine and send it
224 * launch {
225 * collect { send(it) }
226 * }
227 * // collect and send from this coroutine, too, concurrently
228 * other.collect { send(it) }
229 * }
230 *
231 * fun <T> contextualFlow(): Flow<T> = channelFlow {
232 * // send from one coroutine
233 * launch(Dispatchers.IO) {
234 * send(computeIoValue())
235 * }
236 * // send from another coroutine, concurrently
237 * launch(Dispatchers.Default) {
238 * send(computeCpuValue())
239 * }
240 * }
241 * ```
242 */
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> {
    // The builder is created with its default context, capacity and overflow strategy.
    return ChannelFlowBuilder(block)
}
245
246 /**
247 * Creates an instance of a _cold_ [Flow] with elements that are sent to a [SendChannel]
248 * provided to the builder's [block] of code via [ProducerScope]. It allows elements to be
249 * produced by code that is running in a different context or concurrently.
250 *
251 * The resulting flow is _cold_, which means that [block] is called every time a terminal operator
252 * is applied to the resulting flow.
253 *
254 * This builder ensures thread-safety and context preservation, thus the provided [ProducerScope] can be used
255 * from any context, e.g. from a callback-based API.
256 * The resulting flow completes as soon as the code in the [block] completes.
257 * [awaitClose] should be used to keep the flow running, otherwise the channel will be closed immediately
258 * when block completes.
259 * [awaitClose] argument is called either when a flow consumer cancels the flow collection
260 * or when a callback-based API invokes [SendChannel.close] manually and is typically used
261 * to cleanup the resources after the completion, e.g. unregister a callback.
262 * Using [awaitClose] is mandatory in order to prevent memory leaks when the flow collection is cancelled,
263 * otherwise the callback may keep running even when the flow collector is already completed.
264 * To avoid such leaks, this method throws [IllegalStateException] if block returns, but the channel
265 * is not closed yet.
266 *
267 * A channel with the [default][Channel.BUFFERED] buffer size is used. Use the [buffer] operator on the
268 * resulting flow to specify a user-defined value and to control what happens when data is produced faster
269 * than consumed, i.e. to control the back-pressure behavior.
270 *
271 * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are
272 * always fused so that only one properly configured channel is used for execution.
273 *
274 * Example of usage that converts a multi-shot callback API to a flow.
275 * For single-shot callbacks use [suspendCancellableCoroutine].
276 *
277 * ```
278 * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
279 * val callback = object : Callback { // Implementation of some callback interface
280 * override fun onNextValue(value: T) {
281 * // To avoid blocking you can configure channel capacity using
282 * // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
283 * trySendBlocking(value)
284 * .onFailure { throwable ->
285 * // Downstream has been cancelled or failed, can log here
286 * }
287 * }
288 * override fun onApiError(cause: Throwable) {
289 * cancel(CancellationException("API Error", cause))
290 * }
291 * override fun onCompleted() = channel.close()
292 * }
293 * api.register(callback)
294 * /*
295 * * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
296 * * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
297 * * In both cases, callback will be properly unregistered.
298 * */
299 * awaitClose { api.unregister(callback) }
300 * }
301 * ```
302 *
303 * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
304 * > `awaitClose` block can be called at any time due to asynchronous nature of cancellation, even
305 * > concurrently with the call of the callback.
306 */
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> {
    // The builder is created with its default context, capacity and overflow strategy.
    return CallbackFlowBuilder(block)
}
308
// ChannelFlow implementation that sits first in a chain of flow operations and introduces (builds) the flow
// by running the user-supplied [block].
private open class ChannelFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Builds a copy of this builder that keeps the same block but uses the supplied configuration.
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return ChannelFlowBuilder(block, context, capacity, onBufferOverflow)
    }

    // Runs the user-supplied block with the given producer scope as its receiver.
    override suspend fun collectTo(scope: ProducerScope<T>) {
        scope.block()
    }

    override fun toString(): String {
        return "block[$block] -> ${super.toString()}"
    }
}
325
// ChannelFlowBuilder variant behind callbackFlow: after the block returns it verifies that the channel
// has been closed, and fails with IllegalStateException otherwise.
private class CallbackFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {

    // Builds a copy of this builder that keeps the same block but uses the supplied configuration.
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
    }

    override suspend fun collectTo(scope: ProducerScope<T>) {
        super.collectTo(scope)
        /*
         * By the time the block has returned, the channel is expected to be closed already: either the user
         * called `awaitClose` inside the block, or the channel was closed/cancelled externally/manually.
         * Anything else is the "user forgot to call awaitClose and receives unhelpful
         * ClosedSendChannelException exceptions" situation, which is reported explicitly here.
         */
        check(scope.isClosedForSend) {
            """
                'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
                Otherwise, a callback/listener may leak in case of external cancellation.
                See callbackFlow API documentation for the details.
            """.trimIndent()
        }
    }
}
354