/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused")
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import kotlinx.coroutines.flow.internal.*
14 import kotlinx.coroutines.internal.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17
/**
 * Name of the property whose value defines [DEFAULT_CONCURRENCY].
 */
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String =
    "kotlinx.coroutines.flow.defaultConcurrency"
23
/**
 * Default limit on the number of concurrently collected flows that is used by
 * the [flattenMerge] and [flatMapMerge] operators.
 *
 * It is 16 by default and can be changed on JVM using the [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
 */
@FlowPreview
public val DEFAULT_CONCURRENCY: Int =
    systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME, 16, 1, Int.MAX_VALUE)
32
/**
 * Applies [transform] to every element emitted by the original flow, obtaining another flow for each element,
 * and then concatenates and flattens these flows.
 *
 * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * @param transform function that produces the inner flow for each upstream value.
 * @return a flow of the elements of all inner flows, as produced by [flattenConcat].
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    // Step 1: turn every upstream value into its inner flow.
    val nested: Flow<Flow<R>> = map(transform)
    // Step 2: flatten the flow of flows one inner flow after another.
    return nested.flattenConcat()
}
45
/**
 * Applies [transform] to every element emitted by the original flow, obtaining another flow for each element,
 * and then merges and flattens these flows.
 *
 * This operator calls [transform] *sequentially* and then merges the resulting flows with a [concurrency]
 * limit on the number of concurrently collected flows.
 * It is a shortcut for `map(transform).flattenMerge(concurrency)`.
 * See [flattenMerge] for details.
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY]. The value is passed on to [flattenMerge],
 * which requires it to be positive.
 * @param transform function that produces the inner flow for each upstream value.
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> {
    // Step 1: turn every upstream value into its inner flow.
    val nested: Flow<Flow<R>> = map(transform)
    // Step 2: merge the inner flows, honouring the requested concurrency limit.
    return nested.flattenMerge(concurrency)
}
72
/**
 * Flattens the given flow of flows into a single flow in a sequential manner, without interleaving nested flows.
 * This method is conceptually identical to `flattenMerge(concurrency = 1)` but has a faster implementation.
 *
 * Inner flows are collected by this operator *sequentially*.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> {
    return flow {
        // Collect the outer flow and re-emit each inner flow in full before moving on to the next one.
        this@flattenConcat.collect { inner -> emitAll(inner) }
    }
}
83
/**
 * Merges the flows of this [Iterable] into a single flow without preserving the order of elements.
 * All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
@ExperimentalCoroutinesApi
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
    // This is a fuseable implementation of the following operator:
    //
    //     channelFlow {
    //         forEach { flow ->
    //             launch {
    //                 flow.collect { send(it) }
    //             }
    //         }
    //     }
    ChannelLimitedFlowMerge(this)
107
/**
 * Merges the given [flows] into a single flow without preserving the order of elements.
 * All flows are merged concurrently, without a limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param flows the flows to merge.
 */
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> {
    // View the vararg array as an Iterable and delegate to the Iterable-based overload.
    val sources: Iterable<Flow<T>> = flows.asIterable()
    return sources.merge()
}
119
/**
 * Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
 * concurrently collected flows.
 *
 * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
 * With `concurrency == 1` this operator is identical to [flattenConcat].
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default it is equal to [DEFAULT_CONCURRENCY].
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return when (concurrency) {
        // A limit of one means strictly sequential collection, so delegate to flattenConcat.
        1 -> flattenConcat()
        else -> ChannelFlowMerge(this, concurrency)
    }
}
140
/**
 * Returns a flow that produces elements by running the [transform] function every time the original flow
 * emits a value. When the original flow emits a new value, the previous `transform` block is cancelled,
 * thus the name `transformLatest`.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.transformLatest { value ->
 *     emit(value)
 *     delay(200)
 *     emit(value + "_last")
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default
 * and the size of its output buffer can be changed by applying a subsequent [buffer] operator.
 *
 * @param transform block that is invoked for each upstream value with a [FlowCollector] receiver
 * to emit the resulting elements.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> {
    return ChannelFlowTransformLatest(transform, this)
}
165
/**
 * Returns a flow that switches to a new flow produced by the [transform] function every time the original flow
 * emits a value. When the original flow emits a new value, the previous flow produced by the `transform` block
 * is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.flatMapLatest { value ->
 *     flow {
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default and the size of its output buffer can be changed
 * by applying a subsequent [buffer] operator.
 *
 * @param transform function that produces the flow to switch to for each upstream value.
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> {
    // Built on transformLatest: each upstream value re-emits everything from the flow that transform returns.
    return transformLatest { value -> emitAll(transform(value)) }
}
191
/**
 * Returns a flow that emits elements from the original flow transformed by the [transform] function.
 * When the original flow emits a new value, computation of the [transform] block for the previous value
 * is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.mapLatest { value ->
 *     println("Started computing $value")
 *     delay(200)
 *     "Computed $value"
 * }
 * ```
 * will print "Started computing a" and "Started computing b", but the resulting flow will contain
 * only the "Computed b" value.
 *
 * This operator is [buffered][buffer] by default and the size of its output buffer can be changed
 * by applying a subsequent [buffer] operator.
 *
 * @param transform function that computes the resulting element for each upstream value.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> {
    // Built on transformLatest: each upstream value emits the single result of transform.
    return transformLatest { value -> emit(transform(value)) }
}
215