1 /*
2 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7
8 package kotlinx.coroutines.flow
9
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.flow.internal.*
12 import kotlin.jvm.*
13
14 /**
15 * Terminal flow operator that collects the given flow but ignores all emitted values.
16 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
17 *
18 * It is a shorthand for `collect {}`.
19 *
20 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values and
21 * handle an exception that might occur in the upstream flow or during processing, for example:
22 *
23 * ```
24 * flow
25 * .onEach { value -> process(value) }
26 * .catch { e -> handleException(e) }
27 * .collect() // trigger collection of the flow
28 * ```
29 */
collectnull30 public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
31
32 /**
33 * Terminal flow operator that [launches][launch] the [collection][collect] of the given flow in the [scope].
34 * It is a shorthand for `scope.launch { flow.collect() }`.
35 *
36 * This operator is usually used with [onEach], [onCompletion] and [catch] operators to process all emitted values
37 * handle an exception that might occur in the upstream flow or during processing, for example:
38 *
39 * ```
40 * flow
41 * .onEach { value -> updateUi(value) }
42 * .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
43 * .catch { cause -> LOG.error("Exception: $cause") }
44 * .launchIn(uiScope)
45 * ```
46 *
47 * Note that resulting value of [launchIn] is not used the provided scope takes care of cancellation.
48 */
49 public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
50 collect() // tail-call
51 }
52
53 /**
54 * Terminal flow operator that collects the given flow with a provided [action].
55 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
56 *
57 * Example of use:
58 *
59 * ```
60 * val flow = getMyEvents()
61 * try {
62 * flow.collect { value ->
63 * println("Received $value")
64 * }
65 * println("My events are consumed successfully")
66 * } catch (e: Throwable) {
67 * println("Exception from the flow: $e")
68 * }
69 * ```
70 */
collectnull71 public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
72 collect(object : FlowCollector<T> {
73 override suspend fun emit(value: T) = action(value)
74 })
75
76 /**
77 * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
78 * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
79 *
80 * See also [collect] and [withIndex].
81 */
collectIndexednull82 public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
83 collect(object : FlowCollector<T> {
84 private var index = 0
85 override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
86 })
87
88 /**
89 * Terminal flow operator that collects the given flow with a provided [action].
90 * The crucial difference from [collect] is that when the original flow emits a new value, [action] block for previous
91 * value is cancelled.
92 *
93 * It can be demonstrated by the following example:
94 *
95 * ```
96 * flow {
97 * emit(1)
98 * delay(50)
99 * emit(2)
100 * }.collectLatest { value ->
101 * println("Collecting $value")
102 * delay(100) // Emulate work
103 * println("$value collected")
104 * }
105 * ```
106 *
107 * prints "Collecting 1, Collecting 2, 2 collected"
108 */
collectLatestnull109 public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
110 /*
111 * Implementation note:
112 * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
113 * ```
114 * flowOf(1, 2, 3).collectLatest {
115 * delay(1)
116 * println(it) // Expect only 3 to be printed
117 * }
118 * ```
119 *
120 * It's not the case for intermediate operators which users mostly use for interactive UI,
121 * where performance of dispatch is more important.
122 */
123 mapLatest(action).buffer(0).collect()
124 }
125
126 /**
127 * Collects all the values from the given [flow] and emits them to the collector.
128 * It is a shorthand for `flow.collect { value -> emit(value) }`.
129 */
130 @BuilderInference
emitAllnull131 public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>): Unit = flow.collect(this)
132