• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused")
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import kotlinx.coroutines.flow.internal.*
14 import kotlinx.coroutines.internal.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17 
18 /**
19  * Name of the property that defines the value of [DEFAULT_CONCURRENCY].
20  * This is a preview API and can be changed in a backwards-incompatible manner within a single release.
21  */
22 @FlowPreview
23 public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"
24 
25 /**
26  * Default concurrency limit that is used by [flattenMerge] and [flatMapMerge] operators.
27  * It is 16 by default and can be changed on JVM using [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
28  * This is a preview API and can be changed in a backwards-incompatible manner within a single release.
29  */
30 @FlowPreview
31 public val DEFAULT_CONCURRENCY: Int = systemProp(
32     DEFAULT_CONCURRENCY_PROPERTY_NAME,
33     16, 1, Int.MAX_VALUE
34 )
35 
36 /**
37  * Transforms elements emitted by the original flow by applying [transform], that returns another flow,
38  * and then concatenating and flattening these flows.
39  *
40  * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
41  *
42  * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
43  * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
44  */
45 @ExperimentalCoroutinesApi
46 public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
47     map(transform).flattenConcat()
48 
49 /**
50  * Transforms elements emitted by the original flow by applying [transform], that returns another flow,
51  * and then merging and flattening these flows.
52  *
53  * This operator calls [transform] *sequentially* and then merges the resulting flows with a [concurrency]
54  * limit on the number of concurrently collected flows.
55  * It is a shortcut for `map(transform).flattenMerge(concurrency)`.
56  * See [flattenMerge] for details.
57  *
58  * Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows.
59  * Most likely, suspending operation in [map] operator will be sufficient and linear transformations are much easier to reason about.
60  *
61  * ### Operator fusion
62  *
63  * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
64  * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
65  *
66  * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
67  * at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
68  */
69 @ExperimentalCoroutinesApi
70 public fun <T, R> Flow<T>.flatMapMerge(
71     concurrency: Int = DEFAULT_CONCURRENCY,
72     transform: suspend (value: T) -> Flow<R>
73 ): Flow<R> =
74     map(transform).flattenMerge(concurrency)
75 
76 /**
77  * Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
78  *
79  * Inner flows are collected by this operator *sequentially*.
80  */
81 @ExperimentalCoroutinesApi
82 public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
83     collect { value -> emitAll(value) }
84 }
85 
86 /**
87  * Merges the given flows into a single flow without preserving an order of elements.
88  * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
89  *
90  * ### Operator fusion
91  *
92  * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
93  * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
94  */
mergenull95 public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
96     /*
97      * This is a fuseable implementation of the following operator:
98      * channelFlow {
99      *    forEach { flow ->
100      *        launch {
101      *            flow.collect { send(it) }
102      *        }
103      *    }
104      * }
105      */
106     return ChannelLimitedFlowMerge(this)
107 }
108 
109 /**
110  * Merges the given flows into a single flow without preserving an order of elements.
111  * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
112  *
113  * ### Operator fusion
114  *
115  * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
116  * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
117  */
mergenull118 public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()
119 
120 /**
121  * Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
122  * concurrently collected flows.
123  *
124  * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
125  * With `concurrency == 1` this operator is identical to [flattenConcat].
126  *
127  * ### Operator fusion
128  *
129  * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
130  * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
131  *
132  * When [concurrency] is greater than 1, this operator is [buffered][buffer] by default
133  * and size of its output buffer can be changed by applying subsequent [buffer] operator.
134  *
135  * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
136  * at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
137  */
138 @ExperimentalCoroutinesApi
139 public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
140     require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
141     return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
142 }
143 
144 /**
145  * Returns a flow that produces element by [transform] function every time the original flow emits a value.
146  * When the original flow emits a new value, the previous `transform` block is cancelled, thus the name `transformLatest`.
147  *
148  * For example, the following flow:
149  * ```
150  * flow {
151  *     emit("a")
152  *     delay(100)
153  *     emit("b")
154  * }.transformLatest { value ->
155  *     emit(value)
156  *     delay(200)
157  *     emit(value + "_last")
158  * }
159  * ```
160  * produces `a b b_last`.
161  *
162  * This operator is [buffered][buffer] by default
163  * and size of its output buffer can be changed by applying subsequent [buffer] operator.
164  */
165 @ExperimentalCoroutinesApi
transformLatestnull166 public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
167     ChannelFlowTransformLatest(transform, this)
168 
169 /**
170  * Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
171  * When the original flow emits a new value, the previous flow produced by `transform` block is cancelled.
172  *
173  * For example, the following flow:
174  * ```
175  * flow {
176  *     emit("a")
177  *     delay(100)
178  *     emit("b")
179  * }.flatMapLatest { value ->
180  *     flow {
181  *         emit(value)
182  *         delay(200)
183  *         emit(value + "_last")
184  *     }
185  * }
186  * ```
187  * produces `a b b_last`
188  *
189  * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
190  */
191 @ExperimentalCoroutinesApi
192 public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
193     transformLatest { emitAll(transform(it)) }
194 
195 /**
196  * Returns a flow that emits elements from the original flow transformed by [transform] function.
197  * When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
198  *
199  * For example, the following flow:
200  * ```
201  * flow {
202  *     emit("a")
203  *     delay(100)
204  *     emit("b")
205  * }.mapLatest { value ->
206  *     println("Started computing $value")
207  *     delay(200)
208  *     "Computed $value"
209  * }
210  * ```
211  * will print "Started computing a" and "Started computing b", but the resulting flow will contain only "Computed b" value.
212  *
213  * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
214  */
215 @ExperimentalCoroutinesApi
mapLatestnull216 public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
217     transformLatest { emit(transform(it)) }
218