• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused")
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import kotlinx.coroutines.flow.internal.*
14 import kotlinx.coroutines.internal.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17 
/**
 * Name of the system property that overrides the value of [DEFAULT_CONCURRENCY].
 */
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String = "kotlinx.coroutines.flow.defaultConcurrency"

/**
 * Concurrency limit applied by the [flattenMerge] and [flatMapMerge] operators when no explicit limit is given.
 * The value is 16 unless it is changed on JVM through the [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
 */
@FlowPreview
public val DEFAULT_CONCURRENCY: Int =
    systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME, 16, 1, Int.MAX_VALUE)
32 
/**
 * Maps every element emitted by the original flow to another flow using [transform],
 * and then concatenates and flattens these flows.
 *
 * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
 *
 * Note that even though this operator looks very familiar, its usage in regular application-specific flows
 * is discouraged. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenConcat()
}
45 
/**
 * Maps every element emitted by the original flow to another flow using [transform],
 * and then merges and flattens these flows.
 *
 * [transform] is invoked *sequentially*; the flows it returns are then merged with a [concurrency]
 * limit on the number of flows collected at the same time.
 * This operator is a shortcut for `map(transform).flattenMerge(concurrency)`.
 * See [flattenMerge] for details.
 *
 * Note that even though this operator looks very familiar, its usage in regular application-specific flows
 * is discouraged. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows: at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenMerge(concurrency)
}
72 
/**
 * Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
 * This method is conceptually identical to `flattenMerge(concurrency = 1)` but has a faster implementation.
 *
 * Inner flows are collected by this operator *sequentially*.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    // Each inner flow is emitted in full before the next one is requested from the outer flow.
    this@flattenConcat.collect { inner -> emitAll(inner) }
}
83 
/**
 * Merges the given flows into a single flow without preserving the order of elements.
 * All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
@ExperimentalCoroutinesApi
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
    // This is a fuseable implementation of the following operator:
    //
    // channelFlow {
    //    forEach { flow ->
    //        launch {
    //            flow.collect { send(it) }
    //        }
    //    }
    // }
    ChannelLimitedFlowMerge(this)
107 
/**
 * Merges the given flows into a single flow without preserving the order of elements.
 * All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> {
    // Delegates to the Iterable-based overload.
    val sources: Iterable<Flow<T>> = flows.asIterable()
    return sources.merge()
}
119 
/**
 * Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
 * concurrently collected flows.
 *
 * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
 * With `concurrency == 1` this operator is identical to [flattenConcat].
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows: at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    // With a single in-flight flow, merging is the same as sequential concatenation.
    if (concurrency == 1) return flattenConcat()
    return ChannelFlowMerge(this, concurrency)
}
140 
/**
 * Returns a flow that produces elements by running the [transform] function every time the original flow
 * emits a value. When the original flow emits a new value, the previous `transform` block is cancelled,
 * thus the name `transformLatest`.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.transformLatest { value ->
 *     emit(value)
 *     delay(200)
 *     emit(value + "_last")
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default,
 * and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(
    @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    return ChannelFlowTransformLatest(transform, this)
}
165 
/**
 * Returns a flow that switches to a new flow produced by the [transform] function every time
 * the original flow emits a value. When the original flow emits a new value, the previous flow
 * produced by the `transform` block is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.flatMapLatest { value ->
 *     flow {
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default,
 * and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(
    @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    transformLatest { value -> emitAll(transform(value)) }
191 
/**
 * Returns a flow that emits the elements of the original flow transformed by the [transform] function.
 * When the original flow emits a new value, the computation of the [transform] block for the previous
 * value is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.mapLatest { value ->
 *     println("Started computing $value")
 *     delay(200)
 *     "Computed $value"
 * }
 * ```
 * will print "Started computing a" and "Started computing b", but the resulting flow will contain
 * only the "Computed b" value.
 *
 * This operator is [buffered][buffer] by default,
 * and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
    transformLatest { value ->
        val mapped = transform(value)
        emit(mapped)
    }
215