• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 @file:JvmMultifileClass
6 @file:JvmName("FlowKt")
7 
8 package kotlinx.coroutines.flow
9 
10 import kotlinx.coroutines.*
11 import kotlinx.coroutines.flow.internal.*
12 import kotlin.jvm.*
13 
/**
 * Terminal flow operator that runs the collection of this flow and discards every value it emits.
 * An exception thrown by the flow or during collection is rethrown from this call.
 *
 * Equivalent to `collect {}`.
 *
 * Typically the actual processing is attached upstream with [onEach], [onCompletion] and [catch],
 * and this operator only triggers the collection, for example:
 *
 * ```
 * flow
 *     .onEach { value -> process(value) }
 *     .catch { e -> handleException(e) }
 *     .collect() // trigger collection of the flow
 * ```
 */
public suspend fun Flow<*>.collect() {
    // NopCollector receives the emitted values on behalf of the caller.
    collect(NopCollector)
}
31 
/**
 * Terminal flow operator that [launches][launch] the [collection][collect] of this flow in the given [scope].
 * Equivalent to `scope.launch { flow.collect() }`.
 *
 * Typically combined with [onEach], [onCompletion] and [catch] to process all emitted values and
 * handle an exception that might occur in the upstream flow or during processing, for example:
 *
 * ```
 * flow
 *     .onEach { value -> updateUi(value) }
 *     .onCompletion { cause -> updateUi(if (cause == null) "Done" else "Failed") }
 *     .catch { cause -> LOG.error("Exception: $cause") }
 *     .launchIn(uiScope)
 * ```
 *
 * Note that the resulting value of [launchIn] is not used in this example: the provided scope takes care
 * of cancellation.
 *
 * @param scope the scope in which the collecting coroutine is launched.
 * @return the [Job] returned by [launch] for the collecting coroutine.
 */
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job {
    return scope.launch {
        this@launchIn.collect() // tail-call
    }
}
52 
/**
 * Terminal flow operator that collects this flow, invoking [action] for every emitted value.
 * An exception thrown by the flow or during collection is rethrown from this call.
 *
 * Usage example:
 *
 * ```
 * val flow = getMyEvents()
 * try {
 *     flow.collect { value ->
 *         println("Received $value")
 *     }
 *     println("My events are consumed successfully")
 * } catch (e: Throwable) {
 *     println("Exception from the flow: $e")
 * }
 * ```
 *
 * @param action suspending callback invoked with each emitted value.
 */
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit) {
    // Adapt the lambda to the FlowCollector interface expected by the member `collect`.
    val collector = object : FlowCollector<T> {
        override suspend fun emit(value: T) {
            action(value)
        }
    }
    collect(collector)
}
75 
/**
 * Terminal flow operator that collects this flow, invoking [action] with the zero-based index of
 * each element together with the element itself.
 * An exception thrown by the flow or during collection is rethrown from this call.
 *
 * See also [collect] and [withIndex].
 *
 * @param action suspending callback invoked with the element's index and the element.
 */
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit) {
    collect(object : FlowCollector<T> {
        // Index handed to `action` for the next emitted element; starts at zero.
        private var nextIndex = 0

        override suspend fun emit(value: T) {
            // Advance the counter first, then pass the previous value through the overflow check.
            val current = checkIndexOverflow(nextIndex++)
            action(current, value)
        }
    })
}
87 
/**
 * Terminal flow operator that collects this flow with the provided [action].
 * Unlike [collect], when the original flow emits a new value, the [action] block still running
 * for the previous value is cancelled.
 *
 * The following example demonstrates the difference:
 *
 * ```
 * flow {
 *     emit(1)
 *     delay(50)
 *     emit(2)
 * }.collectLatest { value ->
 *     println("Collecting $value")
 *     delay(100) // Emulate work
 *     println("$value collected")
 * }
 * ```
 *
 * prints "Collecting 1, Collecting 2, 2 collected"
 *
 * @param action suspending callback invoked for each emitted value.
 */
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
    /*
     * Implementation note:
     * buffer(0) is added so that sequential usages behave the way users expect, e.g.:
     * ```
     * flowOf(1, 2, 3).collectLatest {
     *     delay(1)
     *     println(it) // Expect only 3 to be printed
     * }
     * ```
     *
     * Intermediate operators do not do this: users mostly use them for interactive UI,
     * where performance of dispatch is more important.
     */
    val latest = mapLatest(action)
    val buffered = latest.buffer(0)
    buffered.collect()
}
125 
/**
 * Collects every value from the given [flow] and emits it to this collector.
 * Equivalent to `flow.collect { value -> emit(value) }`.
 *
 * @param flow the flow whose values are forwarded to this collector.
 */
@BuilderInference
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
    // The receiver itself acts as the collector for `flow`.
    flow.collect(this)
}
132