• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
<lambda>null2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 @file:Suppress("UNCHECKED_CAST", "NON_APPLICABLE_CALL_FOR_BUILDER_INFERENCE") // KT-32203
8 
9 package kotlinx.coroutines.flow
10 
11 import kotlinx.coroutines.flow.internal.*
12 import kotlin.jvm.*
13 import kotlinx.coroutines.flow.flow as safeFlow
14 import kotlinx.coroutines.flow.internal.unsafeFlow as flow
15 
16 /**
17  * Returns a [Flow] whose values are generated with [transform] function by combining
18  * the most recently emitted values by each flow.
19  *
20  * It can be demonstrated with the following example:
21  * ```
22  * val flow = flowOf(1, 2).onEach { delay(10) }
23  * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
24  * flow.combine(flow2) { i, s -> i.toString() + s }.collect {
25  *     println(it) // Will print "1a 2a 2b 2c"
26  * }
27  * ```
28  *
29  * This function is a shorthand for `flow.combineTransform(flow2) { a, b -> emit(transform(a, b)) }
30  */
31 @JvmName("flowCombine")
32 public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> = flow {
33     combineInternal(arrayOf(this@combine, flow), nullArrayFactory(), { emit(transform(it[0] as T1, it[1] as T2)) })
34 }
35 
36 /**
37  * Returns a [Flow] whose values are generated with [transform] function by combining
38  * the most recently emitted values by each flow.
39  *
40  * It can be demonstrated with the following example:
41  * ```
42  * val flow = flowOf(1, 2).onEach { delay(10) }
43  * val flow2 = flowOf("a", "b", "c").onEach { delay(15) }
44  * combine(flow, flow2) { i, s -> i.toString() + s }.collect {
45  *     println(it) // Will print "1a 2a 2b 2c"
46  * }
47  * ```
48  *
49  * This function is a shorthand for `combineTransform(flow, flow2) { a, b -> emit(transform(a, b)) }
50  */
combinenull51 public fun <T1, T2, R> combine(flow: Flow<T1>, flow2: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R> =
52     flow.combine(flow2, transform)
53 
54 /**
55  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
56  *
57  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
58  * generic function that may transform emitted element, skip it or emit it multiple times.
59  *
60  * Its usage can be demonstrated with the following example:
61  * ```
62  * val flow = requestFlow()
63  * val flow2 = searchEngineFlow()
64  * flow.combineTransform(flow2) { request, searchEngine ->
65  *     emit("Downloading in progress")
66  *     val result = download(request, searchEngine)
67  *     emit(result)
68  * }
69  * ```
70  */
71 @JvmName("flowCombineTransform")
72 public fun <T1, T2, R> Flow<T1>.combineTransform(
73     flow: Flow<T2>,
74     @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
75 ): Flow<R> = combineTransformUnsafe(this, flow) { args: Array<*> ->
76     transform(
77         args[0] as T1,
78         args[1] as T2
79     )
80 }
81 
82 /**
83  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
84  *
85  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
86  * generic function that may transform emitted element, skip it or emit it multiple times.
87  *
88  * Its usage can be demonstrated with the following example:
89  * ```
90  * val flow = requestFlow()
91  * val flow2 = searchEngineFlow()
92  * combineTransform(flow, flow2) { request, searchEngine ->
93  *     emit("Downloading in progress")
94  *     val result = download(request, searchEngine)
95  *     emit(result)
96  * }
97  * ```
98  */
combineTransformnull99 public fun <T1, T2, R> combineTransform(
100     flow: Flow<T1>,
101     flow2: Flow<T2>,
102     @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
103 ): Flow<R> = combineTransformUnsafe(flow, flow2) { args: Array<*> ->
104     transform(
105         args[0] as T1,
106         args[1] as T2
107     )
108 }
109 
110 /**
111  * Returns a [Flow] whose values are generated with [transform] function by combining
112  * the most recently emitted values by each flow.
113  */
combinenull114 public fun <T1, T2, T3, R> combine(
115     flow: Flow<T1>,
116     flow2: Flow<T2>,
117     flow3: Flow<T3>,
118     @BuilderInference transform: suspend (T1, T2, T3) -> R
119 ): Flow<R> = combineUnsafe(flow, flow2, flow3) { args: Array<*> ->
120     transform(
121         args[0] as T1,
122         args[1] as T2,
123         args[2] as T3
124     )
125 }
126 
127 /**
128  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
129  *
130  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
131  * generic function that may transform emitted element, skip it or emit it multiple times.
132  */
combineTransformnull133 public fun <T1, T2, T3, R> combineTransform(
134     flow: Flow<T1>,
135     flow2: Flow<T2>,
136     flow3: Flow<T3>,
137     @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3) -> Unit
138 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3) { args: Array<*> ->
139     transform(
140         args[0] as T1,
141         args[1] as T2,
142         args[2] as T3
143     )
144 }
145 
146 /**
147  * Returns a [Flow] whose values are generated with [transform] function by combining
148  * the most recently emitted values by each flow.
149  */
combinenull150 public fun <T1, T2, T3, T4, R> combine(
151     flow: Flow<T1>,
152     flow2: Flow<T2>,
153     flow3: Flow<T3>,
154     flow4: Flow<T4>,
155     transform: suspend (T1, T2, T3, T4) -> R
156 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
157     transform(
158         args[0] as T1,
159         args[1] as T2,
160         args[2] as T3,
161         args[3] as T4
162     )
163 }
164 
165 /**
166  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
167  *
168  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
169  * generic function that may transform emitted element, skip it or emit it multiple times.
170  */
combineTransformnull171 public fun <T1, T2, T3, T4, R> combineTransform(
172     flow: Flow<T1>,
173     flow2: Flow<T2>,
174     flow3: Flow<T3>,
175     flow4: Flow<T4>,
176     @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4) -> Unit
177 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4) { args: Array<*> ->
178     transform(
179         args[0] as T1,
180         args[1] as T2,
181         args[2] as T3,
182         args[3] as T4
183     )
184 }
185 
186 /**
187  * Returns a [Flow] whose values are generated with [transform] function by combining
188  * the most recently emitted values by each flow.
189  */
combinenull190 public fun <T1, T2, T3, T4, T5, R> combine(
191     flow: Flow<T1>,
192     flow2: Flow<T2>,
193     flow3: Flow<T3>,
194     flow4: Flow<T4>,
195     flow5: Flow<T5>,
196     transform: suspend (T1, T2, T3, T4, T5) -> R
197 ): Flow<R> = combineUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
198     transform(
199         args[0] as T1,
200         args[1] as T2,
201         args[2] as T3,
202         args[3] as T4,
203         args[4] as T5
204     )
205 }
206 
207 /**
208  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
209  *
210  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
211  * generic function that may transform emitted element, skip it or emit it multiple times.
212  */
combineTransformnull213 public fun <T1, T2, T3, T4, T5, R> combineTransform(
214     flow: Flow<T1>,
215     flow2: Flow<T2>,
216     flow3: Flow<T3>,
217     flow4: Flow<T4>,
218     flow5: Flow<T5>,
219     @BuilderInference transform: suspend FlowCollector<R>.(T1, T2, T3, T4, T5) -> Unit
220 ): Flow<R> = combineTransformUnsafe(flow, flow2, flow3, flow4, flow5) { args: Array<*> ->
221     transform(
222         args[0] as T1,
223         args[1] as T2,
224         args[2] as T3,
225         args[3] as T4,
226         args[4] as T5
227     )
228 }
229 
230 /**
231  * Returns a [Flow] whose values are generated with [transform] function by combining
232  * the most recently emitted values by each flow.
233  */
combinenull234 public inline fun <reified T, R> combine(
235     vararg flows: Flow<T>,
236     crossinline transform: suspend (Array<T>) -> R
237 ): Flow<R> = flow {
238     combineInternal(flows, { arrayOfNulls(flows.size) }, { emit(transform(it)) })
239 }
240 
241 /**
242  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
243  *
244  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
245  * generic function that may transform emitted element, skip it or emit it multiple times.
246  */
combineTransformnull247 public inline fun <reified T, R> combineTransform(
248     vararg flows: Flow<T>,
249     @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
250 ): Flow<R> = safeFlow {
251     combineInternal(flows, { arrayOfNulls(flows.size) }, { transform(it) })
252 }
253 
254 /*
255  * Same as combine, but does not copy array each time, deconstructing existing
256  * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
257  */
combineUnsafenull258 private inline fun <reified T, R> combineUnsafe(
259     vararg flows: Flow<T>,
260     crossinline transform: suspend (Array<T>) -> R
261 ): Flow<R> = flow {
262     combineInternal(flows, nullArrayFactory(), { emit(transform(it)) })
263 }
264 
265 /*
266  * Same as combineTransform, but does not copy array each time, deconstructing existing
267  * array each time. Used in overloads that accept FunctionN instead of Function<Array<R>>
268  */
combineTransformUnsafenull269 private inline fun <reified T, R> combineTransformUnsafe(
270     vararg flows: Flow<T>,
271     @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
272 ): Flow<R> = safeFlow {
273     combineInternal(flows, nullArrayFactory(), { transform(it) })
274 }
275 
276 // Saves bunch of anonymous classes
<lambda>null277 private fun <T> nullArrayFactory(): () -> Array<T>? = { null }
278 
279 /**
280  * Returns a [Flow] whose values are generated with [transform] function by combining
281  * the most recently emitted values by each flow.
282  */
combinenull283 public inline fun <reified T, R> combine(
284     flows: Iterable<Flow<T>>,
285     crossinline transform: suspend (Array<T>) -> R
286 ): Flow<R> {
287     val flowArray = flows.toList().toTypedArray()
288     return flow {
289         combineInternal(
290             flowArray,
291             arrayFactory = { arrayOfNulls(flowArray.size) },
292             transform = { emit(transform(it)) })
293     }
294 }
295 
296 /**
297  * Returns a [Flow] whose values are generated by [transform] function that process the most recently emitted values by each flow.
298  *
299  * The receiver of the [transform] is [FlowCollector] and thus `transform` is a
300  * generic function that may transform emitted element, skip it or emit it multiple times.
301  */
combineTransformnull302 public inline fun <reified T, R> combineTransform(
303     flows: Iterable<Flow<T>>,
304     @BuilderInference crossinline transform: suspend FlowCollector<R>.(Array<T>) -> Unit
305 ): Flow<R> {
306     val flowArray = flows.toList().toTypedArray()
307     return safeFlow {
308         combineInternal(flowArray, { arrayOfNulls(flowArray.size) }, { transform(it) })
309     }
310 }
311 
312 /**
313  * Zips values from the current flow (`this`) with [other] flow using provided [transform] function applied to each pair of values.
314  * The resulting flow completes as soon as one of the flows completes and cancel is called on the remaining flow.
315  *
316  * It can be demonstrated with the following example:
317  * ```
318  * val flow = flowOf(1, 2, 3).onEach { delay(10) }
319  * val flow2 = flowOf("a", "b", "c", "d").onEach { delay(15) }
320  * flow.zip(flow2) { i, s -> i.toString() + s }.collect {
321  *     println(it) // Will print "1a 2b 3c"
322  * }
323  * ```
324  *
325  * ### Buffering
326  *
327  * The upstream flow is collected sequentially in the same coroutine without any buffering, while the
328  * [other] flow is collected concurrently as if `buffer(0)` is used. See documentation in the [buffer] operator
329  * for explanation. You can use additional calls to the [buffer] operator as needed for more concurrency.
330  */
zipnull331 public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)
332