/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("unused")
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.*
12 import kotlinx.coroutines.channels.*
13 import kotlinx.coroutines.flow.internal.*
14 import kotlinx.coroutines.internal.*
15 import kotlin.jvm.*
16 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
17
/**
 * Name of the property from which the value of [DEFAULT_CONCURRENCY] is read.
 *
 * This is a preview API and can be changed in a backwards-incompatible manner within a single release.
 */
@FlowPreview
public const val DEFAULT_CONCURRENCY_PROPERTY_NAME: String =
    "kotlinx.coroutines.flow.defaultConcurrency"
24
/**
 * The concurrency limit that the [flattenMerge] and [flatMapMerge] operators use by default.
 *
 * It is 16 unless overridden; on JVM it can be changed using the [DEFAULT_CONCURRENCY_PROPERTY_NAME] property.
 *
 * This is a preview API and can be changed in a backwards-incompatible manner within a single release.
 */
@FlowPreview
public val DEFAULT_CONCURRENCY: Int =
    systemProp(DEFAULT_CONCURRENCY_PROPERTY_NAME, 16, 1, Int.MAX_VALUE)
35
/**
 * Applies [transform] to each element emitted by the original flow to obtain another flow,
 * and then concatenates and flattens the flows obtained this way.
 *
 * This method is a shortcut for `map(transform).flattenConcat()`. See [flattenConcat].
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenConcat()
}
48
/**
 * Applies [transform] to each element emitted by the original flow to obtain another flow,
 * and then merges and flattens the flows obtained this way.
 *
 * [transform] is invoked *sequentially*; the flows it returns are then merged with a [concurrency]
 * limit on the number of flows collected at the same time.
 * This method is a shortcut for `map(transform).flattenMerge(concurrency)`.
 * See [flattenMerge] for details.
 *
 * Note that even though this operator looks very familiar, we discourage its usage in regular
 * application-specific flows. Most likely, a suspending operation in the [map] operator will be sufficient,
 * and linear transformations are much easier to reason about.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> {
    val nested: Flow<Flow<R>> = map(transform)
    return nested.flattenMerge(concurrency)
}
75
/**
 * Flattens this flow of flows into a single flow in a sequential manner, without interleaving nested flows.
 *
 * Each inner flow is collected *sequentially*: all of its elements are emitted downstream
 * before the next inner flow is taken from the outer flow.
 */
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> {
    val outer = this
    return flow {
        outer.collect { inner -> emitAll(inner) }
    }
}
85
/**
 * Merges the flows of this iterable into a single flow; the order of elements is not preserved.
 * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> =
    /*
     * Fuseable counterpart of the following operator:
     * channelFlow {
     *     forEach { flow ->
     *         launch {
     *             flow.collect { send(it) }
     *         }
     *     }
     * }
     */
    ChannelLimitedFlowMerge(this)
108
/**
 * Merges the given [flows] into a single flow; the order of elements is not preserved.
 * All flows are merged concurrently, without limit on the number of simultaneously collected flows.
 *
 * This overload delegates to the [Iterable] variant of `merge`.
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 */
public fun <T> merge(vararg flows: Flow<T>): Flow<T> {
    val sources: Iterable<Flow<T>> = flows.asIterable()
    return sources.merge()
}
119
/**
 * Flattens this flow of flows into a single flow, with a [concurrency] limit on the number of
 * inner flows that are collected at the same time.
 *
 * If [concurrency] is more than 1, then inner flows are collected by this operator *concurrently*.
 * With `concurrency == 1` this operator is identical to [flattenConcat].
 *
 * ### Operator fusion
 *
 * Applications of [flowOn], [buffer], and [produceIn] _after_ this operator are fused with
 * its concurrent merging so that only one properly configured channel is used for execution of merging logic.
 *
 * When [concurrency] is greater than 1, this operator is [buffered][buffer] by default
 * and size of its output buffer can be changed by applying subsequent [buffer] operator.
 *
 * @param concurrency controls the number of in-flight flows, at most [concurrency] flows are collected
 * at the same time. By default, it is equal to [DEFAULT_CONCURRENCY].
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return when (concurrency) {
        // A single in-flight flow means there is nothing to merge: collect inner flows one after another.
        1 -> flattenConcat()
        else -> ChannelFlowMerge(this, concurrency)
    }
}
143
/**
 * Returns a flow that produces elements by running the [transform] function every time the original flow
 * emits a value. When the original flow emits a new value, the previous `transform` block is cancelled,
 * thus the name `transformLatest`.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.transformLatest { value ->
 *     emit(value)
 *     delay(200)
 *     emit(value + "_last")
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default
 * and size of its output buffer can be changed by applying subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.transformLatest(
    @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> {
    return ChannelFlowTransformLatest(transform, this)
}
168
/**
 * Returns a flow that switches to a new flow produced by the [transform] function every time
 * the original flow emits a value. When the original flow emits a new value, the previous flow
 * produced by the `transform` block is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.flatMapLatest { value ->
 *     flow {
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     }
 * }
 * ```
 * produces `a b b_last`.
 *
 * This operator is [buffered][buffer] by default and size of its output buffer can be changed
 * by applying subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(
    @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = transformLatest { value ->
    val latest = transform(value)
    emitAll(latest)
}
194
/**
 * Returns a flow that emits the elements of the original flow transformed by the [transform] function.
 * When the original flow emits a new value, the computation of the [transform] block for the previous value
 * is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.mapLatest { value ->
 *     println("Started computing $value")
 *     delay(200)
 *     "Computed $value"
 * }
 * ```
 * will print "Started computing a" and "Started computing b", but the resulting flow will contain only
 * the "Computed b" value.
 *
 * This operator is [buffered][buffer] by default and size of its output buffer can be changed
 * by applying subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
    transformLatest { value ->
        val mapped = transform(value)
        emit(mapped)
    }
218