• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 
8 package kotlinx.coroutines.flow
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.channels.*
12 import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
13 import kotlinx.coroutines.flow.internal.*
14 import kotlin.coroutines.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17 
/**
 * Builds a _cold_ flow backed by the given suspending [block].
 * Being _cold_ means that the [block] is invoked anew each time a terminal operator is applied
 * to the returned flow.
 *
 * Usage example:
 *
 * ```
 * fun fibonacci(): Flow<BigInteger> = flow {
 *     var x = BigInteger.ZERO
 *     var y = BigInteger.ONE
 *     while (true) {
 *         emit(x)
 *         x = y.also {
 *             y += x
 *         }
 *     }
 * }
 *
 * fibonacci().take(100).collect { println(it) }
 * ```
 *
 * Emissions made from the [flow] builder are [cancellable] by default &mdash; every call to
 * [emit][FlowCollector.emit] additionally calls [ensureActive][CoroutineContext.ensureActive].
 *
 * To preserve the flow context, `emit` must be called strictly from the dispatcher of the [block].
 * For instance, the code below fails with an [IllegalStateException]:
 *
 * ```
 * flow {
 *     emit(1) // Ok
 *     withContext(Dispatchers.IO) {
 *         emit(2) // Will fail with ISE
 *     }
 * }
 * ```
 *
 * Use the [flowOn] operator when the execution context of a flow has to be switched.
 */
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return SafeFlow(block)
}
57 
// Named class used in place of an anonymous AbstractFlow object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    // Runs the stored block with the supplied collector as its receiver
    override suspend fun collectSafely(collector: FlowCollector<T>): Unit = block(collector)
}
64 
/**
 * Wraps this function into a _cold_ flow that emits the single value returned by invoking it.
 */
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    val produced = this@asFlow.invoke()
    emit(produced)
}
71 
/**
 * Wraps this suspending function into a _cold_ flow that emits the single value returned by invoking it.
 *
 * Usage example:
 *
 * ```
 * suspend fun remoteCall(): R = ...
 * fun remoteCallFlow(): Flow<R> = ::remoteCall.asFlow()
 * ```
 */
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    val produced = this@asFlow.invoke()
    emit(produced)
}
85 
/**
 * Converts this iterable into a _cold_ flow that emits its elements in iteration order.
 */
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
94 
/**
 * Converts this iterator into a _cold_ flow that emits the elements the iterator yields.
 *
 * Every collection of the resulting flow traverses this very same iterator instance.
 */
public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
103 
/**
 * Converts this sequence into a _cold_ flow that emits the elements of the sequence.
 */
public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
112 
/**
 * Builds a flow that emits each of the given `vararg` [elements] in the order they were passed.
 *
 * Usage example:
 *
 * ```
 * flowOf(1, 2, 3)
 * ```
 */
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    elements.forEach { element -> emit(element) }
}
127 
/**
 * Builds a flow that emits only the given [value].
 */
public fun <T> flowOf(value: T): Flow<T> =
    /*
     * Implementation note: an "optimized" overload of flowOf(vararg) that
     * significantly reduces the footprint of the widespread single-value flows.
     */
    flow { emit(value) }
138 
/**
 * Returns a flow that contains no elements.
 */
public fun <T> emptyFlow(): Flow<T> {
    return EmptyFlow
}
143 
// Singleton flow without elements: collecting it returns without ever touching the collector
private object EmptyFlow : Flow<Nothing> {
    override suspend fun collect(collector: FlowCollector<Nothing>) {
        // Intentionally empty: there is nothing to emit
    }
}
147 
/**
 * Converts the given array into a _cold_ flow that emits its elements.
 * Being _cold_ means that the array components are read anew each time a terminal operator is applied
 * to the resulting flow.
 */
public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
158 
/**
 * Converts the given array into a _cold_ flow that emits its elements.
 * Being _cold_ means that the array components are read anew each time a terminal operator is applied
 * to the resulting flow.
 */
public fun IntArray.asFlow(): Flow<Int> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
169 
/**
 * Converts the given array into a _cold_ flow that emits its elements.
 * Being _cold_ means that the array components are read anew each time a terminal operator is applied
 * to the resulting flow.
 */
public fun LongArray.asFlow(): Flow<Long> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
180 
/**
 * Converts this range into a flow that emits the values of the range.
 */
public fun IntRange.asFlow(): Flow<Int> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
189 
/**
 * Converts this range into a flow that emits the values of the range.
 */
