1 /*
2 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7
8 package kotlinx.coroutines.flow
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.flow.internal.*
12 import kotlin.jvm.*
13
14 /**
15 * Terminal flow operator that collects the given flow but ignores all emitted values.
16 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
17 *
18 * It is a shorthand for `collect {}`.
19 *
20 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
21 * handle an exception that might occur in the upstream flow or during processing, for example:
22 *
23 * ```
24 * flow
25 * .onEach { value -> process(value) }
26 * .catch { e -> handleException(e) }
27 * .collect() // trigger collection of the flow
28 * ```
29 */
collectnull30 public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
31
32 /**
33 * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
34 * It is a shorthand for `scope.launch { flow.collect() }`.
35 *
36 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
37 * handle an exception that might occur in the upstream flow or during processing, for example:
38 *
39 * ```
40 * flow
41 * .onEach { value -> updateUi(value) }
42 * .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
43 * .catch { cause -> LOG.error("Exception: $cause") }
44 * .launchIn(uiScope)
45 * ```
46 *
47 * Note that the resulting value of [launchIn] is not used and the provided scope takes care of cancellation.
48 */
49 public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
50 collect() // tail-call
51 }
52
53 /**
54 * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
55 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
56 *
57 * See also [collect] and [withIndex].
58 */
collectIndexednull59 public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
60 collect(object : FlowCollector<T> {
61 private var index = 0
62 override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
63 })
64
65 /**
66 * Terminal flow operator that collects the given flow with a provided [action].
67 * The crucial difference from [collect] is that when the original flow emits a new value
68 * then the [action] block for the previous value is cancelled.
69 *
70 * It can be demonstrated by the following example:
71 *
72 * ```
73 * flow {
74 * emit(1)
75 * delay(50)
76 * emit(2)
77 * }.collectLatest { value ->
78 * println("Collecting $value")
79 * delay(100) // Emulate work
80 * println("$value collected")
81 * }
82 * ```
83 *
84 * prints "Collecting 1, Collecting 2, 2 collected"
85 */
collectLatestnull86 public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
87 /*
88 * Implementation note:
89 * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
90 * ```
91 * flowOf(1, 2, 3).collectLatest {
92 * delay(1)
93 * println(it) // Expect only 3 to be printed
94 * }
95 * ```
96 *
97 * It's not the case for intermediate operators which users mostly use for interactive UI,
98 * where performance of dispatch is more important.
99 */
100 mapLatest(action).buffer(0).collect()
101 }
102
103 /**
104 * Collects all the values from the given [flow] and emits them to the collector.
105 * It is a shorthand for `flow.collect { value -> emit(value) }`.
106 */
emitAllnull107 public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
108 ensureActive()
109 flow.collect(this)
110 }
111
112 /** @suppress */
113 @Deprecated(level = DeprecationLevel.HIDDEN, message = "Backwards compatibility with JS and K/N")
collectnull114 public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
115 collect(object : FlowCollector<T> {
116 override suspend fun emit(value: T) = action(value)
117 })
118