1 /*
<lambda>null2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
8
9 package kotlinx.coroutines.flow
10
11 import kotlinx.coroutines.flow.internal.*
12 import kotlin.jvm.*
13 import kotlinx.coroutines.flow.flow as safeFlow
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15
16 /**
17 * Returns a [Flow] whose values are generated with [transform] function by combining
18 * the most recently emitted values by each flow.
19 *
20 * It can be demonstrated with the following example:
21 * ```
22 * val flow = flowOf(1, 2).onEach { delay(10) }
23 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
24 * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
25 * println(it) // Will print "1a 2a 2b 2c"
26 * }
27 * ```
28 *
29 * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
30 */
31 @JvmName("flowCombine")
32 public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
33 combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
34 }
35
36 /**
37 * Returns a [Flow] whose values are generated with [transform] function by combining
38 * the most recently emitted values by each flow.
39 *
40 * It can be demonstrated with the following example:
41 * ```
42 * val flow = flowOf(1, 2).onEach { delay(10) }
43 * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
44 * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
45 * println(it) // Will print "1a 2a 2b 2c"
46 * }
47 * ```
48 *
49 * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
50 */
combinenull51 public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
52 flow.combine(flow2, transform)
53
54 /**
55 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
56 *
57 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
58 * generic function that may transform emitted element, skip it or emit it multiple times.
59 *
60 * Its usage can be demonstrated with the following example:
61 * ```
62 * val flow = requestFlow()
63 * val flow2 = searchEngineFlow()
64 * flow.combineTransform(flow2) { request, searchEngine ->
65 * emit("Downloading in progress")
66 * val result = download(request, searchEngine)
67 * emit(result)
68 * }
69 * ```
70 */
71 @JvmName("flowCombineTransform")
72 public fun <T1, T2, R> Flow<T1>.combineTransform(
73 flow: Flow<T2>,
74 @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
75 ): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
76 transform(
77 args[0] as T1,
78 args[1] as T2
79 )
80 }
81
82 /**
83 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
84 *
85 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
86 * generic function that may transform emitted element, skip it or emit it multiple times.
87 *
88 * Its usage can be demonstrated with the following example:
89 * ```
90 * val flow = requestFlow()
91 * val flow2 = searchEngineFlow()
92 * combineTransform(flow, flow2) { request, searchEngine ->
93 * emit("Downloading in progress")
94 * val result = download(request, searchEngine)
95 * emit(result)
96 * }
97 * ```
98 */
combineTransformnull99 public fun <T1, T2, R> combineTransform(
100 flow: Flow<T1>,
101 flow2: Flow<T2>,
102 @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
103 ): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
104 transform(
105 args[0] as T1,
106 args[1] as T2
107 )
108 }
109
110 /**
111 * Returns a [Flow] whose values are generated with [transform] function by combining
112 * the most recently emitted values by each flow.
113 */
combinenull114 public fun <T1, T2, T3, R> combine(
115 flow: Flow<T1>,
116 flow2: Flow<T2>,
117 flow3: Flow<T3>,
118 @BuilderInference transform: suspend (T1, T2, T3) -> R
119 ): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
120 transform(
121 args[0] as T1,
122 args[1] as T2,
123 args[2] as T3
124 )
125 }
126
127 /**
128 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
129 *
130 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
131 * generic function that may transform emitted element, skip it or emit it multiple times.
132 */
combineTransformnull133 public fun <T1, T2, T3, R> combineTransform(
134 flow: Flow<T1>,
135 flow2: Flow<T2>,
136 flow3: Flow<T3>,
137 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
138 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
139 transform(
140 args[0] as T1,
141 args[1] as T2,
142 args[2] as T3
143 )
144 }
145
146 /**
147 * Returns a [Flow] whose values are generated with [transform] function by combining
148 * the most recently emitted values by each flow.
149 */
combinenull150 public fun <T1, T2, T3, T4, R> combine(
151 flow: Flow<T1>,
152 flow2: Flow<T2>,
153 flow3: Flow<T3>,
154 flow4: Flow<T4>,
155 transform: suspend (T1, T2, T3, T4) -> R
156 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
157 transform(
158 args[0] as T1,
159 args[1] as T2,
160 args[2] as T3,
161 args[3] as T4
162 )
163 }
164
165 /**
166 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
167 *
168 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
169 * generic function that may transform emitted element, skip it or emit it multiple times.
170 */
combineTransformnull171 public fun <T1, T2, T3, T4, R> combineTransform(
172 flow: Flow<T1>,
173 flow2: Flow<T2>,
174 flow3: Flow<T3>,
175 flow4: Flow<T4>,
176 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
177 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
178 transform(
179 args[0] as T1,
180 args[1] as T2,
181 args[2] as T3,
182 args[3] as T4
183 )
184 }
185
186 /**
187 * Returns a [Flow] whose values are generated with [transform] function by combining
188 * the most recently emitted values by each flow.
189 */
combinenull190 public fun <T1, T2, T3, T4, T5, R> combine(
191 flow: Flow<T1>,
192 flow2: Flow<T2>,
193 flow3: Flow<T3>,
194 flow4: Flow<T4>,
195 flow5: Flow<T5>,
196 transform: suspend (T1, T2, T3, T4, T5) -> R
197 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
198 transform(
199 args[0] as T1,
200 args[1] as T2,
201 args[2] as T3,
202 args[3] as T4,
203 args[4] as T5
204 )
205 }
206
207 /**
208 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
209 *
210 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
211 * generic function that may transform emitted element, skip it or emit it multiple times.
212 */
combineTransformnull213 public fun <T1, T2, T3, T4, T5, R> combineTransform(
214 flow: Flow<T1>,
215 flow2: Flow<T2>,
216 flow3: Flow<T3>,
217 flow4: Flow<T4>,
218 flow5: Flow<T5>,
219 @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
220 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
221 transform(
222 args[0] as T1,
223 args[1] as T2,
224 args[2] as T3,
225 args[3] as T4,
226 args[4] as T5
227 )
228 }
229
230 /**
231 * Returns a [Flow] whose values are generated with [transform] function by combining
232 * the most recently emitted values by each flow.
233 */
combinenull234 public inline fun <reified T, R> combine(
235 vararg flows: Flow<T>,
236 crossinline transform: suspend (Array<T>) -> R
237 ): Flow<R> = flow {
238 combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
239 }
240
241 /**
242 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
243 *
244 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
245 * generic function that may transform emitted element, skip it or emit it multiple times.
246 */
combineTransformnull247 public inline fun <reified T, R> combineTransform(
248 vararg flows: Flow<T>,
249 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
250 ): Flow<R> = safeFlow {
251 combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
252 }
253
254 /*
255 * Same as combine, but does not copy array each time, deconstructing existing
256 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
257 */
combineUnsafenull258 private inline fun <reified T, R> combineUnsafe(
259 vararg flows: Flow<T>,
260 crossinline transform: suspend (Array<T>) -> R
261 ): Flow<R> = flow {
262 combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
263 }
264
265 /*
266 * Same as combineTransform, but does not copy array each time, deconstructing existing
267 * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
268 */
combineTransformUnsafenull269 private inline fun <reified T, R> combineTransformUnsafe(
270 vararg flows: Flow<T>,
271 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
272 ): Flow<R> = safeFlow {
273 combineInternal(flows, nullArrayFactory(), { transform(it) })
274 }
275
276 // Saves bunch of anonymous classes
<lambda>null277 private fun <T> nullArrayFactory(): () -> Array<T>? = { null }
278
279 /**
280 * Returns a [Flow] whose values are generated with [transform] function by combining
281 * the most recently emitted values by each flow.
282 */
combinenull283 public inline fun <reified T, R> combine(
284 flows: Iterable<Flow<T>>,
285 crossinline transform: suspend (Array<T>) -> R
286 ): Flow<R> {
287 val flowArray = flows.toList().toTypedArray()
288 return flow {
289 combineInternal(
290 flowArray,
291 arrayFactory = { arrayOfNulls(flowArray.size) },
292 transform = { emit(transform(it)) })
293 }
294 }
295
296 /**
297 * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
298 *
299 * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
300 * generic function that may transform emitted element, skip it or emit it multiple times.
301 */
combineTransformnull302 public inline fun <reified T, R> combineTransform(
303 flows: Iterable<Flow<T>>,
304 @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
305 ): Flow<R> {
306 val flowArray = flows.toList().toTypedArray()
307 return safeFlow {
308 combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
309 }
310 }
311
312 /**
313 * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
314 * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
315 *
316 * It can be demonstrated with the following example:
317 * ```
318 * val flow = flowOf(1, 2, 3).onEach { delay(10) }
319 * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
320 * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
321 * println(it) // Will print "1a 2b 3c"
322 * }
323 * ```
324 *
325 * ### Buffering
326 *
327 * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
328 * [other] flow is collected concurrently as if `buffer(0)` is used. See documentation in the [buffer] operator
329 * for explanation. You can use additional calls to the [buffer] operator as needed for more concurrency.
330 */
zipnull331 public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
332