public fun LongRange.asFlow(): Flow<Long> = flow {
    for (item in this@asFlow) {
        emit(item)
    }
}
198 
/**
 * Builds a _cold_ [Flow] whose elements are the ones sent to the [SendChannel] that the builder's
 * [block] receives through [ProducerScope]. This makes it possible to produce elements from code
 * that runs in a different context or concurrently.
 * Since the resulting flow is _cold_, the [block] is invoked each time a terminal operator
 * is applied to it.
 *
 * The builder guarantees thread-safety and context preservation, so the provided [ProducerScope]
 * may be used concurrently from different contexts.
 * The resulting flow completes once the code in the [block] and all of its children complete.
 * Call [awaitClose] as the last statement to keep it running.
 * See the documentation of [callbackFlow] for a more detailed example.
 *
 * The underlying channel has the [default][Channel.BUFFERED] buffer size. Apply the [buffer] operator
 * to the resulting flow to choose a different size and to define what happens when data is produced
 * faster than it is consumed, i.e. to control the back-pressure behavior.
 *
 * Adjacent applications of [channelFlow], [flowOn], [buffer], and [produceIn] are
 * always fused, so a single properly configured channel is used for execution.
 *
 * Usage examples:
 *
 * ```
 * fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = channelFlow {
 *     // collect from one coroutine and send it
 *     launch {
 *         collect { send(it) }
 *     }
 *     // collect and send from this coroutine, too, concurrently
 *     other.collect { send(it) }
 * }
 *
 * fun <T> contextualFlow(): Flow<T> = channelFlow {
 *     // send from one coroutine
 *     launch(Dispatchers.IO) {
 *         send(computeIoValue())
 *     }
 *     // send from another coroutine, concurrently
 *     launch(Dispatchers.Default) {
 *         send(computeCpuValue())
 *     }
 * }
 * ```
 */
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> {
    return ChannelFlowBuilder(block)
}
245 
/**
 * Builds a _cold_ [Flow] whose elements are the ones sent to the [SendChannel] that the builder's
 * [block] receives through [ProducerScope]. This makes it possible to produce elements from code
 * that runs in a different context or concurrently.
 *
 * Since the resulting flow is _cold_, the [block] is invoked each time a terminal operator
 * is applied to it.
 *
 * The builder guarantees thread-safety and context preservation, so the provided [ProducerScope]
 * may be used from any context, e.g. from a callback-based API.
 * The resulting flow completes as soon as the code in the [block] completes.
 * [awaitClose] should be used to keep the flow running; without it the channel is closed immediately
 * when the block completes.
 * The argument of [awaitClose] is invoked either when a flow consumer cancels the flow collection
 * or when a callback-based API invokes [SendChannel.close] manually. It is typically used
 * to clean up resources after the completion, e.g. to unregister a callback.
 * Using [awaitClose] is mandatory in order to prevent memory leaks when the flow collection is cancelled;
 * otherwise the callback may keep running even though the flow collector has already completed.
 * To avoid such leaks, this method throws [IllegalStateException] if the block returns while the channel
 * is not closed yet.
 *
 * The underlying channel has the [default][Channel.BUFFERED] buffer size. Apply the [buffer] operator
 * to the resulting flow to choose a different size and to define what happens when data is produced
 * faster than it is consumed, i.e. to control the back-pressure behavior.
 *
 * Adjacent applications of [callbackFlow], [flowOn], [buffer], and [produceIn] are
 * always fused, so a single properly configured channel is used for execution.
 *
 * The example below converts a multi-shot callback API to a flow.
 * For single-shot callbacks use [suspendCancellableCoroutine].
 *
 * ```
 * fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
 *     val callback = object : Callback { // Implementation of some callback interface
 *         override fun onNextValue(value: T) {
 *             // To avoid blocking you can configure channel capacity using
 *             // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
 *             trySendBlocking(value)
 *                 .onFailure { throwable ->
 *                     // Downstream has been cancelled or failed, can log here
 *                 }
 *         }
 *         override fun onApiError(cause: Throwable) {
 *             cancel(CancellationException("API Error", cause))
 *         }
 *         override fun onCompleted() = channel.close()
 *     }
 *     api.register(callback)
 *     /*
 *      * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
 *      * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
 *      * In both cases, callback will be properly unregistered.
 *      */
 *     awaitClose { api.unregister(callback) }
 * }
 * ```
 *
 * > The callback `register`/`unregister` methods provided by an external API must be thread-safe, because
 * > the `awaitClose` block can be called at any time due to the asynchronous nature of cancellation, even
 * > concurrently with the call of the callback.
 */
public fun <T> callbackFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T> {
    return CallbackFlowBuilder(block)
}
308 
// The ChannelFlow that comes first in a chain of flow operations and introduces (builds) a flow
private open class ChannelFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlow<T>(context, capacity, onBufferOverflow) {
    // Produces a builder around the same block that uses the requested context and channel configuration
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return ChannelFlowBuilder(block, context, capacity, onBufferOverflow)
    }

    // Runs the stored block with the given producer scope as its receiver
    override suspend fun collectTo(scope: ProducerScope<T>) {
        scope.block()
    }

    override fun toString(): String {
        return "block[$block] -> ${super.toString()}"
    }
}
325 
// ChannelFlowBuilder flavour behind callbackFlow: additionally verifies that the channel is closed once the block returns
private class CallbackFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {

    override suspend fun collectTo(scope: ProducerScope<T>) {
        super.collectTo(scope)
        /*
         * By now the channel is expected to be closed: either the user called `awaitClose` inside the block,
         * or the channel was closed/cancelled externally/manually. If it is still open, this is the
         * "user forgot to call awaitClose and receives unhelpful ClosedSendChannelException exceptions"
         * situation, which is reported explicitly below.
         */
        if (scope.isClosedForSend) return
        throw IllegalStateException(
            """
                    'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
                    Otherwise, a callback/listener may leak in case of external cancellation.
                    See callbackFlow API documentation for the details.
                """.trimIndent()
        )
    }

    // Produces a callback-flow builder around the same block with the requested context and channel configuration
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> {
        return CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
    }
}
354